-----Original Message-----
> Date: Fri, 15 Dec 2017 18:38:23 +0530
> From: Sunil Kumar Kori <sunil.kori@nxp.com>
> To: jerin.jacob@caviumnetworks.com
> CC: dev@dpdk.org, hemant.agrawal@nxp.com
> Subject: [PATCH 1/6] bus/dpaa: added event dequeue and consumption support
> X-Mailer: git-send-email 2.9.3
>
> To receive events from given event port, corresponding
> function needs to be added which receives events
> from portal. Also added function to consume received
> events based on entry index.
>
> Signed-off-by: Sunil Kumar Kori <sunil.kori@nxp.com>
> ---
> drivers/bus/dpaa/base/qbman/qman.c | 90 +++++++++++++++++++++++++++++--
> drivers/bus/dpaa/dpaa_bus.c | 1 +
> drivers/bus/dpaa/include/fsl_qman.h | 26 +++++++--
> drivers/bus/dpaa/rte_bus_dpaa_version.map | 5 ++
> drivers/bus/dpaa/rte_dpaa_bus.h | 14 +++++
> drivers/net/dpaa/dpaa_rxtx.c | 1 +
> 6 files changed, 128 insertions(+), 9 deletions(-)
>
> diff --git a/drivers/bus/dpaa/base/qbman/qman.c b/drivers/bus/dpaa/base/qbman/qman.c
> index 42d509d..f39e618 100644
> --- a/drivers/bus/dpaa/base/qbman/qman.c
> +++ b/drivers/bus/dpaa/base/qbman/qman.c
> @@ -41,6 +41,7 @@
> #include "qman.h"
> #include <rte_branch_prediction.h>
> #include <rte_dpaa_bus.h>
> +#include <rte_eventdev.h>
>
> /* Compilation constants */
> #define DQRR_MAXFILL 15
> @@ -1144,6 +1145,74 @@ unsigned int qman_portal_poll_rx(unsigned int poll_limit,
> return limit;
> }
>
> +u32 qman_portal_dequeue(struct rte_event ev[], unsigned int poll_limit,
> + void **bufs)
> +{
> + const struct qm_dqrr_entry *dq;
> + struct qman_fq *fq;
> + enum qman_cb_dqrr_result res;
> + unsigned int limit = 0;
> + struct qman_portal *p = get_affine_portal();
> +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
Use DPDK specific primitive(RTE_BYTE_ORDER)
> + struct qm_dqrr_entry *shadow;
> +#endif
> + unsigned int rx_number = 0;
> +
> + do {
@@ -41,6 +41,7 @@
#include "qman.h"
#include <rte_branch_prediction.h>
#include <rte_dpaa_bus.h>
+#include <rte_eventdev.h>
/* Compilation constants */
#define DQRR_MAXFILL 15
@@ -1144,6 +1145,74 @@ unsigned int qman_portal_poll_rx(unsigned int poll_limit,
return limit;
}
+u32 qman_portal_dequeue(struct rte_event ev[], unsigned int poll_limit,
+ void **bufs)
+{
+ const struct qm_dqrr_entry *dq;
+ struct qman_fq *fq;
+ enum qman_cb_dqrr_result res;
+ unsigned int limit = 0;
+ struct qman_portal *p = get_affine_portal();
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+ struct qm_dqrr_entry *shadow;
+#endif
+ unsigned int rx_number = 0;
+
+ do {
+ qm_dqrr_pvb_update(&p->p);
+ dq = qm_dqrr_current(&p->p);
+ if (!dq)
+ break;
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+ /*
+ * If running on an LE system the fields of the
+ * dequeue entry must be swapper. Because the
+ * QMan HW will ignore writes the DQRR entry is
+ * copied and the index stored within the copy
+ */
+ shadow = &p->shadow_dqrr[DQRR_PTR2IDX(dq)];
+ *shadow = *dq;
+ dq = shadow;
+ shadow->fqid = be32_to_cpu(shadow->fqid);
+ shadow->contextB = be32_to_cpu(shadow->contextB);
+ shadow->seqnum = be16_to_cpu(shadow->seqnum);
+ hw_fd_to_cpu(&shadow->fd);
+#endif
+
+ /* SDQCR: context_b points to the FQ */
+#ifdef CONFIG_FSL_QMAN_FQ_LOOKUP
+ fq = get_fq_table_entry(dq->contextB);
+#else
+ fq = (void *)(uintptr_t)dq->contextB;
+#endif
+ /* Now let the callback do its stuff */
+ res = fq->cb.dqrr_dpdk_cb(&ev[rx_number], p, fq,
+ dq, &bufs[rx_number]);
+ rx_number++;
+ /* Interpret 'dq' from a driver perspective. */
+ /*
+ * Parking isn't possible unless HELDACTIVE was set. NB,
+ * FORCEELIGIBLE implies HELDACTIVE, so we only need to
+ * check for HELDACTIVE to cover both.
+ */
+ DPAA_ASSERT((dq->stat & QM_DQRR_STAT_FQ_HELDACTIVE) ||
+ (res != qman_cb_dqrr_park));
+ if (res != qman_cb_dqrr_defer)
+ qm_dqrr_cdc_consume_1ptr(&p->p, dq,
+ res == qman_cb_dqrr_park);
+ /* Move forward */
+ qm_dqrr_next(&p->p);
+ /*
+ * Entry processed and consumed, increment our counter. The
+ * callback can request that we exit after consuming the
+ * entry, and we also exit if we reach our processing limit,
+ * so loop back only if neither of these conditions is met.
+ */
+ } while (++limit < poll_limit);
+
+ return limit;
+}
+
struct qm_dqrr_entry *qman_dequeue(struct qman_fq *fq)
{
struct qman_portal *p = get_affine_portal();
@@ -1262,13 +1331,20 @@ u32 qman_static_dequeue_get(struct qman_portal *qp)
return p->sdqcr;
}
-void qman_dca(struct qm_dqrr_entry *dq, int park_request)
+void qman_dca(const struct qm_dqrr_entry *dq, int park_request)
{
struct qman_portal *p = get_affine_portal();
qm_dqrr_cdc_consume_1ptr(&p->p, dq, park_request);
}
+void qman_dca_index(u8 index, int park_request)
+{
+ struct qman_portal *p = get_affine_portal();
+
+ qm_dqrr_cdc_consume_1(&p->p, index, park_request);
+}
+
/* Frame queue API */
static const char *mcr_result_str(u8 result)
{
@@ -2116,8 +2192,8 @@ int qman_enqueue(struct qman_fq *fq, const struct qm_fd *fd, u32 flags)
}
int qman_enqueue_multi(struct qman_fq *fq,
- const struct qm_fd *fd,
- int frames_to_send)
+ const struct qm_fd *fd, u32 *flags,
+ int frames_to_send)
{
struct qman_portal *p = get_affine_portal();
struct qm_portal *portal = &p->p;
@@ -2125,7 +2201,7 @@ int qman_enqueue_multi(struct qman_fq *fq,
register struct qm_eqcr *eqcr = &portal->eqcr;
struct qm_eqcr_entry *eq = eqcr->cursor, *prev_eq;
- u8 i, diff, old_ci, sent = 0;
+ u8 i = 0, diff, old_ci, sent = 0;
/* Update the available entries if no entry is free */
if (!eqcr->available) {
@@ -2149,7 +2225,11 @@ int qman_enqueue_multi(struct qman_fq *fq,
eq->fd.addr = cpu_to_be40(fd->addr);
eq->fd.status = cpu_to_be32(fd->status);
eq->fd.opaque = cpu_to_be32(fd->opaque);
-
+ if (flags[i] & QMAN_ENQUEUE_FLAG_DCA) {
+ eq->dca = QM_EQCR_DCA_ENABLE |
+ ((flags[i] >> 8) & QM_EQCR_DCA_IDXMASK);
+ }
+ i++;
eq = (void *)((unsigned long)(eq + 1) &
(~(unsigned long)(QM_EQCR_SIZE << 6)));
eqcr->available--;
@@ -80,6 +80,7 @@ pthread_key_t dpaa_portal_key;
unsigned int dpaa_svr_family;
RTE_DEFINE_PER_LCORE(bool, _dpaa_io);
+RTE_DEFINE_PER_LCORE(struct dpaa_portal_dqrr, held_bufs);
static inline void
dpaa_add_to_device_list(struct rte_dpaa_device *dev)
@@ -45,6 +45,7 @@ extern "C" {
#endif
#include <dpaa_rbtree.h>
+#include <rte_eventdev.h>
/* FQ lookups (turn this on for 64bit user-space) */
#if (__WORDSIZE == 64)
@@ -1239,6 +1240,7 @@ struct qman_fq {
/* DPDK Interface */
void *dpaa_intf;
+ struct rte_event ev;
/* affined portal in case of static queue */
struct qman_portal *qp;
@@ -1329,6 +1331,9 @@ struct qman_cgr {
*/
int qman_get_portal_index(void);
+u32 qman_portal_dequeue(struct rte_event ev[], unsigned int poll_limit,
+ void **bufs);
+
/**
* qman_affine_channel - return the channel ID of an portal
* @cpu: the cpu whose affine portal is the subject of the query
@@ -1462,7 +1467,21 @@ u32 qman_static_dequeue_get(struct qman_portal *qp);
* function must be called from the same CPU as that which processed the DQRR
* entry in the first place.
*/
-void qman_dca(struct qm_dqrr_entry *dq, int park_request);
+void qman_dca(const struct qm_dqrr_entry *dq, int park_request);
+
+/**
+ * qman_dca_index - Perform a Discrete Consumption Acknowledgment
+ * @index: the DQRR index to be consumed
+ * @park_request: indicates whether the held-active @fq should be parked
+ *
+ * Only allowed in DCA-mode portals, for DQRR entries whose handler callback had
+ * previously returned 'qman_cb_dqrr_defer'. NB, as with the other APIs, this
+ * does not take a 'portal' argument but implies the core affine portal from the
+ * cpu that is currently executing the function. For reasons of locking, this
+ * function must be called from the same CPU as that which processed the DQRR
+ * entry in the first place.
+ */
+void qman_dca_index(u8 index, int park_request);
/**
* qman_eqcr_is_empty - Determine if portal's EQCR is empty
@@ -1730,9 +1749,8 @@ int qman_volatile_dequeue(struct qman_fq *fq, u32 flags, u32 vdqcr);
*/
int qman_enqueue(struct qman_fq *fq, const struct qm_fd *fd, u32 flags);
-int qman_enqueue_multi(struct qman_fq *fq,
- const struct qm_fd *fd,
- int frames_to_send);
+int qman_enqueue_multi(struct qman_fq *fq, const struct qm_fd *fd, u32 *flags,
+ int frames_to_send);
typedef int (*qman_cb_precommit) (void *arg);
@@ -69,14 +69,19 @@ DPDK_18.02 {
global:
dpaa_svr_family;
+ per_lcore_held_bufs;
+ qm_channel_pool1;
qman_alloc_cgrid_range;
qman_alloc_pool_range;
qman_create_cgr;
+ qman_dca_index;
qman_delete_cgr;
qman_modify_cgr;
+ qman_portal_dequeue;
qman_portal_poll_rx;
qman_query_fq_frm_cnt;
qman_release_cgrid_range;
+ qman_static_dequeue_add;
rte_dpaa_portal_fq_close;
rte_dpaa_portal_fq_init;
@@ -181,6 +181,20 @@ static void dpaainitfn_ ##nm(void) \
} \
RTE_PMD_EXPORT_NAME(nm, __COUNTER__)
+/* Create storage for dqrr entries per lcore */
+#define DPAA_PORTAL_DEQUEUE_DEPTH 16
+struct dpaa_portal_dqrr {
+ void *mbuf[DPAA_PORTAL_DEQUEUE_DEPTH];
+ uint64_t dqrr_held;
+ uint8_t dqrr_size;
+};
+
+RTE_DECLARE_PER_LCORE(struct dpaa_portal_dqrr, held_bufs);
+
+#define DPAA_PER_LCORE_DQRR_SIZE RTE_PER_LCORE(held_bufs).dqrr_size
+#define DPAA_PER_LCORE_DQRR_HELD RTE_PER_LCORE(held_bufs).dqrr_held
+#define DPAA_PER_LCORE_DQRR_MBUF(i) RTE_PER_LCORE(held_bufs).mbuf[i]
+
#ifdef __cplusplus
}
#endif
@@ -800,6 +800,7 @@ dpaa_eth_queue_tx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
loop = 0;
while (loop < frames_to_send) {
loop += qman_enqueue_multi(q, &fd_arr[loop],
+ NULL,
frames_to_send - loop);
}
nb_bufs -= frames_to_send;