[dpdk-dev,v2,07/15] event/sw: add support for event queues
Commit Message
From: Bruce Richardson <bruce.richardson@intel.com>
Add in the data structures for the event queues, and the eventdev
functions to create and destroy those queues.
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
---
drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++
drivers/event/sw/sw_evdev.c | 158 +++++++++++++++++++++++++++++++++++++++
drivers/event/sw/sw_evdev.h | 75 +++++++++++++++++++
3 files changed, 409 insertions(+)
create mode 100644 drivers/event/sw/iq_ring.h
Comments
On Tue, Jan 31, 2017 at 04:14:25PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richardson@intel.com>
>
> Add in the data structures for the event queues, and the eventdev
> functions to create and destroy those queues.
>
> Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> ---
> drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++
> drivers/event/sw/sw_evdev.c | 158 +++++++++++++++++++++++++++++++++++++++
> drivers/event/sw/sw_evdev.h | 75 +++++++++++++++++++
> 3 files changed, 409 insertions(+)
> create mode 100644 drivers/event/sw/iq_ring.h
>
> + */
> +
> +/*
> + * Ring structure definitions used for the internal ring buffers of the
> + * SW eventdev implementation. These are designed for single-core use only.
> + */
If I understand it correctly, IQ and QE rings are single producer and
single consumer rings. By the specification, multiple producers through
multiple ports can enqueue to the event queues at the same time. Does the SW
implementation support that? Or am I missing something here?
> +#ifndef _IQ_RING_
> +#define _IQ_RING_
> +
> +#include <stdint.h>
> +
> +#include <rte_common.h>
> +#include <rte_memory.h>
> +#include <rte_malloc.h>
> +#include <rte_eventdev.h>
> +
> +#define IQ_RING_NAMESIZE 12
> +#define QID_IQ_DEPTH 512
> +#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
> +
> +struct iq_ring {
> + char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
> + uint16_t write_idx;
> + uint16_t read_idx;
> +
> + struct rte_event ring[QID_IQ_DEPTH];
> +};
> +
> +#ifndef force_inline
> +#define force_inline inline __attribute__((always_inline))
> +#endif
> +
> +static inline struct iq_ring *
> +iq_ring_create(const char *name, unsigned int socket_id)
> +{
> + struct iq_ring *retval;
> +
> + retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
> + if (retval == NULL)
> + goto end;
> +
> + snprintf(retval->name, sizeof(retval->name), "%s", name);
> + retval->write_idx = retval->read_idx = 0;
> +end:
> + return retval;
> +}
> +
> +static inline void
> +iq_ring_destroy(struct iq_ring *r)
> +{
> + rte_free(r);
> +}
> +
> +static force_inline uint16_t
> +iq_ring_count(const struct iq_ring *r)
> +{
> + return r->write_idx - r->read_idx;
> +}
> +
> +static force_inline uint16_t
> +iq_ring_free_count(const struct iq_ring *r)
> +{
> + return QID_IQ_MASK - iq_ring_count(r);
> +}
> +
> +static force_inline uint16_t
> +iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
> +{
> + const uint16_t read = r->read_idx;
> + uint16_t write = r->write_idx;
> + const uint16_t space = read + QID_IQ_MASK - write;
> + uint16_t i;
> +
> + if (space < nb_qes)
> + nb_qes = space;
> +
> + for (i = 0; i < nb_qes; i++, write++)
> + r->ring[write & QID_IQ_MASK] = qes[i];
> +
> + r->write_idx = write;
> +
> + return nb_qes;
> +}
> +
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index 65f00e4..aaa8056 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -49,6 +49,78 @@
> #define SW_INFLIGHT_EVENTS_TOTAL 4096
> /* allow for lots of over-provisioning */
> #define MAX_SW_PROD_Q_DEPTH 4096
> +#define SW_FRAGMENTS_MAX 16
> +
> +/* have a new scheduling type for 1:1 queue to port links */
> +#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
IMO, better to use SW_ for internal sched types
> +
> +#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
> +#define SW_LOG_INFO(fmt, args...) \
> + RTE_LOG(INFO, PMD, "[%s] %s() line %u: " fmt "\n", \
> + PMD_NAME, \
> + __func__, __LINE__, ## args)
> +
> +#define SW_LOG_DBG(fmt, args...) \
> + RTE_LOG(DEBUG, PMD, "[%s] %s() line %u: " fmt "\n", \
> -----Original Message-----
> From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> Sent: Monday, February 6, 2017 9:25 AM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>
> Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
> Subject: Re: [PATCH v2 07/15] event/sw: add support for event queues
>
> On Tue, Jan 31, 2017 at 04:14:25PM +0000, Harry van Haaren wrote:
> > From: Bruce Richardson <bruce.richardson@intel.com>
> >
> > Add in the data structures for the event queues, and the eventdev
> > functions to create and destroy those queues.
> >
> > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > ---
> > drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++
> > drivers/event/sw/sw_evdev.c | 158 +++++++++++++++++++++++++++++++++++++++
> > drivers/event/sw/sw_evdev.h | 75 +++++++++++++++++++
> > 3 files changed, 409 insertions(+)
> > create mode 100644 drivers/event/sw/iq_ring.h
> >
> > + */
> > +
> > +/*
> > + * Ring structure definitions used for the internal ring buffers of the
> > + * SW eventdev implementation. These are designed for single-core use only.
> > + */
>
> If I understand it correctly, IQ and QE rings are single producer and
> single consumer rings. By the specification, multiple producers through
> multiple ports can enqueue to the event queues at the same time. Does the SW
> implementation support that? Or am I missing something here?
You're right that the IQ and QE rings are Single Producer, Single Consumer rings. More specifically, the QE is a ring for sending rte_event structs between cores, while the IQ ring is optimized for internal use in the scheduler core - and should not be used to send events between cores. Note that the design of the SW scheduler includes a central core for performing scheduling.
In other words, the QE rings transfer events from the worker core to the scheduler - and the scheduler pushes the events into what you call the "event queues" (aka, the atomic/ordered queue itself). These "event queues" are IQ instances. On egress from the scheduler, the event passes through a QE ring to the worker.
The result is that, despite only SP/SC rings being used, multiple workers can enqueue to any event queue.
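A rough sketch of that topology may help (illustrative only: sketch_port and sketch_schedule are not driver symbols, and where the real driver copies rte_event structs by value through its rings, this sketch passes pointers to keep the rte_ring calls simple):

#include <rte_ring.h>
#include <rte_eventdev.h>

#include "iq_ring.h"

/* Each port owns two SP/SC rings, so every ring has exactly one
 * producer thread and one consumer thread.
 */
struct sketch_port {
	struct rte_ring *rx_ring; /* worker enqueues (SP), scheduler dequeues (SC) */
	struct rte_ring *cq_ring; /* scheduler enqueues (SP), worker dequeues (SC) */
};

/* Runs only on the dedicated scheduler core, the sole thread that ever
 * touches the IQs - hence no locking on the "event queues" themselves.
 */
static void
sketch_schedule(struct sketch_port *ports, unsigned int nb_ports,
		struct iq_ring *iqs[])
{
	unsigned int i;
	void *obj;

	for (i = 0; i < nb_ports; i++) {
		/* drain events produced by worker i ... */
		while (rte_ring_sc_dequeue(ports[i].rx_ring, &obj) == 0) {
			const struct rte_event *ev = obj;

			/* ... into the per-queue IQ. Many workers can feed
			 * one IQ because only this thread writes to it.
			 */
			iq_ring_enqueue(iqs[ev->queue_id], ev);
		}
		/* scheduling decisions and egress via cq_ring omitted */
	}
}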
On Mon, Feb 06, 2017 at 10:25:18AM +0000, Van Haaren, Harry wrote:
> > -----Original Message-----
> > From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> > Sent: Monday, February 6, 2017 9:25 AM
> > To: Van Haaren, Harry <harry.van.haaren@intel.com>
> > Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
> > Subject: Re: [PATCH v2 07/15] event/sw: add support for event queues
> >
> > On Tue, Jan 31, 2017 at 04:14:25PM +0000, Harry van Haaren wrote:
> > > From: Bruce Richardson <bruce.richardson@intel.com>
> > >
> > > Add in the data structures for the event queues, and the eventdev
> > > functions to create and destroy those queues.
> > >
> > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > > ---
> > > drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++
> > > drivers/event/sw/sw_evdev.c | 158 +++++++++++++++++++++++++++++++++++++++
> > > drivers/event/sw/sw_evdev.h | 75 +++++++++++++++++++
> > > 3 files changed, 409 insertions(+)
> > > create mode 100644 drivers/event/sw/iq_ring.h
> > >
> > > + */
> > > +
> > > +/*
> > > + * Ring structure definitions used for the internal ring buffers of the
> > > + * SW eventdev implementation. These are designed for single-core use only.
> > > + */
> >
> > If I understand it correctly, IQ and QE rings are single producer and
> > single consumer rings. By the specification, multiple producers through
> > multiple ports can enqueue to the event queues at the same time. Does the SW
> > implementation support that? Or am I missing something here?
>
> You're right that the IQ and QE rings are Single Producer, Single Consumer rings. More specifically, the QE is a ring for sending rte_event structs between cores, while the IQ ring is optimized for internal use in the scheduler core - and should not be used to send events between cores. Note that the design of the SW scheduler includes a central core for performing scheduling.
Thanks Harry. One question, though:
In RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED mode, multiple SW schedulers can
be active, right? If so, we need multi-consumer rings, right?
>
> > In other words, the QE rings transfer events from the worker core to the scheduler - and the scheduler pushes the events into what you call the "event queues" (aka, the atomic/ordered queue itself). These "event queues" are IQ instances. On egress from the scheduler, the event passes through a QE ring to the worker.
>
> > The result is that, despite only SP/SC rings being used, multiple workers can enqueue to any event queue.
Got it.
> -----Original Message-----
> From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> Sent: Tuesday, February 7, 2017 6:58 AM
> To: Van Haaren, Harry <harry.van.haaren@intel.com>
> Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
> Subject: Re: [PATCH v2 07/15] event/sw: add support for event queues
>
> On Mon, Feb 06, 2017 at 10:25:18AM +0000, Van Haaren, Harry wrote:
> > > -----Original Message-----
> > > From: Jerin Jacob [mailto:jerin.jacob@caviumnetworks.com]
> > > Sent: Monday, February 6, 2017 9:25 AM
> > > To: Van Haaren, Harry <harry.van.haaren@intel.com>
> > > Cc: dev@dpdk.org; Richardson, Bruce <bruce.richardson@intel.com>
> > > Subject: Re: [PATCH v2 07/15] event/sw: add support for event queues
> > >
> > > On Tue, Jan 31, 2017 at 04:14:25PM +0000, Harry van Haaren wrote:
> > > > From: Bruce Richardson <bruce.richardson@intel.com>
> > > >
> > > > Add in the data structures for the event queues, and the eventdev
> > > > functions to create and destroy those queues.
> > > >
> > > > Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
> > > > Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> > > > ---
> > > > drivers/event/sw/iq_ring.h | 176 ++++++++++++++++++++++++++++++++++++++++++++
> > > > drivers/event/sw/sw_evdev.c | 158 +++++++++++++++++++++++++++++++++++++++
> > > > drivers/event/sw/sw_evdev.h | 75 +++++++++++++++++++
> > > > 3 files changed, 409 insertions(+)
> > > > create mode 100644 drivers/event/sw/iq_ring.h
> > > >
> > > > + */
> > > > +
> > > > +/*
> > > > + * Ring structure definitions used for the internal ring buffers of the
> > > > + * SW eventdev implementation. These are designed for single-core use only.
> > > > + */
> > >
> > > If I understand it correctly, IQ and QE rings are single producer and
> > > single consumer rings. By the specification, multiple producers through
> > > multiple ports can enqueue to the event queues at the same time. Does the SW
> > > implementation support that? Or am I missing something here?
> >
> > You're right that the IQ and QE rings are Single Producer, Single Consumer rings. More
> specifically, the QE is a ring for sending rte_event structs between cores, while the IQ ring
> is optimized for internal use in the scheduler core - and should not be used to send events
> between cores. Note that the design of the SW scheduler includes a central core for performing
> scheduling.
>
> Thanks Harry. One question, though:
> In RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED mode, multiple SW schedulers can
> be active, right? If so, we need multi-consumer rings, right?
Note that the sw scheduler is a centralized scheduler, and does not support distributing the *scheduling* work itself.
With multiple software scheduler instances, each instance is its own scheduling domain - they don't interact with each other directly. There is no need for Multi-Producer / Multi-Consumer rings with this design, as there is never more than one thread accessing the producer or consumer side of any given ring (see the sketch after this thread).
>
> >
> > In other words, the QE rings transfer events from the worker core to the scheduler - and the
> scheduler pushes the events into what you call the "event queues" (aka, the atomic/ordered
> queue itself). These "event queues" are IQ instances. On egress from the scheduler, the event
> passes through a QE ring to the worker.
> >
> > The result is that, despite only SP/SC rings being used, multiple workers can enqueue to
> any event queue.
>
> Got it.
>
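To illustrate the scheduling-domain point above: each sw device instance is driven by exactly one lcore calling the schedule function, along these lines (a sketch assuming the rte_event_schedule() API of the eventdev proposal; sched_loop, keep_running and the lcore/device ids are all illustrative):

#include <stdbool.h>
#include <stdint.h>

#include <rte_launch.h>
#include <rte_eventdev.h>

static volatile bool keep_running = true;

/* Exactly one thread drives each device instance, so that instance's
 * internal rings never see a second producer or consumer.
 */
static int
sched_loop(void *arg)
{
	const uint8_t dev_id = (uint8_t)(uintptr_t)arg;

	while (keep_running)
		rte_event_schedule(dev_id);
	return 0;
}

static void
launch_two_domains(void)
{
	/* two independent scheduling domains, each on its own lcore */
	rte_eal_remote_launch(sched_loop, (void *)(uintptr_t)0, 1);
	rte_eal_remote_launch(sched_loop, (void *)(uintptr_t)1, 2);
}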
new file mode 100644
@@ -0,0 +1,176 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Ring structure definitions used for the internal ring buffers of the
+ * SW eventdev implementation. These are designed for single-core use only.
+ */
+#ifndef _IQ_RING_
+#define _IQ_RING_
+
+#include <stdint.h>
+
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eventdev.h>
+
+#define IQ_RING_NAMESIZE 12
+#define QID_IQ_DEPTH 512
+#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
+
+struct iq_ring {
+ char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
+ uint16_t write_idx;
+ uint16_t read_idx;
+
+ struct rte_event ring[QID_IQ_DEPTH];
+};
+
+#ifndef force_inline
+#define force_inline inline __attribute__((always_inline))
+#endif
+
+static inline struct iq_ring *
+iq_ring_create(const char *name, unsigned int socket_id)
+{
+ struct iq_ring *retval;
+
+ retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
+ if (retval == NULL)
+ goto end;
+
+ snprintf(retval->name, sizeof(retval->name), "%s", name);
+ retval->write_idx = retval->read_idx = 0;
+end:
+ return retval;
+}
+
+static inline void
+iq_ring_destroy(struct iq_ring *r)
+{
+ rte_free(r);
+}
+
+static force_inline uint16_t
+iq_ring_count(const struct iq_ring *r)
+{
+ return r->write_idx - r->read_idx;
+}
+
+static force_inline uint16_t
+iq_ring_free_count(const struct iq_ring *r)
+{
+ return QID_IQ_MASK - iq_ring_count(r);
+}
+
+static force_inline uint16_t
+iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+ const uint16_t read = r->read_idx;
+ uint16_t write = r->write_idx;
+ const uint16_t space = read + QID_IQ_MASK - write;
+ uint16_t i;
+
+ if (space < nb_qes)
+ nb_qes = space;
+
+ for (i = 0; i < nb_qes; i++, write++)
+ r->ring[write & QID_IQ_MASK] = qes[i];
+
+ r->write_idx = write;
+
+ return nb_qes;
+}
+
+static force_inline uint16_t
+iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+ uint16_t read = r->read_idx;
+ const uint16_t write = r->write_idx;
+ const uint16_t items = write - read;
+ uint16_t i;
+
+ for (i = 0; i < nb_qes; i++, read++)
+ qes[i] = r->ring[read & QID_IQ_MASK];
+
+ if (items < nb_qes)
+ nb_qes = items;
+
+ r->read_idx += nb_qes;
+
+ return nb_qes;
+}
+
+/* assumes there is space, from a previous dequeue_burst */
+static force_inline uint16_t
+iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
+{
+ uint16_t i, read = r->read_idx;
+
+ for (i = nb_qes; i-- > 0; )
+ r->ring[--read & QID_IQ_MASK] = qes[i];
+
+ r->read_idx = read;
+ return nb_qes;
+}
+
+static force_inline const struct rte_event *
+iq_ring_peek(const struct iq_ring *r)
+{
+ return &r->ring[r->read_idx & QID_IQ_MASK];
+}
+
+static force_inline void
+iq_ring_pop(struct iq_ring *r)
+{
+ r->read_idx++;
+}
+
+static force_inline int
+iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
+{
+ const uint16_t read = r->read_idx;
+ const uint16_t write = r->write_idx;
+ const uint16_t space = read + QID_IQ_MASK - write;
+
+ if (space == 0)
+ return -1;
+
+ r->ring[write & QID_IQ_MASK] = *qe;
+
+ r->write_idx = write + 1;
+
+ return 0;
+}
+
+#endif
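For reference, a minimal single-threaded usage sketch of the API above (iq_ring_demo is illustrative; note the mask-based full/empty test means the ring holds at most QID_IQ_DEPTH - 1 = 511 events):

#include "iq_ring.h"

static int
iq_ring_demo(unsigned int socket_id)
{
	struct rte_event ev = {0};
	struct rte_event out[8];
	struct iq_ring *r = iq_ring_create("demo", socket_id);
	uint16_t n;

	if (r == NULL)
		return -1;

	iq_ring_enqueue(r, &ev);		/* count: 0 -> 1 */
	n = iq_ring_dequeue_burst(r, out, 8);	/* n == 1 */
	iq_ring_put_back(r, out, n);		/* undo the dequeue */

	(void)iq_ring_peek(r);			/* inspect head without consuming */
	iq_ring_pop(r);				/* then consume it: count back to 0 */

	iq_ring_destroy(r);
	return 0;
}

The uint16_t read/write indices wrap naturally, which is safe because QID_IQ_DEPTH divides 65536 evenly.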
@@ -38,12 +38,168 @@
#include <rte_ring.h>
#include "sw_evdev.h"
+#include "iq_ring.h"
#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
#define SCHED_QUANTA_ARG "sched_quanta"
#define CREDIT_QUANTA_ARG "credit_quanta"
+static int32_t
+qid_init(struct sw_evdev *sw, unsigned int idx, int type,
+ const struct rte_event_queue_conf *queue_conf)
+{
+ unsigned int i;
+ int dev_id = sw->data->dev_id;
+ int socket_id = sw->data->socket_id;
+ char buf[IQ_RING_NAMESIZE];
+ struct sw_qid *qid = &sw->qids[idx];
+
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
+ qid->iq[i] = iq_ring_create(buf, socket_id);
+ if (!qid->iq[i]) {
+ SW_LOG_DBG("ring create failed");
+ goto cleanup;
+ }
+ }
+
+ /* Initialize the FID structures to no pinning (-1), and zero packets */
+ const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
+ for (i = 0; i < RTE_DIM(qid->fids); i++)
+ qid->fids[i] = fid;
+
+ qid->id = idx;
+ qid->type = type;
+ qid->priority = queue_conf->priority;
+
+ if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+ char ring_name[RTE_RING_NAMESIZE];
+ uint32_t window_size;
+
+ /* rte_ring and window_size_mask require window_size to
+ * be a power-of-2.
+ */
+ window_size = rte_align32pow2(
+ queue_conf->nb_atomic_order_sequences);
+
+ qid->window_size = window_size - 1;
+
+ if (!window_size) {
+ SW_LOG_DBG(
+ "invalid reorder_window_size for ordered queue\n"
+ );
+ goto cleanup;
+ }
+
+ snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i);
+ qid->reorder_buffer = rte_zmalloc_socket(buf,
+ window_size * sizeof(qid->reorder_buffer[0]),
+ 0, socket_id);
+ if (!qid->reorder_buffer) {
+ SW_LOG_DBG("reorder_buffer malloc failed\n");
+ goto cleanup;
+ }
+
+ memset(&qid->reorder_buffer[0],
+ 0,
+ window_size * sizeof(qid->reorder_buffer[0]));
+
+ snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist",
+ dev_id, idx);
+ qid->reorder_buffer_freelist = rte_ring_create(ring_name,
+ window_size,
+ socket_id,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (!qid->reorder_buffer_freelist) {
+ SW_LOG_DBG("freelist ring create failed");
+ goto cleanup;
+ }
+
+ /* Populate the freelist with reorder buffer entries. Enqueue
+ * 'window_size - 1' entries because the rte_ring holds only
+ * that many.
+ */
+ for (i = 0; i < window_size - 1; i++) {
+ if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist,
+ &qid->reorder_buffer[i]) < 0)
+ goto cleanup;
+ }
+
+ qid->reorder_buffer_index = 0;
+ qid->cq_next_tx = 0;
+ }
+
+ return 0;
+
+cleanup:
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ if (qid->iq[i])
+ iq_ring_destroy(qid->iq[i]);
+ }
+
+ if (qid->reorder_buffer) {
+ rte_free(qid->reorder_buffer);
+ qid->reorder_buffer = NULL;
+ }
+
+ if (qid->reorder_buffer_freelist) {
+ rte_ring_free(qid->reorder_buffer_freelist);
+ qid->reorder_buffer_freelist = NULL;
+ }
+
+ return -EINVAL;
+}
+
+static int
+sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+ const struct rte_event_queue_conf *conf)
+{
+ int type;
+
+ /* TODO: Error check queue types and appropriate values */
+ switch (conf->event_queue_cfg) {
+ case RTE_EVENT_QUEUE_CFG_SINGLE_LINK:
+ type = RTE_SCHED_TYPE_DIRECT;
+ break;
+ case RTE_EVENT_QUEUE_CFG_DEFAULT:
+ /* fallthrough */
+ case RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY:
+ type = RTE_SCHED_TYPE_ATOMIC;
+ break;
+ case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
+ type = RTE_SCHED_TYPE_ORDERED;
+ break;
+ case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
+ type = RTE_SCHED_TYPE_PARALLEL;
+ break;
+ default:
+ printf("%s : unknown queue type %d requested\n", __func__,
+ conf->event_queue_cfg);
+ return -1;
+ }
+
+ struct sw_evdev *sw = sw_pmd_priv(dev);
+ return qid_init(sw, queue_id, type, conf);
+}
+
+static void
+sw_queue_release(struct rte_eventdev *dev, uint8_t id)
+{
+ struct sw_evdev *sw = sw_pmd_priv(dev);
+ struct sw_qid *qid = &sw->qids[id];
+ uint32_t i;
+
+ for (i = 0; i < SW_IQS_MAX; i++)
+ iq_ring_destroy(qid->iq[i]);
+
+ if (qid->type == RTE_SCHED_TYPE_ORDERED) {
+ rte_free(qid->reorder_buffer);
+ rte_ring_free(qid->reorder_buffer_freelist);
+ }
+ memset(qid, 0, sizeof(*qid));
+}
+
static void
sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
struct rte_event_queue_conf *conf)
@@ -147,6 +303,8 @@ sw_probe(const char *name, const char *params)
.dev_infos_get = sw_info_get,
.queue_def_conf = sw_queue_def_conf,
+ .queue_setup = sw_queue_setup,
+ .queue_release = sw_queue_release,
.port_def_conf = sw_port_def_conf,
};
@@ -49,6 +49,78 @@
#define SW_INFLIGHT_EVENTS_TOTAL 4096
/* allow for lots of over-provisioning */
#define MAX_SW_PROD_Q_DEPTH 4096
+#define SW_FRAGMENTS_MAX 16
+
+/* have a new scheduling type for 1:1 queue to port links */
+#define RTE_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1)
+
+#ifdef RTE_LIBRTE_PMD_EVDEV_SW_DEBUG
+#define SW_LOG_INFO(fmt, args...) \
+ RTE_LOG(INFO, PMD, "[%s] %s() line %u: " fmt "\n", \
+ PMD_NAME, \
+ __func__, __LINE__, ## args)
+
+#define SW_LOG_DBG(fmt, args...) \
+ RTE_LOG(DEBUG, PMD, "[%s] %s() line %u: " fmt "\n", \
+ PMD_NAME, \
+ __func__, __LINE__, ## args)
+#else
+#define SW_LOG_INFO(fmt, args...)
+#define SW_LOG_DBG(fmt, args...)
+#endif
+
+/* Records basic event stats at a given point. Used in port and qid structs */
+struct sw_point_stats {
+ uint64_t rx_pkts;
+ uint64_t rx_dropped;
+ uint64_t tx_pkts;
+};
+
+/* structure used to track what port a flow (FID) is pinned to */
+struct sw_fid_t {
+ /* which CQ this FID is currently pinned to */
+ int32_t cq;
+ /* number of packets gone to the CQ with this FID */
+ uint32_t pcount;
+};
+
+struct reorder_buffer_entry {
+ uint16_t num_fragments; /**< Number of packet fragments */
+ uint16_t fragment_index; /**< Points to the oldest valid frag */
+ uint8_t ready; /**< Entry is ready to be reordered */
+ struct rte_event fragments[SW_FRAGMENTS_MAX];
+};
+
+struct sw_qid {
+ /* The type of this QID */
+ int type;
+ /* Integer ID representing the queue. This is used in history lists,
+ * to identify the stage of processing.
+ */
+ uint32_t id;
+ struct sw_point_stats stats;
+
+ /* Internal priority rings for packets */
+ struct iq_ring *iq[SW_IQS_MAX];
+ uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */
+ uint64_t iq_pkt_count[SW_IQS_MAX];
+
+ /* Information on what CQs are polling this IQ */
+ uint32_t cq_num_mapped_cqs;
+ uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */
+ uint32_t cq_map[SW_PORTS_MAX];
+
+ /* Track flow ids for atomic load balancing */
+ struct sw_fid_t fids[SW_QID_NUM_FIDS];
+
+ /* Track packet order for reordering when needed */
+ struct reorder_buffer_entry *reorder_buffer; /**< pkts await reorder */
+ struct rte_ring *reorder_buffer_freelist; /* available reorder slots */
+ uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */
+ uint32_t window_size; /* Used to wrap reorder_buffer_index */
+
+ uint8_t priority;
+};
struct sw_evdev {
struct rte_eventdev_data *data;
@@ -62,6 +134,9 @@ struct sw_evdev {
*/
uint32_t nb_events_limit;
+ /* Internal queues - one per logical queue */
+ struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
+
int32_t sched_quanta;
uint32_t credit_update_quanta;
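For context, an application reaches qid_init() above through the public queue-setup API, roughly as follows (a sketch against the eventdev API of the time; dev_id and the 1000-sequence window are illustrative, with the rounding up to 1024 coming from the rte_align32pow2() call in qid_init()):

#include <rte_eventdev.h>

/* Configure queue 0 as ORDERED; nb_atomic_order_sequences sizes the
 * reorder window, which qid_init() rounds up to a power of two.
 */
static int
setup_ordered_queue(uint8_t dev_id)
{
	struct rte_event_queue_conf qconf = {
		.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY,
		.nb_atomic_order_sequences = 1000, /* becomes a 1024-entry window */
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
	};

	return rte_event_queue_setup(dev_id, 0, &qconf);
}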