[2/2] af_xdp: avoid to unnecessary allocation and free mbuf
Checks
Commit Message
optimize rx performance, by allocating mbuf based on result
of xsk_ring_cons__peek, to avoid to redundancy allocation,
and free mbuf when receive packets
Signed-off-by: Li RongQing <lirongqing@baidu.com>
Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
---
drivers/net/af_xdp/rte_eth_af_xdp.c | 64 ++++++++++++++++---------------------
1 file changed, 27 insertions(+), 37 deletions(-)
Comments
>
> optimize rx performance, by allocating mbuf based on result
> of xsk_ring_cons__peek, to avoid to redundancy allocation,
> and free mbuf when receive packets
>
> Signed-off-by: Li RongQing <lirongqing@baidu.com>
> Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> ---
> drivers/net/af_xdp/rte_eth_af_xdp.c | 64 ++++++++++++++++---------------
> ------
> 1 file changed, 27 insertions(+), 37 deletions(-)
>
> diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> b/drivers/net/af_xdp/rte_eth_af_xdp.c
> index 7ce4ad04a..48824050e 100644
> --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> struct xsk_umem_info *umem = rxq->umem;
> uint32_t idx_rx = 0;
> unsigned long rx_bytes = 0;
> - int rcvd, i;
> + int i;
> struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
>
> - /* allocate bufs for fill queue replenishment after rx */
> - if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> - AF_XDP_LOG(DEBUG,
> - "Failed to get enough buffers for fq.\n");
> - return 0;
> - }
>
> - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
>
> - if (rcvd == 0) {
> + if (nb_pkts == 0) {
> #if defined(XDP_USE_NEED_WAKEUP)
> if (xsk_ring_prod__needs_wakeup(&umem->fq))
> (void)poll(rxq->fds, 1, 1000);
> #endif
>
> - goto out;
> + return 0;
> }
>
> - for (i = 0; i < rcvd; i++) {
> + /* allocate bufs for fill queue replenishment after rx */
> + if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> + AF_XDP_LOG(DEBUG,
> + "Failed to get enough buffers for fq.\n");
Thanks for this patch. I've considered this in the past.
There is a problem if we hit this condition.
We advance the rx producer @ xsk_ring_cons__peek.
But if we have no mbufs to hold the rx data, it is lost.
That's why we allocate the mbufs up front now.
Agree that we might have wasteful allocations and it's not the most optimal, but we don't drop packets due to failed mbuf allocs.
> + return 0;
> + }
> +
> + for (i = 0; i < nb_pkts; i++) {
> const struct xdp_desc *desc;
> uint64_t addr;
> uint32_t len;
> @@ -275,20 +276,15 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> rx_bytes += len;
> }
>
> - xsk_ring_cons__release(rx, rcvd);
> + xsk_ring_cons__release(rx, nb_pkts);
>
> - (void)reserve_fill_queue(umem, rcvd, fq_bufs);
> + (void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
>
> /* statistics */
> - rxq->stats.rx_pkts += rcvd;
> + rxq->stats.rx_pkts += nb_pkts;
> rxq->stats.rx_bytes += rx_bytes;
>
> -out:
> - if (rcvd != nb_pkts)
> - rte_mempool_put_bulk(umem->mb_pool, (void
> **)&fq_bufs[rcvd],
> - nb_pkts - rcvd);
> -
> - return rcvd;
> + return nb_pkts;
> }
> #else
> static uint16_t
> @@ -300,27 +296,26 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> struct xsk_ring_prod *fq = &umem->fq;
> uint32_t idx_rx = 0;
> unsigned long rx_bytes = 0;
> - int rcvd, i;
> + int i;
> uint32_t free_thresh = fq->size >> 1;
> struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
>
> - if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> - return 0;
> -
> - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> - if (rcvd == 0) {
> + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> + if (nb_pkts == 0) {
> #if defined(XDP_USE_NEED_WAKEUP)
> if (xsk_ring_prod__needs_wakeup(fq))
> (void)poll(rxq->fds, 1, 1000);
> #endif
> -
> - goto out;
> + return 0;
> }
>
> + if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts)
> != 0))
> + return 0;
> +
> if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
> (void)reserve_fill_queue(umem,
> ETH_AF_XDP_RX_BATCH_SIZE, NULL);
>
> - for (i = 0; i < rcvd; i++) {
> + for (i = 0; i < nb_pkts; i++) {
> const struct xdp_desc *desc;
> uint64_t addr;
> uint32_t len;
> @@ -339,18 +334,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf
> **bufs, uint16_t nb_pkts)
> bufs[i] = mbufs[i];
> }
>
> - xsk_ring_cons__release(rx, rcvd);
> + xsk_ring_cons__release(rx, nb_pkts);
>
> /* statistics */
> - rxq->stats.rx_pkts += rcvd;
> + rxq->stats.rx_pkts += nb_pkts;
> rxq->stats.rx_bytes += rx_bytes;
>
> -out:
> - if (rcvd != nb_pkts)
> - rte_mempool_put_bulk(rxq->mb_pool, (void
> **)&mbufs[rcvd],
> - nb_pkts - rcvd);
> -
> - return rcvd;
> + return nb_pkts;
> }
> #endif
>
> --
> 2.16.2
> -----Original Message-----
> From: Loftus, Ciara [mailto:ciara.loftus@intel.com]
> Sent: Friday, September 18, 2020 5:39 PM
> To: Li,Rongqing <lirongqing@baidu.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 2/2] af_xdp: avoid to unnecessary allocation and
> free mbuf
>
> >
> > optimize rx performance, by allocating mbuf based on result of
> > xsk_ring_cons__peek, to avoid to redundancy allocation, and free mbuf
> > when receive packets
> >
> > Signed-off-by: Li RongQing <lirongqing@baidu.com>
> > Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> > ---
> > drivers/net/af_xdp/rte_eth_af_xdp.c | 64
> > ++++++++++++++++---------------
> > ------
> > 1 file changed, 27 insertions(+), 37 deletions(-)
> >
> > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > index 7ce4ad04a..48824050e 100644
> > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> > **bufs, uint16_t nb_pkts)
> > struct xsk_umem_info *umem = rxq->umem;
> > uint32_t idx_rx = 0;
> > unsigned long rx_bytes = 0;
> > - int rcvd, i;
> > + int i;
> > struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> >
> > - /* allocate bufs for fill queue replenishment after rx */
> > - if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > - AF_XDP_LOG(DEBUG,
> > - "Failed to get enough buffers for fq.\n");
> > - return 0;
> > - }
> >
> > - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> > + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> >
> > - if (rcvd == 0) {
> > + if (nb_pkts == 0) {
> > #if defined(XDP_USE_NEED_WAKEUP)
> > if (xsk_ring_prod__needs_wakeup(&umem->fq))
> > (void)poll(rxq->fds, 1, 1000);
> > #endif
> >
> > - goto out;
> > + return 0;
> > }
> >
> > - for (i = 0; i < rcvd; i++) {
> > + /* allocate bufs for fill queue replenishment after rx */
> > + if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > + AF_XDP_LOG(DEBUG,
> > + "Failed to get enough buffers for fq.\n");
>
> Thanks for this patch. I've considered this in the past.
> There is a problem if we hit this condition.
> We advance the rx producer @ xsk_ring_cons__peek.
> But if we have no mbufs to hold the rx data, it is lost.
> That's why we allocate the mbufs up front now.
True, thanks
-Li
> -----Original Message-----
> From: Loftus, Ciara [mailto:ciara.loftus@intel.com]
> Sent: Friday, September 18, 2020 5:39 PM
> To: Li,Rongqing <lirongqing@baidu.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH 2/2] af_xdp: avoid to unnecessary allocation and
> free mbuf
>
> >
> > optimize rx performance, by allocating mbuf based on result of
> > xsk_ring_cons__peek, to avoid to redundancy allocation, and free mbuf
> > when receive packets
> >
> > Signed-off-by: Li RongQing <lirongqing@baidu.com>
> > Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
> > ---
> > drivers/net/af_xdp/rte_eth_af_xdp.c | 64
> > ++++++++++++++++---------------
> > ------
> > 1 file changed, 27 insertions(+), 37 deletions(-)
> >
> > diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > index 7ce4ad04a..48824050e 100644
> > --- a/drivers/net/af_xdp/rte_eth_af_xdp.c
> > +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c
> > @@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf
> > **bufs, uint16_t nb_pkts)
> > struct xsk_umem_info *umem = rxq->umem;
> > uint32_t idx_rx = 0;
> > unsigned long rx_bytes = 0;
> > - int rcvd, i;
> > + int i;
> > struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
> >
> > - /* allocate bufs for fill queue replenishment after rx */
> > - if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > - AF_XDP_LOG(DEBUG,
> > - "Failed to get enough buffers for fq.\n");
> > - return 0;
> > - }
> >
> > - rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> > + nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
> >
> > - if (rcvd == 0) {
> > + if (nb_pkts == 0) {
> > #if defined(XDP_USE_NEED_WAKEUP)
> > if (xsk_ring_prod__needs_wakeup(&umem->fq))
> > (void)poll(rxq->fds, 1, 1000);
> > #endif
> >
> > - goto out;
> > + return 0;
> > }
> >
> > - for (i = 0; i < rcvd; i++) {
> > + /* allocate bufs for fill queue replenishment after rx */
> > + if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
> > + AF_XDP_LOG(DEBUG,
> > + "Failed to get enough buffers for fq.\n");
>
> Thanks for this patch. I've considered this in the past.
> There is a problem if we hit this condition.
> We advance the rx producer @ xsk_ring_cons__peek.
> But if we have no mbufs to hold the rx data, it is lost.
> That's why we allocate the mbufs up front now.
> Agree that we might have wasteful allocations and it's not the most optimal,
> but we don't drop packets due to failed mbuf allocs.
>
> > + return 0;
xsk_ring_cons__peek advance rx cached_cons and rx cached_prod, I think it is harmless to advance cached_prod,
so we can restore rx cached_cons if mbuf fail to be allocated, to fix, like:
if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0)) {
rx->cached_cons -= rcvd;
return 0;
}
Is it right?
-Li
@@ -229,28 +229,29 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
struct xsk_umem_info *umem = rxq->umem;
uint32_t idx_rx = 0;
unsigned long rx_bytes = 0;
- int rcvd, i;
+ int i;
struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
- /* allocate bufs for fill queue replenishment after rx */
- if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
- AF_XDP_LOG(DEBUG,
- "Failed to get enough buffers for fq.\n");
- return 0;
- }
- rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+ nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
- if (rcvd == 0) {
+ if (nb_pkts == 0) {
#if defined(XDP_USE_NEED_WAKEUP)
if (xsk_ring_prod__needs_wakeup(&umem->fq))
(void)poll(rxq->fds, 1, 1000);
#endif
- goto out;
+ return 0;
}
- for (i = 0; i < rcvd; i++) {
+ /* allocate bufs for fill queue replenishment after rx */
+ if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
+ AF_XDP_LOG(DEBUG,
+ "Failed to get enough buffers for fq.\n");
+ return 0;
+ }
+
+ for (i = 0; i < nb_pkts; i++) {
const struct xdp_desc *desc;
uint64_t addr;
uint32_t len;
@@ -275,20 +276,15 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
rx_bytes += len;
}
- xsk_ring_cons__release(rx, rcvd);
+ xsk_ring_cons__release(rx, nb_pkts);
- (void)reserve_fill_queue(umem, rcvd, fq_bufs);
+ (void)reserve_fill_queue(umem, nb_pkts, fq_bufs);
/* statistics */
- rxq->stats.rx_pkts += rcvd;
+ rxq->stats.rx_pkts += nb_pkts;
rxq->stats.rx_bytes += rx_bytes;
-out:
- if (rcvd != nb_pkts)
- rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
- nb_pkts - rcvd);
-
- return rcvd;
+ return nb_pkts;
}
#else
static uint16_t
@@ -300,27 +296,26 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
struct xsk_ring_prod *fq = &umem->fq;
uint32_t idx_rx = 0;
unsigned long rx_bytes = 0;
- int rcvd, i;
+ int i;
uint32_t free_thresh = fq->size >> 1;
struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
- if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
- return 0;
-
- rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
- if (rcvd == 0) {
+ nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+ if (nb_pkts == 0) {
#if defined(XDP_USE_NEED_WAKEUP)
if (xsk_ring_prod__needs_wakeup(fq))
(void)poll(rxq->fds, 1, 1000);
#endif
-
- goto out;
+ return 0;
}
+ if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
+ return 0;
+
if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, NULL);
- for (i = 0; i < rcvd; i++) {
+ for (i = 0; i < nb_pkts; i++) {
const struct xdp_desc *desc;
uint64_t addr;
uint32_t len;
@@ -339,18 +334,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
bufs[i] = mbufs[i];
}
- xsk_ring_cons__release(rx, rcvd);
+ xsk_ring_cons__release(rx, nb_pkts);
/* statistics */
- rxq->stats.rx_pkts += rcvd;
+ rxq->stats.rx_pkts += nb_pkts;
rxq->stats.rx_bytes += rx_bytes;
-out:
- if (rcvd != nb_pkts)
- rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
- nb_pkts - rcvd);
-
- return rcvd;
+ return nb_pkts;
}
#endif