[dpdk-dev,1/1] ip_pipeline: added dynamic pipeline reconfiguration
Commit Message
Up till now pipeline was bound to thread selected in the initial config.
This patch allows binding pipeline to other threads at runtime using CLI
commands.
Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
---
examples/ip_pipeline/Makefile | 1 +
examples/ip_pipeline/app.h | 5 +
examples/ip_pipeline/config_parse.c | 2 +-
examples/ip_pipeline/init.c | 61 ++++
examples/ip_pipeline/pipeline.h | 6 +
examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 +
examples/ip_pipeline/thread.c | 135 +++++++-
examples/ip_pipeline/thread.h | 101 ++++++
examples/ip_pipeline/thread_fe.c | 328 ++++++++++++++++++++
9 files changed, 640 insertions(+), 2 deletions(-)
create mode 100644 examples/ip_pipeline/thread.h
create mode 100644 examples/ip_pipeline/thread_fe.c
Comments
Forgot to delete debug printfs, v2 incoming - Self NACK
> -----Original Message-----
> From: Gajdzica, MaciejX T
> Sent: Wednesday, September 30, 2015 6:11 PM
> To: dev@dpdk.org
> Cc: Gajdzica, MaciejX T
> Subject: [PATCH 1/1] ip_pipeline: added dynamic pipeline reconfiguration
>
> Up till now pipeline was bound to thread selected in the initial config.
> This patch allows binding pipeline to other threads at runtime using CLI
> commands.
>
> Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
> ---
> examples/ip_pipeline/Makefile | 1 +
> examples/ip_pipeline/app.h | 5 +
> examples/ip_pipeline/config_parse.c | 2 +-
> examples/ip_pipeline/init.c | 61 ++++
> examples/ip_pipeline/pipeline.h | 6 +
> examples/ip_pipeline/pipeline/pipeline_common_fe.h | 3 +
> examples/ip_pipeline/thread.c | 135 +++++++-
> examples/ip_pipeline/thread.h | 101 ++++++
> examples/ip_pipeline/thread_fe.c | 328 ++++++++++++++++++++
> 9 files changed, 640 insertions(+), 2 deletions(-) create mode 100644
> examples/ip_pipeline/thread.h create mode 100644
> examples/ip_pipeline/thread_fe.c
>
> diff --git a/examples/ip_pipeline/Makefile b/examples/ip_pipeline/Makefile
> index f3ff1ec..c8e80b5 100644
> --- a/examples/ip_pipeline/Makefile
> +++ b/examples/ip_pipeline/Makefile
> @@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) +=
> config_parse_tm.c
> SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c
> SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c
> SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c
> SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c
>
> SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c diff --git
> a/examples/ip_pipeline/app.h b/examples/ip_pipeline/app.h index
> 521e3a0..19ddd31 100644
> --- a/examples/ip_pipeline/app.h
> +++ b/examples/ip_pipeline/app.h
> @@ -220,9 +220,11 @@ struct app_pipeline_data {
> void *be;
> void *fe;
> uint64_t timer_period;
> + uint32_t enabled;
> };
>
> struct app_thread_pipeline_data {
> + uint32_t pipeline_id;
> void *be;
> pipeline_be_op_run f_run;
> pipeline_be_op_timer f_timer;
> @@ -242,6 +244,9 @@ struct app_thread_data {
> uint32_t n_custom;
>
> uint64_t deadline;
> +
> + struct rte_ring *msgq_in;
> + struct rte_ring *msgq_out;
> };
>
> struct app_eal_params {
> diff --git a/examples/ip_pipeline/config_parse.c
> b/examples/ip_pipeline/config_parse.c
> index c9b78f9..d2aaadf 100644
> --- a/examples/ip_pipeline/config_parse.c
> +++ b/examples/ip_pipeline/config_parse.c
> @@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p)
> return 0;
> }
>
> -static int
> +int
> parse_pipeline_core(uint32_t *socket,
> uint32_t *core,
> uint32_t *ht,
> diff --git a/examples/ip_pipeline/init.c b/examples/ip_pipeline/init.c index
> 3f9c68d..1126288 100644
> --- a/examples/ip_pipeline/init.c
> +++ b/examples/ip_pipeline/init.c
> @@ -50,6 +50,7 @@
> #include "pipeline_firewall.h"
> #include "pipeline_flow_classification.h"
> #include "pipeline_routing.h"
> +#include "thread.h"
>
> #define APP_NAME_SIZE 32
>
> @@ -1225,6 +1226,48 @@ app_init_pipelines(struct app_params *app)
> }
> }
>
> +static inline struct rte_ring *
> +app_thread_msgq_in_get(struct app_params *app,
> + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) {
> + char msgq_name[32];
> + ssize_t param_idx;
> +
> + snprintf(msgq_name, sizeof(msgq_name),
> + "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s",
> + socket_id,
> + core_id,
> + (ht_id) ? "h" : "");
> + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
> +
> + if (param_idx < 0)
> + return NULL;
> +
> + return app->msgq[param_idx];
> +}
> +
> +static inline struct rte_ring *
> +app_thread_msgq_out_get(struct app_params *app,
> + uint32_t socket_id, uint32_t core_id, uint32_t ht_id) {
> + char msgq_name[32];
> + ssize_t param_idx;
> +
> + snprintf(msgq_name, sizeof(msgq_name),
> + "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s",
> + socket_id,
> + core_id,
> + (ht_id) ? "h" : "");
> + param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
> +
> +
> + if (param_idx < 0)
> + return NULL;
> +
> + return app->msgq[param_idx];
> +
> +}
> +
> static void
> app_init_threads(struct app_params *app) { @@ -1253,6 +1296,20 @@
> app_init_threads(struct app_params *app)
>
> t = &app->thread_data[lcore_id];
>
> + t->msgq_in = app_thread_msgq_in_get(app,
> + params->socket_id,
> + params->core_id,
> + params->hyper_th_id);
> + if (t->msgq_in == NULL)
> + rte_panic("Init error: Cannot find MSGQ_IN for thread
> %" PRId32,
> +lcore_id);
> +
> + t->msgq_out = app_thread_msgq_out_get(app,
> + params->socket_id,
> + params->core_id,
> + params->hyper_th_id);
> + if (t->msgq_out == NULL)
> + rte_panic("Init error: Cannot find MSGQ_OUT for
> thread %" PRId32,
> +lcore_id);
> +
> ptype = app_pipeline_type_find(app, params->type);
> if (ptype == NULL)
> rte_panic("Init error: Unknown pipeline "
> @@ -1262,12 +1319,15 @@ app_init_threads(struct app_params *app)
> &t->regular[t->n_regular] :
> &t->custom[t->n_custom];
>
> + p->pipeline_id = p_id;
> p->be = data->be;
> p->f_run = ptype->be_ops->f_run;
> p->f_timer = ptype->be_ops->f_timer;
> p->timer_period = data->timer_period;
> p->deadline = time + data->timer_period;
>
> + data->enabled = 1;
> +
> if (ptype->be_ops->f_run == NULL)
> t->n_regular++;
> else
> @@ -1288,6 +1348,7 @@ int app_init(struct app_params *app)
> app_init_msgq(app);
>
> app_pipeline_common_cmd_push(app);
> + app_pipeline_thread_cmd_push(app);
> app_pipeline_type_register(app, &pipeline_master);
> app_pipeline_type_register(app, &pipeline_passthrough);
> app_pipeline_type_register(app, &pipeline_flow_classification); diff --
> git a/examples/ip_pipeline/pipeline.h b/examples/ip_pipeline/pipeline.h index
> b9a56ea..dab9c36 100644
> --- a/examples/ip_pipeline/pipeline.h
> +++ b/examples/ip_pipeline/pipeline.h
> @@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype)
> return n_cmds;
> }
>
> +int
> +parse_pipeline_core(uint32_t *socket,
> + uint32_t *core,
> + uint32_t *ht,
> + const char *entry);
> +
> #endif
> diff --git a/examples/ip_pipeline/pipeline/pipeline_common_fe.h
> b/examples/ip_pipeline/pipeline/pipeline_common_fe.h
> index 693848d..e84aa3a 100644
> --- a/examples/ip_pipeline/pipeline/pipeline_common_fe.h
> +++ b/examples/ip_pipeline/pipeline/pipeline_common_fe.h
> @@ -68,6 +68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t
> id)
> if (pipeline_data == NULL)
> return NULL;
>
> + if (pipeline_data->enabled == 0)
> + return NULL;
> +
> return pipeline_data->fe;
> }
>
> diff --git a/examples/ip_pipeline/thread.c b/examples/ip_pipeline/thread.c
> index b2a8656..8aabee0 100644
> --- a/examples/ip_pipeline/thread.c
> +++ b/examples/ip_pipeline/thread.c
> @@ -37,8 +37,139 @@
>
> #include "pipeline_common_be.h"
> #include "app.h"
> +#include "thread.h"
>
> -int app_thread(void *arg)
> +static inline void *
> +thread_msg_recv(struct rte_ring *r)
> +{
> + void *msg;
> + int status = rte_ring_sc_dequeue(r, &msg);
> +
> + if (status != 0)
> + return NULL;
> +
> + return msg;
> +}
> +
> +static inline void
> +thread_msg_send(struct rte_ring *r,
> + void *msg)
> +{
> + int status;
> +
> + do {
> + status = rte_ring_sp_enqueue(r, msg);
> + } while (status == -ENOBUFS);
> +}
> +
> +static int
> +thread_pipeline_enable(struct app_thread_data *t,
> + struct thread_pipeline_enable_msg_req *req) {
> + struct app_thread_pipeline_data *p;
> +
> + p = (req->f_run == NULL) ?
> + &t->regular[t->n_regular] :
> + &t->custom[t->n_custom];
> +
> + if (t->n_regular >= APP_MAX_THREAD_PIPELINES)
> + return -1;
> +
> + p->pipeline_id = req->pipeline_id;
> + p->be = req->be;
> + p->f_run = req->f_run;
> + p->f_timer = req->f_timer;
> + p->timer_period = req->timer_period;
> + p->deadline = 0;
> +
> + if (req->f_run == NULL)
> + t->n_regular++;
> + else
> + t->n_custom++;
> +
> + return 0;
> +}
> +
> +static int
> +thread_pipeline_disable(struct app_thread_data *t,
> + struct thread_pipeline_disable_msg_req *req) {
> + uint32_t i;
> +
> + printf("%p\n", req);
> + printf("%d %d\n", req->type, req->pipeline_id);
> +
> + for (i = 0; i < t->n_regular; i++) {
> + if (t->regular[i].pipeline_id == req->pipeline_id)
> + break;
> + }
> +
> + if (i < t->n_regular) {
> + if (i < t->n_regular - 1)
> + memcpy(&t->regular[i],
> + &t->regular[i+1],
> + (t->n_regular - i) * sizeof(struct
> app_thread_pipeline_data));
> +
> + t->n_regular--;
> +
> + return 0;
> + }
> +
> + for (i = 0; i < t->n_custom; i++) {
> + if (t->custom[i].pipeline_id == req->pipeline_id)
> + break;
> + }
> +
> + /* return if pipeline not found */
> + if (i >= t->n_custom)
> + return -1;
> +
> + if (i < t->n_custom - 1)
> + memcpy(&t->custom[i],
> + &t->custom[i+1],
> + (t->n_custom - i) * sizeof(struct
> app_thread_pipeline_data));
> +
> + t->n_custom--;
> +
> + return 0;
> +}
> +
> +static int
> +thread_msg_req_handle(struct app_thread_data *t) {
> + void *msg_ptr;
> + struct thread_msg_req *req;
> + struct thread_msg_rsp *rsp;
> +
> + msg_ptr = thread_msg_recv(t->msgq_in);
> + req = msg_ptr;
> + rsp = msg_ptr;
> +
> + if (req != NULL)
> + switch (req->type) {
> + case THREAD_MSG_REQ_PIPELINE_ENABLE: {
> + rsp->status = thread_pipeline_enable(t,
> + (struct
> thread_pipeline_enable_msg_req *) req);
> + thread_msg_send(t->msgq_out, rsp);
> + break;
> + }
> +
> + case THREAD_MSG_REQ_PIPELINE_DISABLE: {
> + rsp->status = thread_pipeline_disable(t,
> + (struct
> thread_pipeline_disable_msg_req *) req);
> + printf("THREAD STATUS: %d\n", rsp->status);
> + thread_msg_send(t->msgq_out, rsp);
> + break;
> + }
> + default:
> + break;
> + }
> +
> + return 0;
> +}
> +
> +int
> +app_thread(void *arg)
> {
> struct app_params *app = (struct app_params *) arg;
> uint32_t core_id = rte_lcore_id(), i, j; @@ -103,6 +234,8 @@ int
> app_thread(void *arg)
> }
>
> t->deadline = t_deadline;
> +
> + thread_msg_req_handle(t);
> }
> }
>
> diff --git a/examples/ip_pipeline/thread.h b/examples/ip_pipeline/thread.h new
> file mode 100644 index 0000000..878c9e1
> --- /dev/null
> +++ b/examples/ip_pipeline/thread.h
> @@ -0,0 +1,101 @@
> +/*-
> + * BSD LICENSE
> + *
> + * Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
> + * All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in
> + * the documentation and/or other materials provided with the
> + * distribution.
> + * * Neither the name of Intel Corporation nor the names of its
> + * contributors may be used to endorse or promote products derived
> + * from this software without specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
> CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
> FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
> COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
> INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
> NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
> OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
> TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
> DAMAGE.
> + */
> +
> +#ifndef THREAD_H_
> +#define THREAD_H_
> +
> +#include "app.h"
> +#include "pipeline_be.h"
> +
> +enum thread_msg_req_type {
> + THREAD_MSG_REQ_PIPELINE_ENABLE = 0,
> + THREAD_MSG_REQ_PIPELINE_DISABLE,
> + THREAD_MSG_REQS
> +};
> +
> +struct thread_msg_req {
> + enum thread_msg_req_type type;
> +};
> +
> +struct thread_msg_rsp {
> + int status;
> +};
> +
> +/*
> + * PIPELINE ENABLE
> + */
> +struct thread_pipeline_enable_msg_req {
> + enum thread_msg_req_type type;
> +
> + uint32_t pipeline_id;
> + void *be;
> + pipeline_be_op_run f_run;
> + pipeline_be_op_timer f_timer;
> + uint64_t timer_period;
> +};
> +
> +struct thread_pipeline_enable_msg_rsp {
> + int status;
> +};
> +
> +/*
> + * PIPELINE DISABLE
> + */
> +struct thread_pipeline_disable_msg_req {
> + enum thread_msg_req_type type;
> +
> + uint32_t pipeline_id;
> +};
> +
> +struct thread_pipeline_disable_msg_rsp {
> + int status;
> +};
> +
> +int
> +app_pipeline_thread_cmd_push(struct app_params *app);
> +
> +int
> +app_pipeline_enable(struct app_params *app,
> + uint32_t core_id,
> + uint32_t socket_id,
> + uint32_t hyper_th_id,
> + uint32_t pipeline_id);
> +
> +int
> +app_pipeline_disable(struct app_params *app,
> + uint32_t core_id,
> + uint32_t socket_id,
> + uint32_t hyper_th_id,
> + uint32_t pipeline_id);
> +
> +#endif /* THREAD_H_ */
> diff --git a/examples/ip_pipeline/thread_fe.c
> b/examples/ip_pipeline/thread_fe.c
> new file mode 100644
> index 0000000..3f05b44
> --- /dev/null
> +++ b/examples/ip_pipeline/thread_fe.c
> @@ -0,0 +1,328 @@
> +#include <rte_common.h>
> +#include <rte_ring.h>
> +#include <rte_malloc.h>
> +#include <cmdline_rdline.h>
> +#include <cmdline_parse.h>
> +#include <cmdline_parse_num.h>
> +#include <cmdline_parse_string.h>
> +#include <cmdline_parse_ipaddr.h>
> +#include <cmdline_parse_etheraddr.h>
> +#include <cmdline_socket.h>
> +#include <cmdline.h>
> +
> +#include "thread.h"
> +#include "pipeline.h"
> +#include "pipeline_common_fe.h"
> +#include "app.h"
> +
> +static inline void *
> +thread_msg_send_recv(struct app_params *app,
> + uint32_t thread_id,
> + void *msg,
> + uint32_t timeout_ms)
> +{
> + struct rte_ring *r_req = app->thread_data[thread_id].msgq_in;
> + struct rte_ring *r_rsp = app->thread_data[thread_id].msgq_out;
> + uint64_t hz = rte_get_tsc_hz();
> + void *msg_recv;
> + uint64_t deadline;
> + int status;
> +
> + /* send */
> + do {
> + status = rte_ring_sp_enqueue(r_req, (void *) msg);
> + } while (status == -ENOBUFS);
> +
> + /* recv */
> + deadline = (timeout_ms) ?
> + (rte_rdtsc() + ((hz * timeout_ms) / 1000)) :
> + UINT64_MAX;
> +
> + do {
> + if (rte_rdtsc() > deadline)
> + return NULL;
> +
> + status = rte_ring_sc_dequeue(r_rsp, &msg_recv);
> + } while (status != 0);
> +
> + return msg_recv;
> +}
> +
> +int
> +app_pipeline_enable(struct app_params *app,
> + uint32_t socket_id,
> + uint32_t core_id,
> + uint32_t hyper_th_id,
> + uint32_t pipeline_id)
> +{
> + struct thread_pipeline_enable_msg_req *req;
> + struct thread_pipeline_enable_msg_rsp *rsp;
> +
> + int thread_id;
> +
> + struct app_pipeline_data *p;
> + struct app_pipeline_params *p_params;
> + struct pipeline_type *p_type;
> +
> + if (app_pipeline_data(app, pipeline_id) == NULL)
> + return -1;
> +
> + thread_id = cpu_core_map_get_lcore_id(app->core_map,
> + socket_id,
> + core_id,
> + hyper_th_id);
> +
> + if (thread_id < 0)
> + return -1;
> +
> + req = app_msg_alloc(app);
> + if (req == NULL)
> + return -1;
> +
> + p = &app->pipeline_data[pipeline_id];
> + p_params = &app->pipeline_params[pipeline_id];
> + p_type = app_pipeline_type_find(app, p_params->type);
> +
> + if (p->enabled == 1)
> + return -1;
> +
> + req->type = THREAD_MSG_REQ_PIPELINE_ENABLE;
> + req->pipeline_id = pipeline_id;
> + req->be = p->be;
> + req->f_run = p_type->be_ops->f_run;
> + req->f_timer = p_type->be_ops->f_timer;
> + req->timer_period = p->timer_period;
> +
> + rsp = thread_msg_send_recv(app, thread_id, req, 5 *
> MSG_TIMEOUT_DEFAULT);
> + if (rsp == NULL)
> + return -1;
> +
> + if (rsp->status < 0)
> + return -1;
> +
> + p->enabled = 1;
> + return 0;
> +}
> +
> +
> +int
> +app_pipeline_disable(struct app_params *app,
> + uint32_t socket_id,
> + uint32_t core_id,
> + uint32_t hyper_th_id,
> + uint32_t pipeline_id)
> +{
> + struct thread_pipeline_disable_msg_req *req;
> + struct thread_pipeline_disable_msg_rsp *rsp;
> +
> + int thread_id;
> +
> + struct app_pipeline_data *p;
> +
> + if (app_pipeline_data(app, pipeline_id) == NULL)
> + return -1;
> +
> + thread_id = cpu_core_map_get_lcore_id(app->core_map,
> + socket_id,
> + core_id,
> + hyper_th_id);
> +
> + if (thread_id < 0)
> + return -1;
> +
> + p = &app->pipeline_data[pipeline_id];
> +
> + if (p->enabled == 0)
> + return -1;
> +
> + req = app_msg_alloc(app);
> + if (req == NULL)
> + return -1;
> +
> + req->type = THREAD_MSG_REQ_PIPELINE_DISABLE;
> + req->pipeline_id = pipeline_id;
> +
> + printf("%p\n", req);
> + printf("%d %d\n", req->type, req->pipeline_id);
> +
> + rsp = thread_msg_send_recv(app, thread_id, req, 5 *
> +MSG_TIMEOUT_DEFAULT);
> +
> + printf("RSP: %p\n", rsp);
> + if (rsp == NULL)
> + return -1;
> +
> + printf("%d\n", rsp->status);
> + if (rsp->status < 0)
> + return -1;
> +
> + p->enabled = 0;
> + return 0;
> +}
> +
> +struct cmd_pipeline_enable_result {
> + cmdline_fixed_string_t t_string;
> + cmdline_fixed_string_t t_id;
> + cmdline_fixed_string_t pipeline_string;
> + uint32_t pipeline_id;
> + cmdline_fixed_string_t enable_string;
> +};
> +
> +static void
> +cmd_pipeline_enable_parsed(
> + void *parsed_result,
> + __attribute__((unused)) struct cmdline *cl,
> + void *data)
> +{
> + struct cmd_pipeline_enable_result *params = parsed_result;
> + struct app_params *app = data;
> +
> + int status;
> +
> + uint32_t core_id, socket_id, hyper_th_id;
> +
> + if (parse_pipeline_core(&socket_id,
> + &core_id,
> + &hyper_th_id,
> + params->t_id) < 0) {
> + printf("Command failed\n");
> + return;
> + }
> +
> + status = app_pipeline_enable(app,
> + socket_id,
> + core_id,
> + hyper_th_id,
> + params->pipeline_id);
> +
> + if (status != 0)
> + printf("Command failed\n");
> +}
> +
> +cmdline_parse_token_string_t cmd_pipeline_enable_t_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result,
> t_string,
> +"t");
> +
> +cmdline_parse_token_string_t cmd_pipeline_enable_t_id =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id,
> +NULL);
> +
> +cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result,
> pipeline_string,
> + "pipeline");
> +
> +cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id =
> + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_enable_result,
> pipeline_id,
> +UINT32);
> +
> +cmdline_parse_token_string_t cmd_pipeline_enable_enable_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result,
> +enable_string, "enable");
> +
> +cmdline_parse_inst_t cmd_pipeline_enable = {
> + .f = cmd_pipeline_enable_parsed,
> + .data = NULL,
> + .help_str = "Enable pipeline on specified core",
> + .tokens = {
> + (void *)&cmd_pipeline_enable_t_string,
> + (void *)&cmd_pipeline_enable_t_id,
> + (void *)&cmd_pipeline_enable_pipeline_string,
> + (void *)&cmd_pipeline_enable_pipeline_id,
> + (void *)&cmd_pipeline_enable_enable_string,
> + NULL,
> + },
> +};
> +
> +struct cmd_pipeline_disable_result {
> + cmdline_fixed_string_t t_string;
> + cmdline_fixed_string_t t_id;
> + cmdline_fixed_string_t pipeline_string;
> + uint32_t pipeline_id;
> + cmdline_fixed_string_t disable_string; };
> +
> +static void
> +cmd_pipeline_disable_parsed(
> + void *parsed_result,
> + __attribute__((unused)) struct cmdline *cl,
> + void *data)
> +{
> + struct cmd_pipeline_enable_result *params = parsed_result;
> + struct app_params *app = data;
> +
> + int status;
> +
> + uint32_t core_id, socket_id, hyper_th_id;
> +
> + if (parse_pipeline_core(&socket_id,
> + &core_id,
> + &hyper_th_id,
> + params->t_id) < 0) {
> + printf("Command failed\n");
> + return;
> + }
> +
> + status = app_pipeline_disable(app,
> + socket_id,
> + core_id,
> + hyper_th_id,
> + params->pipeline_id);
> +
> + if (status != 0)
> + printf("Command failed\n");
> +}
> +
> +cmdline_parse_token_string_t cmd_pipeline_disable_t_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result,
> t_string,
> +"t");
> +
> +cmdline_parse_token_string_t cmd_pipeline_disable_t_id =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id,
> +NULL);
> +
> +cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result,
> pipeline_string,
> + "pipeline");
> +
> +cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id =
> + TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result,
> pipeline_id,
> +UINT32);
> +
> +cmdline_parse_token_string_t cmd_pipeline_disable_enable_string =
> + TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result,
> +disable_string, "disable");
> +
> +cmdline_parse_inst_t cmd_pipeline_disable = {
> + .f = cmd_pipeline_disable_parsed,
> + .data = NULL,
> + .help_str = "Disable pipeline on specified core",
> + .tokens = {
> + (void *)&cmd_pipeline_disable_t_string,
> + (void *)&cmd_pipeline_disable_t_id,
> + (void *)&cmd_pipeline_disable_pipeline_string,
> + (void *)&cmd_pipeline_disable_pipeline_id,
> + (void *)&cmd_pipeline_disable_enable_string,
> + NULL,
> + },
> +};
> +
> +static cmdline_parse_ctx_t thread_cmds[] = {
> + (cmdline_parse_inst_t *) &cmd_pipeline_enable,
> + (cmdline_parse_inst_t *) &cmd_pipeline_disable,
> + NULL,
> +};
> +
> +int
> +app_pipeline_thread_cmd_push(struct app_params *app) {
> + uint32_t n_cmds, i;
> +
> + /* Check for available slots in the application commands array */
> + n_cmds = RTE_DIM(thread_cmds) - 1;
> + if (n_cmds > APP_MAX_CMDS - app->n_cmds)
> + return -ENOMEM;
> +
> + /* Push pipeline commands into the application */
> + memcpy(&app->cmds[app->n_cmds],
> + thread_cmds,
> + n_cmds * sizeof(cmdline_parse_ctx_t *));
> +
> + for (i = 0; i < n_cmds; i++)
> + app->cmds[app->n_cmds + i]->data = app;
> +
> + app->n_cmds += n_cmds;
> + app->cmds[app->n_cmds] = NULL;
> +
> + return 0;
> +}
> --
> 1.7.9.5
@@ -54,6 +54,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_parse_tm.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c
+SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c
@@ -220,9 +220,11 @@ struct app_pipeline_data {
void *be;
void *fe;
uint64_t timer_period;
+ uint32_t enabled;
};
struct app_thread_pipeline_data {
+ uint32_t pipeline_id;
void *be;
pipeline_be_op_run f_run;
pipeline_be_op_timer f_timer;
@@ -242,6 +244,9 @@ struct app_thread_data {
uint32_t n_custom;
uint64_t deadline;
+
+ struct rte_ring *msgq_in;
+ struct rte_ring *msgq_out;
};
struct app_eal_params {
@@ -362,7 +362,7 @@ parser_read_uint32(uint32_t *value, const char *p)
return 0;
}
-static int
+int
parse_pipeline_core(uint32_t *socket,
uint32_t *core,
uint32_t *ht,
@@ -50,6 +50,7 @@
#include "pipeline_firewall.h"
#include "pipeline_flow_classification.h"
#include "pipeline_routing.h"
+#include "thread.h"
#define APP_NAME_SIZE 32
@@ -1225,6 +1226,48 @@ app_init_pipelines(struct app_params *app)
}
}
+static inline struct rte_ring *
+app_thread_msgq_in_get(struct app_params *app,
+ uint32_t socket_id, uint32_t core_id, uint32_t ht_id)
+{
+ char msgq_name[32];
+ ssize_t param_idx;
+
+ snprintf(msgq_name, sizeof(msgq_name),
+ "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s",
+ socket_id,
+ core_id,
+ (ht_id) ? "h" : "");
+ param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
+
+ if (param_idx < 0)
+ return NULL;
+
+ return app->msgq[param_idx];
+}
+
+static inline struct rte_ring *
+app_thread_msgq_out_get(struct app_params *app,
+ uint32_t socket_id, uint32_t core_id, uint32_t ht_id)
+{
+ char msgq_name[32];
+ ssize_t param_idx;
+
+ snprintf(msgq_name, sizeof(msgq_name),
+ "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s",
+ socket_id,
+ core_id,
+ (ht_id) ? "h" : "");
+ param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
+
+
+ if (param_idx < 0)
+ return NULL;
+
+ return app->msgq[param_idx];
+
+}
+
static void
app_init_threads(struct app_params *app)
{
@@ -1253,6 +1296,20 @@ app_init_threads(struct app_params *app)
t = &app->thread_data[lcore_id];
+ t->msgq_in = app_thread_msgq_in_get(app,
+ params->socket_id,
+ params->core_id,
+ params->hyper_th_id);
+ if (t->msgq_in == NULL)
+ rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32, lcore_id);
+
+ t->msgq_out = app_thread_msgq_out_get(app,
+ params->socket_id,
+ params->core_id,
+ params->hyper_th_id);
+ if (t->msgq_out == NULL)
+ rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32, lcore_id);
+
ptype = app_pipeline_type_find(app, params->type);
if (ptype == NULL)
rte_panic("Init error: Unknown pipeline "
@@ -1262,12 +1319,15 @@ app_init_threads(struct app_params *app)
&t->regular[t->n_regular] :
&t->custom[t->n_custom];
+ p->pipeline_id = p_id;
p->be = data->be;
p->f_run = ptype->be_ops->f_run;
p->f_timer = ptype->be_ops->f_timer;
p->timer_period = data->timer_period;
p->deadline = time + data->timer_period;
+ data->enabled = 1;
+
if (ptype->be_ops->f_run == NULL)
t->n_regular++;
else
@@ -1288,6 +1348,7 @@ int app_init(struct app_params *app)
app_init_msgq(app);
app_pipeline_common_cmd_push(app);
+ app_pipeline_thread_cmd_push(app);
app_pipeline_type_register(app, &pipeline_master);
app_pipeline_type_register(app, &pipeline_passthrough);
app_pipeline_type_register(app, &pipeline_flow_classification);
@@ -84,4 +84,10 @@ pipeline_type_cmds_count(struct pipeline_type *ptype)
return n_cmds;
}
+int
+parse_pipeline_core(uint32_t *socket,
+ uint32_t *core,
+ uint32_t *ht,
+ const char *entry);
+
#endif
@@ -68,6 +68,9 @@ app_pipeline_data_fe(struct app_params *app, uint32_t id)
if (pipeline_data == NULL)
return NULL;
+ if (pipeline_data->enabled == 0)
+ return NULL;
+
return pipeline_data->fe;
}
@@ -37,8 +37,139 @@
#include "pipeline_common_be.h"
#include "app.h"
+#include "thread.h"
-int app_thread(void *arg)
+static inline void *
+thread_msg_recv(struct rte_ring *r)
+{
+ void *msg;
+ int status = rte_ring_sc_dequeue(r, &msg);
+
+ if (status != 0)
+ return NULL;
+
+ return msg;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *r,
+ void *msg)
+{
+ int status;
+
+ do {
+ status = rte_ring_sp_enqueue(r, msg);
+ } while (status == -ENOBUFS);
+}
+
+static int
+thread_pipeline_enable(struct app_thread_data *t,
+ struct thread_pipeline_enable_msg_req *req)
+{
+ struct app_thread_pipeline_data *p;
+
+ p = (req->f_run == NULL) ?
+ &t->regular[t->n_regular] :
+ &t->custom[t->n_custom];
+
+ if (t->n_regular >= APP_MAX_THREAD_PIPELINES)
+ return -1;
+
+ p->pipeline_id = req->pipeline_id;
+ p->be = req->be;
+ p->f_run = req->f_run;
+ p->f_timer = req->f_timer;
+ p->timer_period = req->timer_period;
+ p->deadline = 0;
+
+ if (req->f_run == NULL)
+ t->n_regular++;
+ else
+ t->n_custom++;
+
+ return 0;
+}
+
+static int
+thread_pipeline_disable(struct app_thread_data *t,
+ struct thread_pipeline_disable_msg_req *req)
+{
+ uint32_t i;
+
+ printf("%p\n", req);
+ printf("%d %d\n", req->type, req->pipeline_id);
+
+ for (i = 0; i < t->n_regular; i++) {
+ if (t->regular[i].pipeline_id == req->pipeline_id)
+ break;
+ }
+
+ if (i < t->n_regular) {
+ if (i < t->n_regular - 1)
+ memcpy(&t->regular[i],
+ &t->regular[i+1],
+ (t->n_regular - i) * sizeof(struct app_thread_pipeline_data));
+
+ t->n_regular--;
+
+ return 0;
+ }
+
+ for (i = 0; i < t->n_custom; i++) {
+ if (t->custom[i].pipeline_id == req->pipeline_id)
+ break;
+ }
+
+ /* return if pipeline not found */
+ if (i >= t->n_custom)
+ return -1;
+
+ if (i < t->n_custom - 1)
+ memcpy(&t->custom[i],
+ &t->custom[i+1],
+ (t->n_custom - i) * sizeof(struct app_thread_pipeline_data));
+
+ t->n_custom--;
+
+ return 0;
+}
+
+static int
+thread_msg_req_handle(struct app_thread_data *t)
+{
+ void *msg_ptr;
+ struct thread_msg_req *req;
+ struct thread_msg_rsp *rsp;
+
+ msg_ptr = thread_msg_recv(t->msgq_in);
+ req = msg_ptr;
+ rsp = msg_ptr;
+
+ if (req != NULL)
+ switch (req->type) {
+ case THREAD_MSG_REQ_PIPELINE_ENABLE: {
+ rsp->status = thread_pipeline_enable(t,
+ (struct thread_pipeline_enable_msg_req *) req);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+
+ case THREAD_MSG_REQ_PIPELINE_DISABLE: {
+ rsp->status = thread_pipeline_disable(t,
+ (struct thread_pipeline_disable_msg_req *) req);
+ printf("THREAD STATUS: %d\n", rsp->status);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+int
+app_thread(void *arg)
{
struct app_params *app = (struct app_params *) arg;
uint32_t core_id = rte_lcore_id(), i, j;
@@ -103,6 +234,8 @@ int app_thread(void *arg)
}
t->deadline = t_deadline;
+
+ thread_msg_req_handle(t);
}
}
new file mode 100644
@@ -0,0 +1,101 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef THREAD_H_
+#define THREAD_H_
+
+#include "app.h"
+#include "pipeline_be.h"
+
+enum thread_msg_req_type {
+ THREAD_MSG_REQ_PIPELINE_ENABLE = 0,
+ THREAD_MSG_REQ_PIPELINE_DISABLE,
+ THREAD_MSG_REQS
+};
+
+struct thread_msg_req {
+ enum thread_msg_req_type type;
+};
+
+struct thread_msg_rsp {
+ int status;
+};
+
+/*
+ * PIPELINE ENABLE
+ */
+struct thread_pipeline_enable_msg_req {
+ enum thread_msg_req_type type;
+
+ uint32_t pipeline_id;
+ void *be;
+ pipeline_be_op_run f_run;
+ pipeline_be_op_timer f_timer;
+ uint64_t timer_period;
+};
+
+struct thread_pipeline_enable_msg_rsp {
+ int status;
+};
+
+/*
+ * PIPELINE DISABLE
+ */
+struct thread_pipeline_disable_msg_req {
+ enum thread_msg_req_type type;
+
+ uint32_t pipeline_id;
+};
+
+struct thread_pipeline_disable_msg_rsp {
+ int status;
+};
+
+int
+app_pipeline_thread_cmd_push(struct app_params *app);
+
+int
+app_pipeline_enable(struct app_params *app,
+ uint32_t core_id,
+ uint32_t socket_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id);
+
+int
+app_pipeline_disable(struct app_params *app,
+ uint32_t core_id,
+ uint32_t socket_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id);
+
+#endif /* THREAD_H_ */
new file mode 100644
@@ -0,0 +1,328 @@
+#include <rte_common.h>
+#include <rte_ring.h>
+#include <rte_malloc.h>
+#include <cmdline_rdline.h>
+#include <cmdline_parse.h>
+#include <cmdline_parse_num.h>
+#include <cmdline_parse_string.h>
+#include <cmdline_parse_ipaddr.h>
+#include <cmdline_parse_etheraddr.h>
+#include <cmdline_socket.h>
+#include <cmdline.h>
+
+#include "thread.h"
+#include "pipeline.h"
+#include "pipeline_common_fe.h"
+#include "app.h"
+
+static inline void *
+thread_msg_send_recv(struct app_params *app,
+ uint32_t thread_id,
+ void *msg,
+ uint32_t timeout_ms)
+{
+ struct rte_ring *r_req = app->thread_data[thread_id].msgq_in;
+ struct rte_ring *r_rsp = app->thread_data[thread_id].msgq_out;
+ uint64_t hz = rte_get_tsc_hz();
+ void *msg_recv;
+ uint64_t deadline;
+ int status;
+
+ /* send */
+ do {
+ status = rte_ring_sp_enqueue(r_req, (void *) msg);
+ } while (status == -ENOBUFS);
+
+ /* recv */
+ deadline = (timeout_ms) ?
+ (rte_rdtsc() + ((hz * timeout_ms) / 1000)) :
+ UINT64_MAX;
+
+ do {
+ if (rte_rdtsc() > deadline)
+ return NULL;
+
+ status = rte_ring_sc_dequeue(r_rsp, &msg_recv);
+ } while (status != 0);
+
+ return msg_recv;
+}
+
+int
+app_pipeline_enable(struct app_params *app,
+ uint32_t socket_id,
+ uint32_t core_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id)
+{
+ struct thread_pipeline_enable_msg_req *req;
+ struct thread_pipeline_enable_msg_rsp *rsp;
+
+ int thread_id;
+
+ struct app_pipeline_data *p;
+ struct app_pipeline_params *p_params;
+ struct pipeline_type *p_type;
+
+ if (app_pipeline_data(app, pipeline_id) == NULL)
+ return -1;
+
+ thread_id = cpu_core_map_get_lcore_id(app->core_map,
+ socket_id,
+ core_id,
+ hyper_th_id);
+
+ if (thread_id < 0)
+ return -1;
+
+ req = app_msg_alloc(app);
+ if (req == NULL)
+ return -1;
+
+ p = &app->pipeline_data[pipeline_id];
+ p_params = &app->pipeline_params[pipeline_id];
+ p_type = app_pipeline_type_find(app, p_params->type);
+
+ if (p->enabled == 1)
+ return -1;
+
+ req->type = THREAD_MSG_REQ_PIPELINE_ENABLE;
+ req->pipeline_id = pipeline_id;
+ req->be = p->be;
+ req->f_run = p_type->be_ops->f_run;
+ req->f_timer = p_type->be_ops->f_timer;
+ req->timer_period = p->timer_period;
+
+ rsp = thread_msg_send_recv(app, thread_id, req, 5 * MSG_TIMEOUT_DEFAULT);
+ if (rsp == NULL)
+ return -1;
+
+ if (rsp->status < 0)
+ return -1;
+
+ p->enabled = 1;
+ return 0;
+}
+
+
+int
+app_pipeline_disable(struct app_params *app,
+ uint32_t socket_id,
+ uint32_t core_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id)
+{
+ struct thread_pipeline_disable_msg_req *req;
+ struct thread_pipeline_disable_msg_rsp *rsp;
+
+ int thread_id;
+
+ struct app_pipeline_data *p;
+
+ if (app_pipeline_data(app, pipeline_id) == NULL)
+ return -1;
+
+ thread_id = cpu_core_map_get_lcore_id(app->core_map,
+ socket_id,
+ core_id,
+ hyper_th_id);
+
+ if (thread_id < 0)
+ return -1;
+
+ p = &app->pipeline_data[pipeline_id];
+
+ if (p->enabled == 0)
+ return -1;
+
+ req = app_msg_alloc(app);
+ if (req == NULL)
+ return -1;
+
+ req->type = THREAD_MSG_REQ_PIPELINE_DISABLE;
+ req->pipeline_id = pipeline_id;
+
+ printf("%p\n", req);
+ printf("%d %d\n", req->type, req->pipeline_id);
+
+ rsp = thread_msg_send_recv(app, thread_id, req, 5 * MSG_TIMEOUT_DEFAULT);
+
+ printf("RSP: %p\n", rsp);
+ if (rsp == NULL)
+ return -1;
+
+ printf("%d\n", rsp->status);
+ if (rsp->status < 0)
+ return -1;
+
+ p->enabled = 0;
+ return 0;
+}
+
+struct cmd_pipeline_enable_result {
+ cmdline_fixed_string_t t_string;
+ cmdline_fixed_string_t t_id;
+ cmdline_fixed_string_t pipeline_string;
+ uint32_t pipeline_id;
+ cmdline_fixed_string_t enable_string;
+};
+
+static void
+cmd_pipeline_enable_parsed(
+ void *parsed_result,
+ __attribute__((unused)) struct cmdline *cl,
+ void *data)
+{
+ struct cmd_pipeline_enable_result *params = parsed_result;
+ struct app_params *app = data;
+
+ int status;
+
+ uint32_t core_id, socket_id, hyper_th_id;
+
+ if (parse_pipeline_core(&socket_id,
+ &core_id,
+ &hyper_th_id,
+ params->t_id) < 0) {
+ printf("Command failed\n");
+ return;
+ }
+
+ status = app_pipeline_enable(app,
+ socket_id,
+ core_id,
+ hyper_th_id,
+ params->pipeline_id);
+
+ if (status != 0)
+ printf("Command failed\n");
+}
+
+cmdline_parse_token_string_t cmd_pipeline_enable_t_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t");
+
+cmdline_parse_token_string_t cmd_pipeline_enable_t_id =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id, NULL);
+
+cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string,
+ "pipeline");
+
+cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id =
+ TOKEN_NUM_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_id, UINT32);
+
+cmdline_parse_token_string_t cmd_pipeline_enable_enable_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string, "enable");
+
+cmdline_parse_inst_t cmd_pipeline_enable = {
+ .f = cmd_pipeline_enable_parsed,
+ .data = NULL,
+ .help_str = "Enable pipeline on specified core",
+ .tokens = {
+ (void *)&cmd_pipeline_enable_t_string,
+ (void *)&cmd_pipeline_enable_t_id,
+ (void *)&cmd_pipeline_enable_pipeline_string,
+ (void *)&cmd_pipeline_enable_pipeline_id,
+ (void *)&cmd_pipeline_enable_enable_string,
+ NULL,
+ },
+};
+
+struct cmd_pipeline_disable_result {
+ cmdline_fixed_string_t t_string;
+ cmdline_fixed_string_t t_id;
+ cmdline_fixed_string_t pipeline_string;
+ uint32_t pipeline_id;
+ cmdline_fixed_string_t disable_string;
+};
+
+static void
+cmd_pipeline_disable_parsed(
+ void *parsed_result,
+ __attribute__((unused)) struct cmdline *cl,
+ void *data)
+{
+ struct cmd_pipeline_enable_result *params = parsed_result;
+ struct app_params *app = data;
+
+ int status;
+
+ uint32_t core_id, socket_id, hyper_th_id;
+
+ if (parse_pipeline_core(&socket_id,
+ &core_id,
+ &hyper_th_id,
+ params->t_id) < 0) {
+ printf("Command failed\n");
+ return;
+ }
+
+ status = app_pipeline_disable(app,
+ socket_id,
+ core_id,
+ hyper_th_id,
+ params->pipeline_id);
+
+ if (status != 0)
+ printf("Command failed\n");
+}
+
+cmdline_parse_token_string_t cmd_pipeline_disable_t_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t");
+
+cmdline_parse_token_string_t cmd_pipeline_disable_t_id =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id, NULL);
+
+cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_string,
+ "pipeline");
+
+cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id =
+ TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_id, UINT32);
+
+cmdline_parse_token_string_t cmd_pipeline_disable_enable_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string, "disable");
+
+cmdline_parse_inst_t cmd_pipeline_disable = {
+ .f = cmd_pipeline_disable_parsed,
+ .data = NULL,
+ .help_str = "Disable pipeline on specified core",
+ .tokens = {
+ (void *)&cmd_pipeline_disable_t_string,
+ (void *)&cmd_pipeline_disable_t_id,
+ (void *)&cmd_pipeline_disable_pipeline_string,
+ (void *)&cmd_pipeline_disable_pipeline_id,
+ (void *)&cmd_pipeline_disable_enable_string,
+ NULL,
+ },
+};
+
+static cmdline_parse_ctx_t thread_cmds[] = {
+ (cmdline_parse_inst_t *) &cmd_pipeline_enable,
+ (cmdline_parse_inst_t *) &cmd_pipeline_disable,
+ NULL,
+};
+
+int
+app_pipeline_thread_cmd_push(struct app_params *app)
+{
+ uint32_t n_cmds, i;
+
+ /* Check for available slots in the application commands array */
+ n_cmds = RTE_DIM(thread_cmds) - 1;
+ if (n_cmds > APP_MAX_CMDS - app->n_cmds)
+ return -ENOMEM;
+
+ /* Push pipeline commands into the application */
+ memcpy(&app->cmds[app->n_cmds],
+ thread_cmds,
+ n_cmds * sizeof(cmdline_parse_ctx_t *));
+
+ for (i = 0; i < n_cmds; i++)
+ app->cmds[app->n_cmds + i]->data = app;
+
+ app->n_cmds += n_cmds;
+ app->cmds[app->n_cmds] = NULL;
+
+ return 0;
+}