[2/2] net/af_xdp: make reserve/submit peek/release consistent

Message ID 20190412144844.50712-2-xiaolong.ye@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Ferruh Yigit
Series [1/2] net/af_xdp: enqueue buf ring when allocate Tx queue fails

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Xiaolong Ye April 12, 2019, 2:48 p.m. UTC
  As David pointed out, if we reserve N slots but only submit n slots,
we end up with an incorrect view of the number of available slots
later, and we also get a wrong idx the next time we call
xsk_ring_prod__reserve. The same applies to
xsk_ring_cons__peek()/xsk_ring_cons__release().

This patch ensures that both reserve/submit and peek/release are
consistent.
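
In other words, a producer must submit exactly as many slots as it
reserved, and a consumer must release exactly as many as it peeked.
A minimal sketch of the fill-side pattern (illustrative only; fq, n
and addrs[] stand in for the driver's real state):

	uint32_t idx;
	int i;

	/* reserve only what we can actually fill ... */
	if (xsk_ring_prod__reserve(fq, n, &idx) != n)
		return -1;
	for (i = 0; i < n; i++)
		*xsk_ring_prod__fill_addr(fq, idx + i) = (uint64_t)addrs[i];
	/* ... and submit exactly the reserved count */
	xsk_ring_prod__submit(fq, n);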

Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD")

Reported-by: David Marchand <david.marchand@redhat.com>
Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
---
 drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++--------------
 1 file changed, 41 insertions(+), 39 deletions(-)
  

Comments

David Marchand April 15, 2019, 8:19 a.m. UTC | #1
On Fri, Apr 12, 2019 at 4:54 PM Xiaolong Ye <xiaolong.ye@intel.com> wrote:

> As David pointed out, if we reserve N slots but only submit n slots,
> we end up with an incorrect view of the number of available slots
> later, and we also get a wrong idx the next time we call
> xsk_ring_prod__reserve. The same applies to
> xsk_ring_cons__peek()/xsk_ring_cons__release().
>
> This patch ensures that both reserve/submit and peek/release are
> consistent.
>
> Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD")
>
> Reported-by: David Marchand <david.marchand@redhat.com>
> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
> ---
>  drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++--------------
>  1 file changed, 41 insertions(+), 39 deletions(-)
>
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 5cc643ce2..76a6a8331 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -138,22 +138,19 @@ reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
>  {
>         struct xsk_ring_prod *fq = &umem->fq;
>         uint32_t idx;
> -       int i, ret;
> -
> -       ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
> -       if (unlikely(!ret)) {
> -               AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n");
> -               return ret;
> -       }
> +       int i;
>
>         for (i = 0; i < reserve_size; i++) {
>                 __u64 *fq_addr;
>                 void *addr = NULL;
>                 if (rte_ring_dequeue(umem->buf_ring, &addr)) {
> -                       i--;
>                         break;
>                 }
> -               fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
> +               if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) {
> +                       AF_XDP_LOG(WARNING, "Failed to reserve 1 fq desc.\n");
> +                       break;
> +               }
> +               fq_addr = xsk_ring_prod__fill_addr(fq, idx);
>                 *fq_addr = (uint64_t)addr;
>         }
>
>
I just spotted that reserve_fill_queue always returns 0.
I understand that xsk_configure expects an error when it does not succeed
in populating this ring, and for that it expects a non-zero return value.

How about something like (neither tested nor compiled):

static inline int
reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
{
    struct xsk_ring_prod *fq = &umem->fq;
    void *addrs[reserve_size];
    uint32_t idx;
    int i, ret;

    if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL)
        != reserve_size) {
        AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
        return -1;
    }

    ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
    if (unlikely(!ret)) {
        AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
        rte_ring_enqueue_bulk(umem->buf_ring, addrs, reserve_size,
                      NULL);
        return -1;
    }

    for (i = 0; i < reserve_size; i++) {
        __u64 *fq_addr;

        fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
        *fq_addr = (uint64_t)addrs[i];
    }

    xsk_ring_prod__submit(fq, reserve_size);

    return 0;
}
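
With this, the call site in xsk_configure can simply propagate the
failure. A hypothetical sketch of the check (ETH_AF_XDP_NUM_BUFFERS is
assumed to be the count the driver fills the fq with):

    if (reserve_fill_queue(umem, ETH_AF_XDP_NUM_BUFFERS)) {
        AF_XDP_LOG(ERR, "Failed to reserve fill queue.\n");
        goto err;
    }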



> @@ -179,6 +176,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>
>         nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>
> +       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
> +               return 0;
> +
>         rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
>         if (rcvd == 0)
>                 return 0;
>

When xsk_ring_cons__peek() returns 0, we will leak nb_pkts freshly
allocated mbufs.
See below for a suggestion.


> @@ -186,9 +186,6 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>         if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
>                 (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
>
> -       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != 0))
> -               return 0;
> -
>         for (i = 0; i < rcvd; i++) {
>                 const struct xdp_desc *desc;
>                 uint64_t addr;
> @@ -211,6 +208,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>
>         xsk_ring_cons__release(rx, rcvd);
>
> +       /* free the extra mbufs */
> +       for (; rcvd < nb_pkts; rcvd++)
> +               rte_pktmbuf_free(mbufs[rcvd]);
> +
>

You can move this block after the statistic update...


>         /* statistics */
>         rxq->stats.rx_pkts += (rcvd - dropped);
>         rxq->stats.rx_bytes += rx_bytes;
>

... then define an out: label.
And since those mbufs are still clean and all come from the same mempool,
we can put them back as a single bulk.
Something like (again, untested):

out:
    if (count != nb_pkts) {
        rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[count],
                     nb_pkts - count);
    }

    return count;
}

And you would jump to this label when xsk_ring_cons__peek() == 0.
What do you think?
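
For completeness, the whole rx function could then look roughly like this
(one more untested sketch; names follow the hunks quoted above and the
current driver, anything not shown there is assumed; the per-packet
dropped counter goes away since mbuf allocation failures are now handled
up front):

static uint16_t
eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
    struct pkt_rx_queue *rxq = queue;
    struct xsk_ring_cons *rx = &rxq->rx;
    struct xsk_umem_info *umem = rxq->umem;
    struct xsk_ring_prod *fq = &umem->fq;
    struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
    uint32_t idx_rx = 0;
    uint32_t free_thresh = fq->size >> 1;
    unsigned long rx_bytes = 0;
    uint16_t count = 0;
    int rcvd, i;

    nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);

    if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
        return 0;

    rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
    if (rcvd == 0)
        goto out; /* nothing received: return all nb_pkts mbufs in one bulk */

    if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
        (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);

    for (i = 0; i < rcvd; i++) {
        const struct xdp_desc *desc;
        uint64_t addr;
        uint32_t len;

        desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
        addr = desc->addr;
        len = desc->len;
        rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *),
                   xsk_umem__get_data(umem->mz->addr, addr), len);
        rte_pktmbuf_pkt_len(mbufs[i]) = len;
        rte_pktmbuf_data_len(mbufs[i]) = len;
        rx_bytes += len;
        rte_ring_enqueue(umem->buf_ring, (void *)addr);
        bufs[i] = mbufs[i];
    }

    /* release exactly what we peeked */
    xsk_ring_cons__release(rx, rcvd);
    count = rcvd;

    /* statistics */
    rxq->stats.rx_pkts += count;
    rxq->stats.rx_bytes += rx_bytes;

out:
    if (count != nb_pkts)
        rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[count],
                             nb_pkts - count);

    return count;
}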



> @@ -261,55 +262,56 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>         struct xsk_umem_info *umem = txq->pair->umem;
>         struct rte_mbuf *mbuf;
>         void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
> +       struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE];
>         unsigned long tx_bytes = 0;
> -       int i, valid = 0;
> +       int i;
> +       uint16_t nb_valid = 0;
>         uint32_t idx_tx;
> +       uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
>
>         nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>
>         pull_umem_cq(umem, nb_pkts);
>
> -       nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
> -                                       nb_pkts, NULL);
> -       if (nb_pkts == 0)
> +       for (i = 0; i < nb_pkts; i++) {
> +               if (bufs[i]->pkt_len <= buf_len)
> +                       valid_bufs[nb_valid++] = bufs[i];
> +               else
> +                       rte_pktmbuf_free(bufs[i]);
> +       }
> +
> +       nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
> +                                       nb_valid, NULL);
> +       if (nb_valid == 0)
>                 return 0;
>
>
You can't return 0.
You have stolen buffers from the caller with the previous check on pktlen.
When the application resends this bulk or frees the whole bulk, we will
have mbuf reuse bugs.

Thinking about this, why would this happen?
This limitation should be handled by properly reporting the mtu:
the application would then know it can't send those too-big mbufs.
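
Something like this, for instance (untested, and assuming the
min_mtu/max_mtu fields and ETHER_MIN_MTU from the recent ethdev MTU
series):

static void
eth_dev_info(struct rte_eth_dev *dev __rte_unused,
             struct rte_eth_dev_info *dev_info)
{
    /* existing fields unchanged; the point is the upper bound: a
     * frame must fit in one umem chunk minus the data headroom */
    dev_info->min_mtu = ETHER_MIN_MTU;
    dev_info->max_mtu = ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
}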


If I missed something else and/or if you still don't trust the application,
I think the tx burst function should go as described below.

The first thing to do is check the mbuf lengths.
At the first invalid length, you break from the loop at index i (i being
the index of the first invalid buffer), so bufs[0..i-1] are the i valid
mbufs.
Then dequeue i buffers from buf_ring and reserve i slots in tx.

And return that i buffers have been sent (plus a tx error stat += 1).

You need to carefully account for each step, returning buffers to
buf_ring when relevant and freeing the mbufs properly.
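
Roughly like this, for instance (still untested; it stops at the first
oversized mbuf and leaves it, and everything after it, to the caller):

static uint16_t
eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
    struct pkt_tx_queue *txq = queue;
    struct xsk_umem_info *umem = txq->pair->umem;
    void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
    uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
    unsigned long tx_bytes = 0;
    uint32_t idx_tx;
    uint16_t i, nb_valid;

    nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);

    pull_umem_cq(umem, nb_pkts);

    /* stop at the first oversized mbuf: bufs[0..nb_valid-1] are valid */
    for (nb_valid = 0; nb_valid < nb_pkts; nb_valid++)
        if (bufs[nb_valid]->pkt_len > buf_len)
            break;

    /* rte_ring_dequeue_bulk is all-or-nothing: it returns nb_valid or 0 */
    if (nb_valid == 0 ||
        rte_ring_dequeue_bulk(umem->buf_ring, addrs, nb_valid, NULL) == 0)
        return 0;

    if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != nb_valid) {
        kick_tx(txq);
        rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, NULL);
        return 0;
    }

    for (i = 0; i < nb_valid; i++) {
        struct xdp_desc *desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);

        desc->addr = (uint64_t)addrs[i];
        desc->len = bufs[i]->pkt_len;
        rte_memcpy(xsk_umem__get_data(umem->mz->addr, desc->addr),
                   rte_pktmbuf_mtod(bufs[i], void *), desc->len);
        tx_bytes += bufs[i]->pkt_len;
        rte_pktmbuf_free(bufs[i]);
    }

    /* submit exactly what we reserved */
    xsk_ring_prod__submit(&txq->tx, nb_valid);

    kick_tx(txq);

    /* the oversized mbuf (if any) stays with the caller */
    if (nb_valid < nb_pkts)
        txq->stats.err_pkts += 1;
    txq->stats.tx_pkts += nb_valid;
    txq->stats.tx_bytes += tx_bytes;

    return nb_valid;
}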


> -       if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
> +       if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != nb_valid) {
>                 kick_tx(txq);
> -               rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
> +               rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, NULL);
>                 return 0;
>         }
>
> -       for (i = 0; i < nb_pkts; i++) {
> +       for (i = 0; i < nb_valid; i++) {
>                 struct xdp_desc *desc;
>                 void *pkt;
> -               uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE
> -                                       - ETH_AF_XDP_DATA_HEADROOM;
>                 desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
> -               mbuf = bufs[i];
> -               if (mbuf->pkt_len <= buf_len) {
> -                       desc->addr = (uint64_t)addrs[valid];
> -                       desc->len = mbuf->pkt_len;
> -                       pkt = xsk_umem__get_data(umem->mz->addr,
> -                                                desc->addr);
> -                       rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
> -                              desc->len);
> -                       valid++;
> -                       tx_bytes += mbuf->pkt_len;
> -               }
> +               mbuf = valid_bufs[i];
> +               desc->addr = (uint64_t)addrs[i];
> +               desc->len = mbuf->pkt_len;
> +               pkt = xsk_umem__get_data(umem->mz->addr,
> +                                        desc->addr);
> +               rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
> +                          desc->len);
> +               tx_bytes += mbuf->pkt_len;
>                 rte_pktmbuf_free(mbuf);
>         }
>
> -       xsk_ring_prod__submit(&txq->tx, nb_pkts);
> +       xsk_ring_prod__submit(&txq->tx, nb_valid);
>
>         kick_tx(txq);
>
> -       if (valid < nb_pkts)
> -               rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
> -                                nb_pkts - valid, NULL);
> -
> -       txq->stats.err_pkts += nb_pkts - valid;
> -       txq->stats.tx_pkts += valid;
> +       txq->stats.err_pkts += nb_pkts - nb_valid;
> +       txq->stats.tx_pkts += nb_valid;
>         txq->stats.tx_bytes += tx_bytes;
>
>         return nb_pkts;
> --
> 2.17.1
>
>
  
Xiaolong Ye April 15, 2019, 2:42 p.m. UTC | #2
Hi, David

Thanks for your detailed review comments.

On 04/15, David Marchand wrote:
>On Fri, Apr 12, 2019 at 4:54 PM Xiaolong Ye <xiaolong.ye@intel.com> wrote:
>
>> As David pointed out, if we reserve N slots but only submit n slots,
>> we end up with an incorrect view of the number of available slots
>> later, and we also get a wrong idx the next time we call
>> xsk_ring_prod__reserve. The same applies to
>> xsk_ring_cons__peek()/xsk_ring_cons__release().
>>
>> This patch ensures that both reserve/submit and peek/release are
>> consistent.
>>
>> Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD")
>>
>> Reported-by: David Marchand <david.marchand@redhat.com>
>> Signed-off-by: Xiaolong Ye <xiaolong.ye@intel.com>
>> ---
>>  drivers/net/af_xdp/rte_eth_af_xdp.c | 80 +++++++++++++++--------------
>>  1 file changed, 41 insertions(+), 39 deletions(-)
>>
>> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> index 5cc643ce2..76a6a8331 100644
>> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
>> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
>> @@ -138,22 +138,19 @@ reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
>>  {
>>         struct xsk_ring_prod *fq = &umem->fq;
>>         uint32_t idx;
>> -       int i, ret;
>> -
>> -       ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
>> -       if (unlikely(!ret)) {
>> -               AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n");
>> -               return ret;
>> -       }
>> +       int i;
>>
>>         for (i = 0; i < reserve_size; i++) {
>>                 __u64 *fq_addr;
>>                 void *addr = NULL;
>>                 if (rte_ring_dequeue(umem->buf_ring, &addr)) {
>> -                       i--;
>>                         break;
>>                 }
>> -               fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
>> +               if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) {
>> +                       AF_XDP_LOG(WARNING, "Failed to reserve 1 fq desc.\n");
>> +                       break;
>> +               }
>> +               fq_addr = xsk_ring_prod__fill_addr(fq, idx);
>>                 *fq_addr = (uint64_t)addr;
>>         }
>>
>>
>I just spotted that reserve_fill_queue always returns 0.
>I understand that xsk_configure expects an error when it does not succeed
>in populating this ring, and for that it expects a non-zero return value.

You are right, reserve_fill_queue does need to return a non-zero value
when it fails to populate the ring.

>
>How about something like (neither tested nor compiled):
>
>static inline int
>reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
>{
>    struct xsk_ring_prod *fq = &umem->fq;
>    void *addrs[reserve_size];
>    uint32_t idx;
>    int i, ret;
>
>    if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL)
>        != reserve_size) {
>        AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
>        return -1;
>    }
>
>    ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
>    if (unlikely(!ret)) {
>        AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
>        rte_ring_enqueue_bulk(umem->buf_ring, addrs, reserve_size,
>                      NULL);
>        return -1;
>    }
>
>    for (i = 0; i < reserve_size; i++) {
>        __u64 *fq_addr;
>
>        fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
>        *fq_addr = (uint64_t)addrs[i];
>    }
>
>    xsk_ring_prod__submit(fq, reserve_size);
>
>    return 0;
>}

Sounds better, I'll adopt it in my new version.

>
>
>
>> @@ -179,6 +176,9 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>
>>         nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>>
>> +       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
>> +               return 0;
>> +
>>         rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
>>         if (rcvd == 0)
>>                 return 0;
>>
>
>When xsk_ring_cons__peek() returns 0, we will leak nb_pkts freshly
>allocated mbufs.
>See below for a suggestion.
>
>
>> @@ -186,9 +186,6 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>         if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
>>                 (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
>>
>> -       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != 0))
>> -               return 0;
>> -
>>         for (i = 0; i < rcvd; i++) {
>>                 const struct xdp_desc *desc;
>>                 uint64_t addr;
>> @@ -211,6 +208,10 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>
>>         xsk_ring_cons__release(rx, rcvd);
>>
>> +       /* free the extra mbufs */
>> +       for (; rcvd < nb_pkts; rcvd++)
>> +               rte_pktmbuf_free(mbufs[rcvd]);
>> +
>>
>
>You can move this block after the statistic update...
>
>
>>         /* statistics */
>>         rxq->stats.rx_pkts += (rcvd - dropped);
>>         rxq->stats.rx_bytes += rx_bytes;
>>
>
>... then define an out: label.
>And since those mbufs are still clean and all come from the same mempool,
>we can put them back as a single bulk.
>Something like (again, untested):
>
>out:
>    if (count != nb_pkts) {
>        rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[count],
>                     nb_pkts - count);
>    }
>
>    return count;
>}
>
>And you would jump to this label when xsk_ring_cons__peek() == 0.
>What do you think?

I think these are all sensible suggestions, will do.

>
>
>
>> @@ -261,55 +262,56 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
>>         struct xsk_umem_info *umem = txq->pair->umem;
>>         struct rte_mbuf *mbuf;
>>         void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
>> +       struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE];
>>         unsigned long tx_bytes = 0;
>> -       int i, valid = 0;
>> +       int i;
>> +       uint16_t nb_valid = 0;
>>         uint32_t idx_tx;
>> +       uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
>>
>>         nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
>>
>>         pull_umem_cq(umem, nb_pkts);
>>
>> -       nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
>> -                                       nb_pkts, NULL);
>> -       if (nb_pkts == 0)
>> +       for (i = 0; i < nb_pkts; i++) {
>> +               if (bufs[i]->pkt_len <= buf_len)
>> +                       valid_bufs[nb_valid++] = bufs[i];
>> +               else
>> +                       rte_pktmbuf_free(bufs[i]);
>> +       }
>> +
>> +       nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
>> +                                       nb_valid, NULL);
>> +       if (nb_valid == 0)
>>                 return 0;
>>
>>
>You can't return 0.
>You have stolen buffers from the caller with the previous check on pktlen.
>When the application resends this bulk or frees the whole bulk, we will
>have mbuf reuse bugs.
>
>Thinking about this, why would this happen?
>This limitation should be handled by properly reporting the mtu:
>the application would then know it can't send those too-big mbufs.
>

I think we can rely on the mtu to handle this limitation, and assume that
all pkt_len values are valid in this tx function. Will change it in the
next version.

Thanks,
Xiaolong
>
>If I missed something else and/or if you still don't trust the application,
>I think the tx burst function should go as described below.
>
>The first thing to do is check the mbuf lengths.
>At the first invalid length, you break from the loop at index i (i being
>the index of the first invalid buffer), so bufs[0..i-1] are the i valid
>mbufs.
>Then dequeue i buffers from buf_ring and reserve i slots in tx.
>
>And return that i buffers have been sent (plus a tx error stat += 1).
>
>You need to carefully account for each step, returning buffers to
>buf_ring when relevant and freeing the mbufs properly.
>
>
>> -       if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
>> +       if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != nb_valid) {
>>                 kick_tx(txq);
>> -               rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
>> +               rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, NULL);
>>                 return 0;
>>         }
>>
>> -       for (i = 0; i < nb_pkts; i++) {
>> +       for (i = 0; i < nb_valid; i++) {
>>                 struct xdp_desc *desc;
>>                 void *pkt;
>> -               uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE
>> -                                       - ETH_AF_XDP_DATA_HEADROOM;
>>                 desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
>> -               mbuf = bufs[i];
>> -               if (mbuf->pkt_len <= buf_len) {
>> -                       desc->addr = (uint64_t)addrs[valid];
>> -                       desc->len = mbuf->pkt_len;
>> -                       pkt = xsk_umem__get_data(umem->mz->addr,
>> -                                                desc->addr);
>> -                       rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> -                              desc->len);
>> -                       valid++;
>> -                       tx_bytes += mbuf->pkt_len;
>> -               }
>> +               mbuf = valid_bufs[i];
>> +               desc->addr = (uint64_t)addrs[i];
>> +               desc->len = mbuf->pkt_len;
>> +               pkt = xsk_umem__get_data(umem->mz->addr,
>> +                                        desc->addr);
>> +               rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
>> +                          desc->len);
>> +               tx_bytes += mbuf->pkt_len;
>>                 rte_pktmbuf_free(mbuf);
>>         }
>>
>> -       xsk_ring_prod__submit(&txq->tx, nb_pkts);
>> +       xsk_ring_prod__submit(&txq->tx, nb_valid);
>>
>>         kick_tx(txq);
>>
>> -       if (valid < nb_pkts)
>> -               rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
>> -                                nb_pkts - valid, NULL);
>> -
>> -       txq->stats.err_pkts += nb_pkts - valid;
>> -       txq->stats.tx_pkts += valid;
>> +       txq->stats.err_pkts += nb_pkts - nb_valid;
>> +       txq->stats.tx_pkts += nb_valid;
>>         txq->stats.tx_bytes += tx_bytes;
>>
>>         return nb_pkts;
>> --
>> 2.17.1
>>
>>
>
>-- 
>David Marchand
  

Patch

diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c
index 5cc643ce2..76a6a8331 100644
--- a/drivers/net/af_xdp/rte_eth_af_xdp.c
+++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
@@ -138,22 +138,19 @@  reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size)
 {
 	struct xsk_ring_prod *fq = &umem->fq;
 	uint32_t idx;
-	int i, ret;
-
-	ret = xsk_ring_prod__reserve(fq, reserve_size, &idx);
-	if (unlikely(!ret)) {
-		AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n");
-		return ret;
-	}
+	int i;
 
 	for (i = 0; i < reserve_size; i++) {
 		__u64 *fq_addr;
 		void *addr = NULL;
 		if (rte_ring_dequeue(umem->buf_ring, &addr)) {
-			i--;
 			break;
 		}
-		fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
+		if (unlikely(!xsk_ring_prod__reserve(fq, 1, &idx))) {
+			AF_XDP_LOG(WARNING, "Failed to reserve 1 fq desc.\n");
+			break;
+		}
+		fq_addr = xsk_ring_prod__fill_addr(fq, idx);
 		*fq_addr = (uint64_t)addr;
 	}
 
@@ -179,6 +176,9 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
 
+	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
+		return 0;
+
 	rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 	if (rcvd == 0)
 		return 0;
@@ -186,9 +186,6 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
 		(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
 
-	if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != 0))
-		return 0;
-
 	for (i = 0; i < rcvd; i++) {
 		const struct xdp_desc *desc;
 		uint64_t addr;
@@ -211,6 +208,10 @@  eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
 	xsk_ring_cons__release(rx, rcvd);
 
+	/* free the extra mbufs */
+	for (; rcvd < nb_pkts; rcvd++)
+		rte_pktmbuf_free(mbufs[rcvd]);
+
 	/* statistics */
 	rxq->stats.rx_pkts += (rcvd - dropped);
 	rxq->stats.rx_bytes += rx_bytes;
@@ -261,55 +262,56 @@  eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 	struct xsk_umem_info *umem = txq->pair->umem;
 	struct rte_mbuf *mbuf;
 	void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
+	struct rte_mbuf *valid_bufs[ETH_AF_XDP_TX_BATCH_SIZE];
 	unsigned long tx_bytes = 0;
-	int i, valid = 0;
+	int i;
+	uint16_t nb_valid = 0;
 	uint32_t idx_tx;
+	uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - ETH_AF_XDP_DATA_HEADROOM;
 
 	nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
 
 	pull_umem_cq(umem, nb_pkts);
 
-	nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
-					nb_pkts, NULL);
-	if (nb_pkts == 0)
+	for (i = 0; i < nb_pkts; i++) {
+		if (bufs[i]->pkt_len <= buf_len)
+			valid_bufs[nb_valid++] = bufs[i];
+		else
+			rte_pktmbuf_free(bufs[i]);
+	}
+
+	nb_valid = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
+					nb_valid, NULL);
+	if (nb_valid == 0)
 		return 0;
 
-	if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
+	if (xsk_ring_prod__reserve(&txq->tx, nb_valid, &idx_tx) != nb_valid) {
 		kick_tx(txq);
-		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
+		rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_valid, NULL);
 		return 0;
 	}
 
-	for (i = 0; i < nb_pkts; i++) {
+	for (i = 0; i < nb_valid; i++) {
 		struct xdp_desc *desc;
 		void *pkt;
-		uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE
-					- ETH_AF_XDP_DATA_HEADROOM;
 		desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
-		mbuf = bufs[i];
-		if (mbuf->pkt_len <= buf_len) {
-			desc->addr = (uint64_t)addrs[valid];
-			desc->len = mbuf->pkt_len;
-			pkt = xsk_umem__get_data(umem->mz->addr,
-						 desc->addr);
-			rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-			       desc->len);
-			valid++;
-			tx_bytes += mbuf->pkt_len;
-		}
+		mbuf = valid_bufs[i];
+		desc->addr = (uint64_t)addrs[i];
+		desc->len = mbuf->pkt_len;
+		pkt = xsk_umem__get_data(umem->mz->addr,
+					 desc->addr);
+		rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
+			   desc->len);
+		tx_bytes += mbuf->pkt_len;
 		rte_pktmbuf_free(mbuf);
 	}
 
-	xsk_ring_prod__submit(&txq->tx, nb_pkts);
+	xsk_ring_prod__submit(&txq->tx, nb_valid);
 
 	kick_tx(txq);
 
-	if (valid < nb_pkts)
-		rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid],
-				 nb_pkts - valid, NULL);
-
-	txq->stats.err_pkts += nb_pkts - valid;
-	txq->stats.tx_pkts += valid;
+	txq->stats.err_pkts += nb_pkts - nb_valid;
+	txq->stats.tx_pkts += nb_valid;
 	txq->stats.tx_bytes += tx_bytes;
 
 	return nb_pkts;