From patchwork Fri Dec 15 11:26:23 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Liang, Ma" X-Patchwork-Id: 32308 X-Patchwork-Delegate: jerinj@marvell.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 010E61B023; Fri, 15 Dec 2017 12:26:30 +0100 (CET) Received: from mga01.intel.com (mga01.intel.com [192.55.52.88]) by dpdk.org (Postfix) with ESMTP id 35DF54A63 for ; Fri, 15 Dec 2017 12:26:26 +0100 (CET) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga007.jf.intel.com ([10.7.209.58]) by fmsmga101.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 15 Dec 2017 03:26:26 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.45,404,1508828400"; d="scan'208";a="2943768" Received: from silpixa00398162.ir.intel.com (HELO silpixa00398162.ger.corp.intel.com) ([10.237.223.171]) by orsmga007.jf.intel.com with ESMTP; 15 Dec 2017 03:26:24 -0800 From: Liang Ma To: jerin.jacob@caviumnetworks.com Cc: dev@dpdk.org, harry.van.haaren@intel.com, bruce.richardson@intel.com, deepak.k.jain@intel.com, john.geary@intel.com, peter.mccarthy@intel.com, seanbh@gmail.com Date: Fri, 15 Dec 2017 11:26:23 +0000 Message-Id: <1513337189-137661-3-git-send-email-liang.j.ma@intel.com> X-Mailer: git-send-email 2.7.5 In-Reply-To: <1513337189-137661-1-git-send-email-liang.j.ma@intel.com> References: <1513337189-137661-1-git-send-email-liang.j.ma@intel.com> Subject: [dpdk-dev] [PATCH v2 2/8] event/opdl: add the opdl pmd header and init helper function X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" opdl_evdev.h include the main data structure of opdl device and all the function prototype need to be exposed to support eventdev API. opdl_evdev_init.c implement all initailization helper function Signed-off-by: Liang Ma Signed-off-by: Peter, Mccarthy --- drivers/event/opdl/opdl_evdev.h | 354 +++++++++++++ drivers/event/opdl/opdl_evdev_init.c | 951 +++++++++++++++++++++++++++++++++++ 2 files changed, 1305 insertions(+) create mode 100644 drivers/event/opdl/opdl_evdev.h create mode 100644 drivers/event/opdl/opdl_evdev_init.c diff --git a/drivers/event/opdl/opdl_evdev.h b/drivers/event/opdl/opdl_evdev.h new file mode 100644 index 0000000..c776d6f --- /dev/null +++ b/drivers/event/opdl/opdl_evdev.h @@ -0,0 +1,354 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _OPDL_EVDEV_H_ +#define _OPDL_EVDEV_H_ + +#include +#include +#include +#include "opdl_ring.h" + +#define OPDL_QID_NUM_FIDS 1024 +#define OPDL_IQS_MAX 1 +#define OPDL_Q_PRIORITY_MAX 1 +#define OPDL_PORTS_MAX 64 +#define MAX_OPDL_CONS_Q_DEPTH 128 +/* OPDL size */ +#define OPDL_INFLIGHT_EVENTS_TOTAL 4096 +/* allow for lots of over-provisioning */ +#define OPDL_FRAGMENTS_MAX 1 + +/* report dequeue burst sizes in buckets */ +#define OPDL_DEQ_STAT_BUCKET_SHIFT 2 +/* how many packets pulled from port by sched */ +#define SCHED_DEQUEUE_BURST_SIZE 32 + +/* size of our history list */ +#define OPDL_PORT_HIST_LIST (MAX_OPDL_PROD_Q_DEPTH) + +/* how many data points use for average stats */ +#define NUM_SAMPLES 64 + +#define EVENTDEV_NAME_OPDL_PMD event_opdl +#define OPDL_PMD_NAME RTE_STR(event_opdl) +#define OPDL_PMD_NAME_MAX 64 + +#define OPDL_INVALID_QID 255 + +#define OPDL_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) + +#define OPDL_NUM_POLL_BUCKETS \ + (MAX_OPDL_CONS_Q_DEPTH >> OPDL_DEQ_STAT_BUCKET_SHIFT) + +enum { + QE_FLAG_VALID_SHIFT = 0, + QE_FLAG_COMPLETE_SHIFT, + QE_FLAG_NOT_EOP_SHIFT, + _QE_FLAG_COUNT +}; + +enum port_type { + OPDL_INVALID_PORT = 0, + OPDL_REGULAR_PORT = 1, + OPDL_PURE_RX_PORT, + OPDL_PURE_TX_PORT, + OPDL_ASYNC_PORT +}; + +enum queue_type { + OPDL_Q_TYPE_INVALID = 0, + OPDL_Q_TYPE_SINGLE_LINK = 1, + OPDL_Q_TYPE_ATOMIC, + OPDL_Q_TYPE_ORDERED +}; + +enum queue_pos { + OPDL_Q_POS_START = 0, + OPDL_Q_POS_MIDDLE, + OPDL_Q_POS_END +}; + +#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* for NEW FWD, FRAG */ +#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */ +#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */ + +static const uint8_t opdl_qe_flag_map[] = { + QE_FLAG_VALID /* NEW Event */, + QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */, + QE_FLAG_COMPLETE /* RELEASE Event */, + + /* Values which can be used for future support for partial + * events, i.e. where one event comes back to the scheduler + * as multiple which need to be tracked together + */ + QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP, +}; + +#define OPDL_LOG_INFO(fmt, args...) \ + RTE_LOG(INFO, EVENTDEV, "[%s] line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __LINE__, ## args) + +#ifdef RTE_LIBRTE_PMD_EVDEV_OPDL_DEBUG +#define OPDL_LOG_DBG(fmt, args...) \ + RTE_LOG(DEBUG, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __func__, __LINE__, ## args) +#else +#define OPDL_LOG_DBG(fmt, args...) +#endif + +#define OPDL_LOG_ERR(fmt, args...) \ + RTE_LOG(ERR, EVENTDEV, "[%s] %s() line %u: " fmt "\n", \ + OPDL_PMD_NAME, \ + __func__, __LINE__, ## args) + +enum port_xstat_name { + claim_pkts_requested = 0, + claim_pkts_granted, + claim_non_empty, + claim_empty, + total_cycles, + max_num_port_xstat +}; + +#define OPDL_MAX_PORT_XSTAT_NUM (OPDL_PORTS_MAX * max_num_port_xstat) + +struct opdl_port; + +typedef uint16_t (*opdl_enq_operation)(struct opdl_port *port, + const struct rte_event ev[], + uint16_t num); + +typedef uint16_t (*opdl_deq_operation)(struct opdl_port *port, + struct rte_event ev[], + uint16_t num); + +struct opdl_evdev; + +struct opdl_stage_meta_data { + uint32_t num_claimed; /* number of entries claimed by this stage */ + uint32_t burst_sz; /* Port claim burst size */ +}; + +struct opdl_port { + + /* back pointer */ + struct opdl_evdev *opdl; + + /* enq handler & stage instance */ + opdl_enq_operation enq; + struct opdl_stage *enq_stage_inst; + + /* deq handler & stage instance */ + opdl_deq_operation deq; + struct opdl_stage *deq_stage_inst; + + /* port id has correctly been set */ + uint8_t configured; + + /* set when the port is initialized */ + uint8_t initialized; + + /* A numeric ID for the port */ + uint8_t id; + + /* Space for claimed entries */ + struct rte_event *entries[MAX_OPDL_CONS_Q_DEPTH]; + + /* RX/REGULAR/TX/ASYNC - determined on position in queue */ + enum port_type p_type; + + /* if the claim is static atomic type */ + bool atomic_claim; + + /* Queue linked to this port - internal queue id*/ + uint8_t queue_id; + + /* Queue linked to this port - external queue id*/ + uint8_t external_qid; + + /* Next queue linked to this port - external queue id*/ + uint8_t next_external_qid; + + /* number of instances of this stage */ + uint32_t num_instance; + + /* instance ID of this stage*/ + uint32_t instance_id; + + /* track packets in and out of this port */ + uint64_t port_stat[max_num_port_xstat]; + uint64_t start_cycles; +}; + +struct opdl_queue_meta_data { + uint8_t ext_id; + enum queue_type type; + int8_t setup; +}; + +struct opdl_xstats_entry { + struct rte_event_dev_xstats_name stat; + unsigned int id; + uint64_t *value; +}; + +struct opdl_queue { + + /* Opdl ring this queue is associated with */ + uint32_t opdl_id; + + /* type and position have correctly been set */ + uint8_t configured; + + /* port number and associated ports have been associated */ + uint8_t initialized; + + /* type of this queue (Atomic, Ordered, Parallel, Direct)*/ + enum queue_type q_type; + + /* position of queue (START, MIDDLE, END) */ + enum queue_pos q_pos; + + /* external queue id. It is mapped to the queue position */ + uint8_t external_qid; + + struct opdl_port *ports[OPDL_PORTS_MAX]; + uint32_t nb_ports; + + /* priority, reserved for future */ + uint8_t priority; +}; + + +#define OPDL_TUR_PER_DEV 12 + +/* PMD needs an extra queue per Opdl */ +#define OPDL_MAX_QUEUES (RTE_EVENT_MAX_QUEUES_PER_DEV - OPDL_TUR_PER_DEV) + + +struct opdl_evdev { + struct rte_eventdev_data *data; + + uint8_t started; + + /* Max number of ports and queues*/ + uint32_t max_port_nb; + uint32_t max_queue_nb; + + /* slots in the opdl ring */ + uint32_t nb_events_limit; + + /* + * Array holding all opdl for this device + */ + struct opdl_ring *opdl[OPDL_TUR_PER_DEV]; + uint32_t nb_opdls; + + struct opdl_queue_meta_data q_md[OPDL_MAX_QUEUES]; + uint32_t nb_q_md; + + /* Internal queues - one per logical queue */ + struct opdl_queue + queue[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned; + + uint32_t nb_queues; + + struct opdl_stage_meta_data s_md[OPDL_PORTS_MAX]; + + /* Contains all ports - load balanced and directed */ + struct opdl_port ports[OPDL_PORTS_MAX] __rte_cache_aligned; + uint32_t nb_ports; + + uint8_t q_map_ex_to_in[OPDL_INVALID_QID]; + + /* Stats */ + struct opdl_xstats_entry port_xstat[OPDL_MAX_PORT_XSTAT_NUM]; + + char service_name[OPDL_PMD_NAME_MAX]; + int socket; + int do_validation; + int do_test; +}; + + +static inline struct opdl_evdev * +opdl_pmd_priv(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +static inline const struct opdl_evdev * +opdl_pmd_priv_const(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +uint16_t opdl_event_enqueue(void *port, const struct rte_event *ev); +uint16_t opdl_event_enqueue_burst(void *port, const struct rte_event ev[], + uint16_t num); + +uint16_t opdl_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); +uint16_t opdl_event_dequeue_burst(void *port, struct rte_event *ev, + uint16_t num, uint64_t wait); +void opdl_event_schedule(struct rte_eventdev *dev); + +void opdl_xstats_init(struct rte_eventdev *dev); +int opdl_xstats_uninit(struct rte_eventdev *dev); +int opdl_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size); +int opdl_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n); +uint64_t opdl_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id); +int opdl_xstats_reset(struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + int16_t queue_port_id, + const uint32_t ids[], + uint32_t nb_ids); + +int opdl_add_event_handlers(struct rte_eventdev *dev); +int build_all_dependencies(struct rte_eventdev *dev); +int check_queues_linked(struct rte_eventdev *dev); +int create_queues_and_rings(struct rte_eventdev *dev); +int initialise_all_other_ports(struct rte_eventdev *dev); +int initialise_queue_zero_ports(struct rte_eventdev *dev); +int assign_internal_queue_ids(struct rte_eventdev *dev); +void destroy_queues_and_rings(struct rte_eventdev *dev); +int opdl_selftest(void); + +#endif /* _OPDL_EVDEV_H_ */ diff --git a/drivers/event/opdl/opdl_evdev_init.c b/drivers/event/opdl/opdl_evdev_init.c new file mode 100644 index 0000000..e128afb --- /dev/null +++ b/drivers/event/opdl/opdl_evdev_init.c @@ -0,0 +1,951 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2016-2017 Intel Corporation. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "opdl_evdev.h" +#include "opdl_ring.h" + + +static inline uint32_t __attribute__((always_inline)) +enqueue_check(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num, + uint16_t num_events) +{ + uint16_t i; + + if (p->opdl->do_validation) { + + for (i = 0; i < num; i++) { + if (ev[i].queue_id != p->next_external_qid) { + OPDL_LOG_ERR("ERROR - port:[%u] - event wants" + " to enq to q_id[%u]," + " but should be [%u]\n", + p->id, + ev[i].queue_id, + p->next_external_qid); + rte_errno = -EINVAL; + return 0; + } + } + + /* Stats */ + if (p->p_type == OPDL_PURE_RX_PORT || + p->p_type == OPDL_ASYNC_PORT) { + /* Stats */ + if (num_events) { + p->port_stat[claim_pkts_requested] += num; + p->port_stat[claim_pkts_granted] += num_events; + p->port_stat[claim_non_empty]++; + p->start_cycles = rte_rdtsc(); + } else { + p->port_stat[claim_empty]++; + p->start_cycles = 0; + } + } else { + if (p->start_cycles) { + uint64_t end_cycles = rte_rdtsc(); + p->port_stat[total_cycles] += + end_cycles - p->start_cycles; + } + } + } else { + if (num > 0 && + ev[0].queue_id != p->next_external_qid) { + rte_errno = -EINVAL; + return 0; + } + } + + return num; +} + +static inline void __attribute__((always_inline)) +update_on_dequeue(struct opdl_port *p, + struct rte_event ev[], + uint16_t num, + uint16_t num_events) +{ + if (p->opdl->do_validation) { + int16_t i; + for (i = 0; i < num; i++) + ev[i].queue_id = + p->opdl->queue[p->queue_id].external_qid; + + /* Stats */ + if (num_events) { + p->port_stat[claim_pkts_requested] += num; + p->port_stat[claim_pkts_granted] += num_events; + p->port_stat[claim_non_empty]++; + p->start_cycles = rte_rdtsc(); + } else { + p->port_stat[claim_empty]++; + p->start_cycles = 0; + } + } else { + if (num > 0) + ev[0].queue_id = + p->opdl->queue[p->queue_id].external_qid; + } +} + + +/* + * Error RX enqueue: + * + * + */ + +static uint16_t +opdl_rx_error_enqueue(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num) +{ + RTE_SET_USED(p); + RTE_SET_USED(ev); + RTE_SET_USED(num); + + rte_errno = -ENOSPC; + + return 0; +} + +/* + * RX enqueue: + * + * This function handles enqueue for a single input stage_inst with + * threadsafe disabled or enabled. eg 1 thread using a stage_inst or + * multiple threads sharing a stage_inst + */ + +static uint16_t +opdl_rx_enqueue(struct opdl_port *p, + const struct rte_event ev[], + uint16_t num) +{ + uint16_t enqueued = 0; + + enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst), + ev, + num, + false); + if (!enqueue_check(p, ev, num, enqueued)) + return 0; + + + if (enqueued < num) + rte_errno = -ENOSPC; + + return enqueued; +} + +/* + * Error TX handler + * + */ + +static uint16_t +opdl_tx_error_dequeue(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + RTE_SET_USED(p); + RTE_SET_USED(ev); + RTE_SET_USED(num); + + rte_errno = -ENOSPC; + + return 0; +} + +/* + * TX single threaded claim + * + * This function handles dequeue for a single worker stage_inst with + * threadsafe disabled. eg 1 thread using an stage_inst + */ + +static uint16_t +opdl_tx_dequeue_single_thread(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + uint16_t returned; + + struct opdl_ring *ring; + + ring = opdl_stage_get_opdl_ring(p->deq_stage_inst); + + returned = opdl_ring_copy_to_burst(ring, + p->deq_stage_inst, + ev, + num, + false); + + update_on_dequeue(p, ev, num, returned); + + return returned; +} + +/* + * TX multi threaded claim + * + * This function handles dequeue for multiple worker stage_inst with + * threadsafe disabled. eg multiple stage_inst each with its own instance + */ + +static uint16_t +opdl_tx_dequeue_multi_inst(struct opdl_port *p, + struct rte_event ev[], + uint16_t num) +{ + uint32_t num_events = 0; + + num_events = opdl_stage_claim(p->deq_stage_inst, + (void *)ev, + num, + NULL, + false, + false); + + update_on_dequeue(p, ev, num, num_events); + + return opdl_stage_disclaim(p->deq_stage_inst, num_events, false); +} + + +/* + * Worker thread claim + * + */ + +static uint16_t +opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num) +{ + uint32_t num_events = 0; + + if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) { + OPDL_LOG_ERR("" + "Attempt to dequeue num of events larger than port (%d) max\n", + p->id); + rte_errno = -EINVAL; + return 0; + } + + + num_events = opdl_stage_claim(p->deq_stage_inst, + (void *)ev, + num, + NULL, + false, + p->atomic_claim); + + + update_on_dequeue(p, ev, num, num_events); + + return num_events; +} + +/* + * Worker thread disclaim + */ + +static uint16_t +opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num) +{ + uint16_t enqueued = 0; + + uint32_t i = 0; + + for (i = 0; i < num; i++) + opdl_ring_cas_slot(p->enq_stage_inst, &ev[i], i, p->atomic_claim); + + + enqueued = opdl_stage_disclaim(p->enq_stage_inst, + num, + false); + + return enqueue_check(p, ev, num, enqueued); +} + +static inline struct opdl_stage *__attribute__((always_inline)) +stage_for_port(struct opdl_queue *q, unsigned int i) +{ + if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE) + return q->ports[i]->enq_stage_inst; + else + return q->ports[i]->deq_stage_inst; +} + +static int opdl_add_deps(struct opdl_evdev *device, + int q_id, + int deps_q_id) +{ + unsigned int i, j; + int status; + struct opdl_ring *ring; + struct opdl_queue *queue = &device->queue[q_id]; + struct opdl_queue *queue_deps = &device->queue[deps_q_id]; + struct opdl_stage *dep_stages[OPDL_PORTS_MAX]; + + /* sanity check that all stages are for same opdl ring */ + for (i = 0; i < queue->nb_ports; i++) { + struct opdl_ring *r = + opdl_stage_get_opdl_ring(stage_for_port(queue, i)); + for (j = 0; j < queue_deps->nb_ports; j++) { + struct opdl_ring *rj = + opdl_stage_get_opdl_ring( + stage_for_port(queue_deps, j)); + if (r != rj) { + OPDL_LOG_ERR("Stages and dependents" + " are not for same opdl ring"); + for (uint32_t k = 0; + k < device->nb_opdls; k++) { + opdl_ring_dump(device->opdl[k], + stdout); + } + return -EINVAL; + } + } + } + + /* Gather all stages instance in deps */ + for (i = 0; i < queue_deps->nb_ports; i++) + dep_stages[i] = stage_for_port(queue_deps, i); + + + /* Add all deps for each port->stage_inst in this queue */ + for (i = 0; i < queue->nb_ports; i++) { + + ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i)); + + status = opdl_stage_deps_add(ring, + stage_for_port(queue, i), + queue->ports[i]->num_instance, + queue->ports[i]->instance_id, + dep_stages, + queue_deps->nb_ports); + if (status < 0) + return -EINVAL; + } + + return 0; +} + +int +opdl_add_event_handlers(struct rte_eventdev *dev) +{ + int err = 0; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + unsigned int i; + + for (i = 0; i < device->max_port_nb; i++) { + + struct opdl_port *port = &device->ports[i]; + + if (port->configured) { + if (port->p_type == OPDL_PURE_RX_PORT) { + port->enq = opdl_rx_enqueue; + port->deq = opdl_tx_error_dequeue; + + } else if (port->p_type == OPDL_PURE_TX_PORT) { + + port->enq = opdl_rx_error_enqueue; + + if (port->num_instance == 1) + port->deq = + opdl_tx_dequeue_single_thread; + else + port->deq = opdl_tx_dequeue_multi_inst; + + } else if (port->p_type == OPDL_REGULAR_PORT) { + + port->enq = opdl_disclaim; + port->deq = opdl_claim; + + } else if (port->p_type == OPDL_ASYNC_PORT) { + + port->enq = opdl_rx_enqueue; + + /* Always single instance */ + port->deq = opdl_tx_dequeue_single_thread; + } else { + OPDL_LOG_ERR("port:[%u] has invalid port type - ", + port->id); + err = -EINVAL; + break; + } + port->initialized = 1; + } + } + + if (!err) + fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n"); + return err; +} + +int +build_all_dependencies(struct rte_eventdev *dev) +{ + + int err = 0; + unsigned int i; + struct opdl_evdev *device = opdl_pmd_priv(dev); + + uint8_t start_qid = 0; + + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct opdl_queue *queue = &device->queue[i]; + if (!queue->initialized) + break; + + if (queue->q_pos == OPDL_Q_POS_START) { + start_qid = i; + continue; + } + + if (queue->q_pos == OPDL_Q_POS_MIDDLE) { + err = opdl_add_deps(device, i, i-1); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + } + + if (queue->q_pos == OPDL_Q_POS_END) { + /* Add this dependency */ + err = opdl_add_deps(device, i, i-1); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + /* Add dependency for rx on tx */ + err = opdl_add_deps(device, start_qid, i); + if (err < 0) { + OPDL_LOG_ERR("dependancy addition for queue:[%u] - FAILED", + queue->external_qid); + break; + } + } + } + + if (!err) + fprintf(stdout, "Success - dependencies built\n"); + + return err; +} +int +check_queues_linked(struct rte_eventdev *dev) +{ + + int err = 0; + unsigned int i; + struct opdl_evdev *device = opdl_pmd_priv(dev); + uint32_t nb_iq = 0; + + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct opdl_queue *queue = &device->queue[i]; + + if (!queue->initialized) + break; + + if (queue->external_qid == OPDL_INVALID_QID) + nb_iq++; + + if (queue->nb_ports == 0) { + OPDL_LOG_ERR("queue:[%u] has no associated ports", + i); + err = -EINVAL; + break; + } + } + if (!err) { + if ((i - nb_iq) != device->max_queue_nb) { + OPDL_LOG_ERR("%u queues counted but should be %u", + i - nb_iq, + device->max_queue_nb); + err = -1; + } else { + fprintf(stdout, "Success - %u queues (ex:%u + in:%u) validated\n", + i, + device->max_queue_nb, + nb_iq); + } + + } + return err; +} + +void +destroy_queues_and_rings(struct rte_eventdev *dev) +{ + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_opdls; i++) { + if (device->opdl[i]) + opdl_ring_free(device->opdl[i]); + } + + memset(&device->queue, + 0, + sizeof(struct opdl_queue) + * RTE_EVENT_MAX_QUEUES_PER_DEV); +} + +#define OPDL_ID(d)(d->nb_opdls - 1) + +static inline void +initialise_queue(struct opdl_evdev *device, + enum queue_pos pos, + int32_t i) +{ + struct opdl_queue *queue = &device->queue[device->nb_queues]; + + if (i == -1) { + queue->q_type = OPDL_Q_TYPE_ORDERED; + queue->external_qid = OPDL_INVALID_QID; + } else { + queue->q_type = device->q_md[i].type; + queue->external_qid = device->q_md[i].ext_id; + /* Add ex->in for queues setup */ + device->q_map_ex_to_in[queue->external_qid] = device->nb_queues; + } + queue->opdl_id = OPDL_ID(device); + queue->q_pos = pos; + queue->nb_ports = 0; + queue->configured = 1; + + device->nb_queues++; +} + + +static inline int +create_opdl(struct opdl_evdev *device) +{ + int err = 0; + + char name[RTE_MEMZONE_NAMESIZE]; + + sprintf(name, "%s_%u", device->service_name, device->nb_opdls); + + device->opdl[device->nb_opdls] = + opdl_ring_create(name, + device->nb_events_limit, + sizeof(struct rte_event), + device->max_port_nb * 2, + device->socket); + + if (!device->opdl[device->nb_opdls]) { + OPDL_LOG_ERR("opdl ring %u creation - FAILED", + device->nb_opdls); + err = -EINVAL; + } else { + device->nb_opdls++; + } + return err; +} + +static inline int +create_link_opdl(struct opdl_evdev *device, uint32_t index) +{ + + int err = 0; + + if (device->q_md[index + 1].type != + OPDL_Q_TYPE_SINGLE_LINK) { + + /* async queue with regular + * queue following it + */ + + /* create a new opdl ring */ + err = create_opdl(device); + if (!err) { + /* create an initial + * dummy queue for new opdl + */ + initialise_queue(device, + OPDL_Q_POS_START, + -1); + } else { + err = -EINVAL; + } + } else { + OPDL_LOG_ERR("queue %u, 2" + " SINGLE_LINK queues, not allowed", + index); + err = -EINVAL; + } + + return err; +} + +int +create_queues_and_rings(struct rte_eventdev *dev) +{ + int err = 0; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + device->nb_queues = 0; + + if (device->nb_ports != device->max_port_nb) { + OPDL_LOG_ERR("Number ports setup:%u NOT EQUAL to max port" + " number:%u for this device", + device->nb_ports, + device->max_port_nb); + err = -1; + } + + if (!err) { + /* We will have at least one opdl so create it now */ + err = create_opdl(device); + } + + if (!err) { + + /* Create 1st "dummy" queue */ + initialise_queue(device, + OPDL_Q_POS_START, + -1); + + for (uint32_t i = 0; i < device->nb_q_md; i++) { + + /* Check */ + if (!device->q_md[i].setup) { + + OPDL_LOG_ERR("queue meta data slot %u" + " not setup - FAILING", + i); + err = -EINVAL; + break; + } else if (device->q_md[i].type != + OPDL_Q_TYPE_SINGLE_LINK) { + + if (!device->q_md[i + 1].setup) { + /* Create a simple ORDERED/ATOMIC + * queue at the end + */ + initialise_queue(device, + OPDL_Q_POS_END, + i); + + } else { + /* Create a simple ORDERED/ATOMIC + * queue in the middle + */ + initialise_queue(device, + OPDL_Q_POS_MIDDLE, + i); + } + } else if (device->q_md[i].type == + OPDL_Q_TYPE_SINGLE_LINK) { + + /* create last queue for this opdl */ + initialise_queue(device, + OPDL_Q_POS_END, + i); + + err = create_link_opdl(device, i); + + if (err) + break; + + + } + } + } + if (err) + destroy_queues_and_rings(dev); + else + fprintf(stdout, "Success - Created %u queues and %u opdls\n", + device->nb_queues, + device->nb_opdls); + + return err; +} + + +int +initialise_all_other_ports(struct rte_eventdev *dev) +{ + int err = 0; + struct opdl_stage *stage_inst = NULL; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + struct opdl_queue *queue = &device->queue[port->queue_id]; + + if (port->queue_id == 0) { + continue; + } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) { + + if (queue->q_pos == OPDL_Q_POS_MIDDLE) { + + /* Regular port with claim/disclaim */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); + port->deq_stage_inst = stage_inst; + port->enq_stage_inst = stage_inst; + + if (queue->q_type == OPDL_Q_TYPE_ATOMIC) + port->atomic_claim = true; + else + port->atomic_claim = false; + + port->p_type = OPDL_REGULAR_PORT; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } else if (queue->q_pos == OPDL_Q_POS_END) { + + /* tx port */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); + port->deq_stage_inst = stage_inst; + port->enq_stage_inst = NULL; + port->p_type = OPDL_PURE_TX_PORT; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } else { + + OPDL_LOG_ERR("port %u:, linked incorrectly" + " to a q_pos START/INVALID %u", + port->id, + queue->q_pos); + err = -EINVAL; + break; + } + + } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) { + + port->p_type = OPDL_ASYNC_PORT; + + /* -- tx -- */ + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + false); /* First stage */ + port->deq_stage_inst = stage_inst; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + + if (queue->nb_ports > 1) { + OPDL_LOG_ERR("queue %u:, setup as SINGLE_LINK" + " but has more than one port linked", + queue->external_qid); + err = -EINVAL; + break; + } + + /* -- single instance rx for next opdl -- */ + uint8_t next_qid = + device->q_map_ex_to_in[queue->external_qid] + 1; + if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV && + device->queue[next_qid].configured) { + + /* Remap the queue */ + queue = &device->queue[next_qid]; + + stage_inst = opdl_stage_add( + device->opdl[queue->opdl_id], + false, + true); + port->enq_stage_inst = stage_inst; + + /* Add the port to the queue array of ports */ + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + if (queue->nb_ports > 1) { + OPDL_LOG_ERR("dummy queue %u: for " + "port %u, " + "SINGLE_LINK but has more " + "than one port linked", + next_qid, + port->id); + err = -EINVAL; + break; + } + /* Set this queue to initialized as it is never + * referenced by any ports + */ + queue->initialized = 1; + } + } + } + + /* Now that all ports are initialised we need to + * setup the last bit of stage md + */ + if (!err) { + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + struct opdl_queue *queue = + &device->queue[port->queue_id]; + + if (port->configured && + (port->queue_id != OPDL_INVALID_QID)) { + if (queue->nb_ports == 0) { + OPDL_LOG_ERR("queue:[%u] has no ports" + " linked to it", + port->id); + err = -EINVAL; + break; + } + + port->num_instance = queue->nb_ports; + port->initialized = 1; + queue->initialized = 1; + } else { + OPDL_LOG_ERR("Port:[%u] not configured invalid" + " queue configuration", + port->id); + err = -EINVAL; + break; + } + } + } + + if (!err) { + fprintf(stdout, + "Success - %u port(s) initialized\n", + device->nb_ports); + } + return err; +} + +int +initialise_queue_zero_ports(struct rte_eventdev *dev) +{ + int err = 0; + uint8_t mt_rx = 0; + struct opdl_stage *stage_inst = NULL; + struct opdl_queue *queue = NULL; + + struct opdl_evdev *device = opdl_pmd_priv(dev); + + /* Assign queue zero and figure out how many Q0 ports we have */ + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->queue_id == OPDL_INVALID_QID) { + port->queue_id = 0; + port->external_qid = OPDL_INVALID_QID; + port->p_type = OPDL_PURE_RX_PORT; + mt_rx++; + } + } + + /* Create the stage */ + stage_inst = opdl_stage_add(device->opdl[0], + (mt_rx > 1 ? true : false), + true); + if (stage_inst) { + + /* Assign the new created input stage to all relevant ports */ + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->queue_id == 0) { + queue = &device->queue[port->queue_id]; + port->enq_stage_inst = stage_inst; + port->deq_stage_inst = NULL; + port->configured = 1; + port->initialized = 1; + + queue->ports[queue->nb_ports] = port; + port->instance_id = queue->nb_ports; + queue->nb_ports++; + } + } + } else { + err = -1; + } + + if (!err) { + fprintf(stdout, "Success - (%u) \"Queue 0\" port(s) " + "initialized\n", + queue->nb_ports); + } + return err; +} + +int +assign_internal_queue_ids(struct rte_eventdev *dev) +{ + int err = 0; + struct opdl_evdev *device = opdl_pmd_priv(dev); + + for (uint32_t i = 0; i < device->nb_ports; i++) { + struct opdl_port *port = &device->ports[i]; + if (port->external_qid != OPDL_INVALID_QID) { + port->queue_id = + device->q_map_ex_to_in[port->external_qid]; + + /* Now do the external_qid of the next queue */ + struct opdl_queue *queue = + &device->queue[port->queue_id]; + if (queue->q_pos == OPDL_Q_POS_END) + port->next_external_qid = + device->queue[port->queue_id + 2].external_qid; + else + port->next_external_qid = + device->queue[port->queue_id + 1].external_qid; + } + } + return err; +}