@@ -63,6 +63,15 @@
#include "mlx4_utils.h"
/**
+ * Pointer-value pair structure used in mlx4_post_send for saving the first
+ * DWORD (32 bits) of a TXBB.
+ */
+struct pv {
+ struct mlx4_wqe_data_seg *dseg;
+ uint32_t val;
+};
+
+/**
* Stamp a WQE so it won't be reused by the HW.
*
* Routine is used when freeing WQE used by the chip or when failing
@@ -291,24 +300,28 @@
* Target Tx queue.
* @param pkt
* Packet to transmit.
- * @param send_flags
- * @p MLX4_WQE_CTRL_CQ_UPDATE to request completion on this packet.
*
* @return
* 0 on success, negative errno value otherwise and rte_errno is set.
*/
static inline int
-mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt, uint32_t send_flags)
+mlx4_post_send(struct txq *txq, struct rte_mbuf *pkt)
{
struct mlx4_wqe_ctrl_seg *ctrl;
struct mlx4_wqe_data_seg *dseg;
struct mlx4_sq *sq = &txq->msq;
+ struct rte_mbuf *buf;
uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
uint32_t lkey;
uintptr_t addr;
+ uint32_t srcrb_flags;
+ uint32_t owner_opcode = MLX4_OPCODE_SEND;
+ uint32_t byte_count;
int wqe_real_size;
int nr_txbbs;
int rc;
+ struct pv *pv = (struct pv *)txq->bounce_buf;
+ int pv_counter = 0;
/* Calculate the needed work queue entry size for this packet. */
wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
@@ -324,56 +337,81 @@
rc = ENOSPC;
goto err;
}
- /* Get the control and single-data entries of the WQE. */
+ /* Get the control and data entries of the WQE. */
ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
sizeof(struct mlx4_wqe_ctrl_seg));
- /* Fill the data segment with buffer information. */
- addr = rte_pktmbuf_mtod(pkt, uintptr_t);
- rte_prefetch0((volatile void *)addr);
- dseg->addr = rte_cpu_to_be_64(addr);
- /* Memory region key for this memory pool. */
- lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(pkt));
- if (unlikely(lkey == (uint32_t)-1)) {
- /* MR does not exist. */
- DEBUG("%p: unable to get MP <-> MR association", (void *)txq);
+ /* Fill the data segments with buffer information. */
+ for (buf = pkt; buf != NULL; buf = buf->next, dseg++) {
+ addr = rte_pktmbuf_mtod(buf, uintptr_t);
+ rte_prefetch0((volatile void *)addr);
+ /* Handle WQE wraparound. */
+ if (unlikely(dseg >= (struct mlx4_wqe_data_seg *)sq->eob))
+ dseg = (struct mlx4_wqe_data_seg *)sq->buf;
+ dseg->addr = rte_cpu_to_be_64(addr);
+ /* Memory region key for this memory pool. */
+ lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
+ if (unlikely(lkey == (uint32_t)-1)) {
+ /* MR does not exist. */
+ DEBUG("%p: unable to get MP <-> MR association",
+ (void *)txq);
+ /*
+			 * Restamp entry in case of failure.
+			 * Make sure that size is written correctly.
+			 * Note that we give ownership to the SW, not the HW.
+ */
+ ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
+ mlx4_txq_stamp_freed_wqe(sq, head_idx,
+ (sq->head & sq->txbb_cnt) ? 0 : 1);
+ rc = EFAULT;
+ goto err;
+ }
+ dseg->lkey = rte_cpu_to_be_32(lkey);
+ if (likely(buf->data_len)) {
+ byte_count = rte_cpu_to_be_32(buf->data_len);
+ } else {
+ /*
+ * Zero length segment is treated as inline segment
+ * with zero data.
+ */
+ byte_count = RTE_BE32(0x80000000);
+ }
/*
- * Restamp entry in case of failure, make sure that size is
- * written correctly.
- * Note that we give ownership to the SW, not the HW.
+ * If the data segment is not at the beginning of a
+ * Tx basic block (TXBB) then write the byte count,
+ * else postpone the writing to just before updating the
+ * control segment.
*/
- ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
- mlx4_txq_stamp_freed_wqe(sq, head_idx,
- (sq->head & sq->txbb_cnt) ? 0 : 1);
- rc = EFAULT;
- goto err;
+ if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
+ /*
+ * Need a barrier here before writing the byte_count
+ * fields to make sure that all the data is visible
+ * before the byte_count field is set.
+ * Otherwise, if the segment begins a new cacheline,
+ * the HCA prefetcher could grab the 64-byte chunk and
+ * get a valid (!= 0xffffffff) byte count but stale
+ * data, and end up sending the wrong data.
+ */
+ rte_io_wmb();
+ dseg->byte_count = byte_count;
+ } else {
+ /*
+ * This data segment starts at the beginning of a new
+ * TXBB, so we need to postpone its byte_count writing
+ * for later.
+ */
+ pv[pv_counter].dseg = dseg;
+ pv[pv_counter++].val = byte_count;
+ }
}
- dseg->lkey = rte_cpu_to_be_32(lkey);
- /*
- * Need a barrier here before writing the byte_count field to
- * make sure that all the data is visible before the
- * byte_count field is set. Otherwise, if the segment begins
- * a new cache line, the HCA prefetcher could grab the 64-byte
- * chunk and get a valid (!= 0xffffffff) byte count but
- * stale data, and end up sending the wrong data.
- */
- rte_io_wmb();
- if (likely(pkt->data_len))
- dseg->byte_count = rte_cpu_to_be_32(pkt->data_len);
- else
- /*
- * Zero length segment is treated as inline segment
- * with zero data.
- */
- dseg->byte_count = RTE_BE32(0x80000000);
- /*
- * Fill the control parameters for this packet.
- * For raw Ethernet, the SOLICIT flag is used to indicate that no ICRC
- * should be calculated.
- */
- ctrl->srcrb_flags =
- rte_cpu_to_be_32(MLX4_WQE_CTRL_SOLICIT |
- (send_flags & MLX4_WQE_CTRL_CQ_UPDATE));
+	/* Write the first DWORD of each TXBB saved earlier. */
+ if (pv_counter) {
+ /* Need a barrier here before writing the byte_count. */
+ rte_io_wmb();
+ for (--pv_counter; pv_counter >= 0; pv_counter--)
+ pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
+ }
+ /* Fill the control parameters for this packet. */
ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
/*
* The caller should prepare "imm" in advance in order to support
@@ -382,14 +420,27 @@
*/
ctrl->imm = 0;
/*
- * Make sure descriptor is fully written before setting ownership
- * bit (because HW can start executing as soon as we do).
+ * For raw Ethernet, the SOLICIT flag is used to indicate that no ICRC
+ * should be calculated.
+ */
+ txq->elts_comp_cd -= nr_txbbs;
+ if (unlikely(txq->elts_comp_cd <= 0)) {
+ txq->elts_comp_cd = txq->elts_comp_cd_init;
+ srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT |
+ MLX4_WQE_CTRL_CQ_UPDATE);
+ } else {
+ srcrb_flags = RTE_BE32(MLX4_WQE_CTRL_SOLICIT);
+ }
+ ctrl->srcrb_flags = srcrb_flags;
+ /*
+ * Make sure descriptor is fully written before
+ * setting ownership bit (because HW can start
+ * executing as soon as we do).
*/
rte_wmb();
- ctrl->owner_opcode =
- rte_cpu_to_be_32(MLX4_OPCODE_SEND |
- ((sq->head & sq->txbb_cnt) ?
- MLX4_BIT_WQE_OWN : 0));
+ ctrl->owner_opcode = rte_cpu_to_be_32(owner_opcode |
+ ((sq->head & sq->txbb_cnt) ?
+ MLX4_BIT_WQE_OWN : 0));
sq->head += nr_txbbs;
return 0;
err:
@@ -416,14 +467,13 @@
struct txq *txq = (struct txq *)dpdk_txq;
unsigned int elts_head = txq->elts_head;
const unsigned int elts_n = txq->elts_n;
- unsigned int elts_comp_cd = txq->elts_comp_cd;
unsigned int elts_comp = 0;
unsigned int bytes_sent = 0;
unsigned int i;
unsigned int max;
int err;
- assert(elts_comp_cd != 0);
+ assert(txq->elts_comp_cd != 0);
mlx4_txq_complete(txq);
max = (elts_n - (elts_head - txq->elts_tail));
if (max > elts_n)
@@ -442,8 +492,6 @@
(((elts_head + 1) == elts_n) ? 0 : elts_head + 1);
struct txq_elt *elt_next = &(*txq->elts)[elts_head_next];
struct txq_elt *elt = &(*txq->elts)[elts_head];
- unsigned int segs = buf->nb_segs;
- uint32_t send_flags = 0;
/* Clean up old buffer. */
if (likely(elt->buf != NULL)) {
@@ -461,34 +509,16 @@
tmp = next;
} while (tmp != NULL);
}
- /* Request Tx completion. */
- if (unlikely(--elts_comp_cd == 0)) {
- elts_comp_cd = txq->elts_comp_cd_init;
- ++elts_comp;
- send_flags |= MLX4_WQE_CTRL_CQ_UPDATE;
- }
- if (likely(segs == 1)) {
- /* Update element. */
- elt->buf = buf;
- RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
- /* Post the packet for sending. */
- err = mlx4_post_send(txq, buf, send_flags);
- if (unlikely(err)) {
- if (unlikely(send_flags &
- MLX4_WQE_CTRL_CQ_UPDATE)) {
- elts_comp_cd = 1;
- --elts_comp;
- }
- elt->buf = NULL;
- goto stop;
- }
- elt->buf = buf;
- bytes_sent += buf->pkt_len;
- } else {
- err = -EINVAL;
- rte_errno = -err;
+ RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf);
+ /* Post the packet for sending. */
+ err = mlx4_post_send(txq, buf);
+ if (unlikely(err)) {
+ elt->buf = NULL;
goto stop;
}
+ elt->buf = buf;
+ bytes_sent += buf->pkt_len;
+ ++elts_comp;
elts_head = elts_head_next;
}
stop:
@@ -504,7 +534,6 @@
rte_write32(txq->msq.doorbell_qpn, txq->msq.db);
txq->elts_head = elts_head;
txq->elts_comp += elts_comp;
- txq->elts_comp_cd = elts_comp_cd;
return i;
}
@@ -101,13 +101,15 @@ struct txq {
struct mlx4_cq mcq; /**< Info for directly manipulating the CQ. */
unsigned int elts_head; /**< Current index in (*elts)[]. */
unsigned int elts_tail; /**< First element awaiting completion. */
- unsigned int elts_comp; /**< Number of completion requests. */
- unsigned int elts_comp_cd; /**< Countdown for next completion. */
+ unsigned int elts_comp; /**< Number of packets awaiting completion. */
+ int elts_comp_cd; /**< Countdown for next completion. */
unsigned int elts_comp_cd_init; /**< Initial value for countdown. */
unsigned int elts_n; /**< (*elts)[] length. */
struct txq_elt (*elts)[]; /**< Tx elements. */
struct mlx4_txq_stats stats; /**< Tx queue counters. */
uint32_t max_inline; /**< Max inline send size. */
+ uint8_t *bounce_buf;
+ /**< Memory used for storing the first DWORD of data TXBBs. */
struct {
const struct rte_mempool *mp; /**< Cached memory pool. */
struct ibv_mr *mr; /**< Memory region (for mp). */
@@ -83,8 +83,13 @@
rte_calloc_socket("TXQ", 1, sizeof(*elts), 0, txq->socket);
int ret = 0;
- if (elts == NULL) {
- ERROR("%p: can't allocate packets array", (void *)txq);
+ /* Allocate bounce buffer. */
+ txq->bounce_buf = rte_zmalloc_socket("TXQ",
+ MLX4_MAX_WQE_SIZE,
+ RTE_CACHE_LINE_MIN_SIZE,
+ txq->socket);
+ if (!elts || !txq->bounce_buf) {
+ ERROR("%p: can't allocate TXQ memory", (void *)txq);
ret = ENOMEM;
goto error;
}
@@ -110,6 +115,8 @@
assert(ret == 0);
return 0;
error:
+ rte_free(txq->bounce_buf);
+ txq->bounce_buf = NULL;
rte_free(elts);
DEBUG("%p: failed, freed everything", (void *)txq);
assert(ret > 0);
@@ -175,6 +182,7 @@
claim_zero(ibv_destroy_qp(txq->qp));
if (txq->cq != NULL)
claim_zero(ibv_destroy_cq(txq->cq));
+ rte_free(txq->bounce_buf);
for (i = 0; (i != RTE_DIM(txq->mp2mr)); ++i) {
if (txq->mp2mr[i].mp == NULL)
break;