@@ -3168,119 +3168,86 @@ cmd_ipsec_sa_delete(char **tokens,
rte_swx_ipsec_sa_delete(ipsec, sa_id);
}
-static const char cmd_thread_pipeline_enable_help[] =
-"thread <thread_id> pipeline <pipeline_name> enable [ period <timer_period_ms> ]\n";
-
-#ifndef TIMER_PERIOD_MS_DEFAULT
-#define TIMER_PERIOD_MS_DEFAULT 10
-#endif
+static const char cmd_pipeline_enable_help[] =
+"pipeline <pipeline_name> enable thread <thread_id>\n";
static void
-cmd_thread_pipeline_enable(char **tokens,
- uint32_t n_tokens,
- char *out,
- size_t out_size,
- void *obj __rte_unused)
+cmd_pipeline_enable(char **tokens,
+ uint32_t n_tokens,
+ char *out,
+ size_t out_size,
+ void *obj __rte_unused)
{
char *pipeline_name;
struct rte_swx_pipeline *p;
- uint32_t thread_id, timer_period_ms = TIMER_PERIOD_MS_DEFAULT;
+ uint32_t thread_id;
int status;
- if ((n_tokens != 5) && (n_tokens != 7)) {
+ if (n_tokens != 5) {
snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
return;
}
- if (parser_read_uint32(&thread_id, tokens[1]) != 0) {
- snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
- return;
- }
-
- if (strcmp(tokens[2], "pipeline") != 0) {
- snprintf(out, out_size, MSG_ARG_NOT_FOUND, "pipeline");
- return;
- }
-
- pipeline_name = tokens[3];
+ pipeline_name = tokens[1];
p = rte_swx_pipeline_find(pipeline_name);
if (!p) {
snprintf(out, out_size, MSG_ARG_INVALID, "pipeline_name");
return;
}
- if (strcmp(tokens[4], "enable") != 0) {
+ if (strcmp(tokens[2], "enable") != 0) {
snprintf(out, out_size, MSG_ARG_NOT_FOUND, "enable");
return;
}
- if (n_tokens == 7) {
- if (strcmp(tokens[5], "period") != 0) {
- snprintf(out, out_size, MSG_ARG_NOT_FOUND, "period");
- return;
- }
+ if (strcmp(tokens[3], "thread") != 0) {
+ snprintf(out, out_size, MSG_ARG_NOT_FOUND, "thread");
+ return;
+ }
- if (parser_read_uint32(&timer_period_ms, tokens[6]) != 0) {
- snprintf(out, out_size, MSG_ARG_INVALID, "timer_period_ms");
- return;
- }
+ if (parser_read_uint32(&thread_id, tokens[4]) != 0) {
+ snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
+ return;
}
- status = thread_pipeline_enable(thread_id, p, timer_period_ms);
+ status = pipeline_enable(p, thread_id);
if (status) {
- snprintf(out, out_size, MSG_CMD_FAIL, "thread pipeline enable");
+ snprintf(out, out_size, MSG_CMD_FAIL, "pipeline enable");
return;
}
}
-static const char cmd_thread_pipeline_disable_help[] =
-"thread <thread_id> pipeline <pipeline_name> disable\n";
+static const char cmd_pipeline_disable_help[] =
+"pipeline <pipeline_name> disable\n";
static void
-cmd_thread_pipeline_disable(char **tokens,
- uint32_t n_tokens,
- char *out,
- size_t out_size,
- void *obj __rte_unused)
+cmd_pipeline_disable(char **tokens,
+ uint32_t n_tokens,
+ char *out,
+ size_t out_size,
+ void *obj __rte_unused)
{
struct rte_swx_pipeline *p;
char *pipeline_name;
- uint32_t thread_id;
- int status;
- if (n_tokens != 5) {
+ if (n_tokens != 3) {
snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
return;
}
- if (parser_read_uint32(&thread_id, tokens[1]) != 0) {
- snprintf(out, out_size, MSG_ARG_INVALID, "thread_id");
- return;
- }
-
- if (strcmp(tokens[2], "pipeline") != 0) {
- snprintf(out, out_size, MSG_ARG_NOT_FOUND, "pipeline");
- return;
- }
-
- pipeline_name = tokens[3];
+ pipeline_name = tokens[1];
p = rte_swx_pipeline_find(pipeline_name);
if (!p) {
snprintf(out, out_size, MSG_ARG_INVALID, "pipeline_name");
return;
}
- if (strcmp(tokens[4], "disable") != 0) {
+ if (strcmp(tokens[2], "disable") != 0) {
snprintf(out, out_size, MSG_ARG_NOT_FOUND, "disable");
return;
}
- status = thread_pipeline_disable(thread_id, p);
- if (status) {
- snprintf(out, out_size, MSG_CMD_FAIL,
- "thread pipeline disable");
- return;
- }
+ pipeline_disable(p);
}
static void
@@ -3326,11 +3293,12 @@ cmd_help(char **tokens,
"\tpipeline meter stats\n"
"\tpipeline stats\n"
"\tpipeline mirror session\n"
+ "\tpipeline enable\n"
+ "\tpipeline disable\n\n"
"\tipsec create\n"
"\tipsec sa add\n"
"\tipsec sa delete\n"
- "\tthread pipeline enable\n"
- "\tthread pipeline disable\n\n");
+ );
return;
}
@@ -3553,6 +3521,18 @@ cmd_help(char **tokens,
return;
}
+ if (!strcmp(tokens[0], "pipeline") &&
+ (n_tokens == 2) && !strcmp(tokens[1], "enable")) {
+ snprintf(out, out_size, "\n%s\n", cmd_pipeline_enable_help);
+ return;
+ }
+
+ if (!strcmp(tokens[0], "pipeline") &&
+ (n_tokens == 2) && !strcmp(tokens[1], "disable")) {
+ snprintf(out, out_size, "\n%s\n", cmd_pipeline_disable_help);
+ return;
+ }
+
if (!strcmp(tokens[0], "ipsec") &&
(n_tokens == 2) && !strcmp(tokens[1], "create")) {
snprintf(out, out_size, "\n%s\n", cmd_ipsec_create_help);
@@ -3573,22 +3553,6 @@ cmd_help(char **tokens,
return;
}
- if ((n_tokens == 3) &&
- (strcmp(tokens[0], "thread") == 0) &&
- (strcmp(tokens[1], "pipeline") == 0)) {
- if (strcmp(tokens[2], "enable") == 0) {
- snprintf(out, out_size, "\n%s\n",
- cmd_thread_pipeline_enable_help);
- return;
- }
-
- if (strcmp(tokens[2], "disable") == 0) {
- snprintf(out, out_size, "\n%s\n",
- cmd_thread_pipeline_disable_help);
- return;
- }
- }
-
snprintf(out, out_size, "Invalid command\n");
}
@@ -3819,6 +3783,16 @@ cli_process(char *in, char *out, size_t out_size, void *obj)
cmd_pipeline_mirror_session(tokens, n_tokens, out, out_size, obj);
return;
}
+
+ if (n_tokens >= 3 && !strcmp(tokens[2], "enable")) {
+ cmd_pipeline_enable(tokens, n_tokens, out, out_size, obj);
+ return;
+ }
+
+ if (n_tokens >= 3 && !strcmp(tokens[2], "disable")) {
+ cmd_pipeline_disable(tokens, n_tokens, out, out_size, obj);
+ return;
+ }
}
if (!strcmp(tokens[0], "ipsec")) {
@@ -3838,22 +3812,6 @@ cli_process(char *in, char *out, size_t out_size, void *obj)
}
}
- if (strcmp(tokens[0], "thread") == 0) {
- if ((n_tokens >= 5) &&
- (strcmp(tokens[4], "enable") == 0)) {
- cmd_thread_pipeline_enable(tokens, n_tokens,
- out, out_size, obj);
- return;
- }
-
- if ((n_tokens >= 5) &&
- (strcmp(tokens[4], "disable") == 0)) {
- cmd_thread_pipeline_disable(tokens, n_tokens,
- out, out_size, obj);
- return;
- }
- }
-
snprintf(out, out_size, MSG_CMD_UNKNOWN, tokens[0]);
}
@@ -54,4 +54,4 @@ pipeline PIPELINE0 commit
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/hash_func.so io ./examples/pipeline/examples/e
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/l2fwd.so io ./examples/pipeline/examples/ethde
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/l2fwd_macswp.so io ./examples/pipeline/example
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -28,4 +28,4 @@ pipeline PIPELINE0 build lib /tmp/l2fwd_macswp.so io ./examples/pipeline/example
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -28,4 +28,4 @@ pipeline PIPELINE0 build lib /tmp/l2fwd.so io ./examples/pipeline/examples/pcap.
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/learner.so io ./examples/pipeline/examples/eth
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -40,4 +40,4 @@ pipeline PIPELINE0 meter meters set profile platinum index from 0 to 15
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -42,4 +42,4 @@ pipeline PIPELINE0 mirror session 3 port 0 clone slow truncate 128
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/recirculation.so io ./examples/pipeline/exampl
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/registers.so io ./examples/pipeline/examples/e
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -42,4 +42,4 @@ pipeline PIPELINE0 selector s show
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -32,4 +32,4 @@ pipeline PIPELINE0 build lib /tmp/varbit.so io ./examples/pipeline/examples/ethd
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -40,4 +40,4 @@ pipeline PIPELINE0 commit
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -36,4 +36,4 @@ pipeline PIPELINE0 commit
;
; Pipelines-to-threads mapping.
;
-thread 1 pipeline PIPELINE0 enable
+pipeline PIPELINE0 enable thread 1
@@ -3,17 +3,11 @@
*/
#include <stdlib.h>
+#include <errno.h>
+#include <rte_atomic.h>
#include <rte_common.h>
-#include <rte_cycles.h>
#include <rte_lcore.h>
-#include <rte_ring.h>
-
-#include <rte_table_acl.h>
-#include <rte_table_array.h>
-#include <rte_table_hash.h>
-#include <rte_table_lpm.h>
-#include <rte_table_lpm_ipv6.h>
#include "obj.h"
#include "thread.h"
@@ -22,14 +16,6 @@
#define THREAD_PIPELINES_MAX 256
#endif
-#ifndef THREAD_MSGQ_SIZE
-#define THREAD_MSGQ_SIZE 64
-#endif
-
-#ifndef THREAD_TIMER_PERIOD_MS
-#define THREAD_TIMER_PERIOD_MS 100
-#endif
-
/* Pipeline instruction quanta: Needs to be big enough to do some meaningful
* work, but not too big to avoid starving any other pipelines mapped to the
* same thread. For a pipeline that executes 10 instructions per packet, a
@@ -40,509 +26,209 @@
#endif
/**
- * Control thread: data plane thread context
+ * In this design, there is a single control plane (CP) thread and one or multiple data plane (DP)
+ * threads. Each DP thread can run up to THREAD_PIPELINES_MAX pipelines and up to THREAD_BLOCKS_MAX
+ * blocks.
+ *
+ * The pipelines and blocks are single threaded, meaning that a given pipeline/block can be run by a
+ * single thread at any given time, so the same pipeline/block cannot show up in the list of
+ * pipelines/blocks of more than one thread at any specific moment.
+ *
+ * Each DP thread has its own context (struct thread instance), which it shares with the CP thread:
+ * - Read-write by the CP thread;
+ * - Read-only by the DP thread.
*/
struct thread {
- struct rte_ring *msgq_req;
- struct rte_ring *msgq_rsp;
-
- uint32_t enabled;
-};
-
-static struct thread thread[RTE_MAX_LCORE];
-
-/**
- * Data plane threads: context
- */
-struct pipeline_data {
- struct rte_swx_pipeline *p;
- uint64_t timer_period; /* Measured in CPU cycles. */
- uint64_t time_next;
-};
-
-struct thread_data {
- struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
- uint32_t n_pipelines;
-
- struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
- struct rte_ring *msgq_req;
- struct rte_ring *msgq_rsp;
- uint64_t timer_period; /* Measured in CPU cycles. */
- uint64_t time_next;
- uint64_t time_next_min;
+ struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX];
+ volatile uint64_t n_pipelines;
+ int enabled;
} __rte_cache_aligned;
-static struct thread_data thread_data[RTE_MAX_LCORE];
+static struct thread threads[RTE_MAX_LCORE];
/**
- * Control thread: data plane thread init
+ * Control plane (CP) thread.
*/
-static void
-thread_free(void)
-{
- uint32_t i;
-
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- struct thread *t = &thread[i];
-
- if (!rte_lcore_is_enabled(i))
- continue;
-
- /* MSGQs */
- rte_ring_free(t->msgq_req);
-
- rte_ring_free(t->msgq_rsp);
- }
-}
-
int
thread_init(void)
{
- uint32_t i;
-
- RTE_LCORE_FOREACH_WORKER(i) {
- char name[NAME_MAX];
- struct rte_ring *msgq_req, *msgq_rsp;
- struct thread *t = &thread[i];
- struct thread_data *t_data = &thread_data[i];
- uint32_t cpu_id = rte_lcore_to_socket_id(i);
-
- /* MSGQs */
- snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
-
- msgq_req = rte_ring_create(name,
- THREAD_MSGQ_SIZE,
- cpu_id,
- RING_F_SP_ENQ | RING_F_SC_DEQ);
-
- if (msgq_req == NULL) {
- thread_free();
- return -1;
- }
+ uint32_t thread_id;
- snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
+ RTE_LCORE_FOREACH_WORKER(thread_id) {
+ struct thread *t = &threads[thread_id];
- msgq_rsp = rte_ring_create(name,
- THREAD_MSGQ_SIZE,
- cpu_id,
- RING_F_SP_ENQ | RING_F_SC_DEQ);
-
- if (msgq_rsp == NULL) {
- thread_free();
- return -1;
- }
-
- /* Control thread records */
- t->msgq_req = msgq_req;
- t->msgq_rsp = msgq_rsp;
t->enabled = 1;
-
- /* Data plane thread records */
- t_data->n_pipelines = 0;
- t_data->msgq_req = msgq_req;
- t_data->msgq_rsp = msgq_rsp;
- t_data->timer_period =
- (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
- t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
- t_data->time_next_min = t_data->time_next;
}
return 0;
}
-static inline int
-thread_is_running(uint32_t thread_id)
+static uint32_t
+pipeline_find(struct rte_swx_pipeline *p)
{
- enum rte_lcore_state_t thread_state;
+ uint32_t thread_id;
- thread_state = rte_eal_get_lcore_state(thread_id);
- return (thread_state == RUNNING) ? 1 : 0;
-}
-
-/**
- * Control thread & data plane threads: message passing
- */
-enum thread_req_type {
- THREAD_REQ_PIPELINE_ENABLE = 0,
- THREAD_REQ_PIPELINE_DISABLE,
- THREAD_REQ_MAX
-};
-
-struct thread_msg_req {
- enum thread_req_type type;
-
- union {
- struct {
- struct rte_swx_pipeline *p;
- uint32_t timer_period_ms;
- } pipeline_enable;
-
- struct {
- struct rte_swx_pipeline *p;
- } pipeline_disable;
- };
-};
-
-struct thread_msg_rsp {
- int status;
-};
-
-/**
- * Control thread
- */
-static struct thread_msg_req *
-thread_msg_alloc(void)
-{
- size_t size = RTE_MAX(sizeof(struct thread_msg_req),
- sizeof(struct thread_msg_rsp));
-
- return calloc(1, size);
-}
-
-static void
-thread_msg_free(struct thread_msg_rsp *rsp)
-{
- free(rsp);
-}
-
-static struct thread_msg_rsp *
-thread_msg_send_recv(uint32_t thread_id,
- struct thread_msg_req *req)
-{
- struct thread *t = &thread[thread_id];
- struct rte_ring *msgq_req = t->msgq_req;
- struct rte_ring *msgq_rsp = t->msgq_rsp;
- struct thread_msg_rsp *rsp;
- int status;
-
- /* send */
- do {
- status = rte_ring_sp_enqueue(msgq_req, req);
- } while (status == -ENOBUFS);
-
- /* recv */
- do {
- status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
- } while (status != 0);
-
- return rsp;
-}
-
-static int
-thread_is_pipeline_enabled(uint32_t thread_id, struct rte_swx_pipeline *p)
-{
- struct thread *t = &thread[thread_id];
- struct thread_data *td = &thread_data[thread_id];
- uint32_t i;
-
- if (!t->enabled)
- return 0; /* Pipeline NOT enabled on this thread. */
-
- for (i = 0; i < td->n_pipelines; i++)
- if (td->p[i] == p)
- return 1; /* Pipeline enabled on this thread. */
-
- return 0 /* Pipeline NOT enabled on this thread. */;
-}
-
-int
-thread_pipeline_enable(uint32_t thread_id, struct rte_swx_pipeline *p, uint32_t timer_period_ms)
-{
- struct thread *t;
- struct thread_msg_req *req;
- struct thread_msg_rsp *rsp;
- int status;
-
- /* Check input params */
- if ((thread_id >= RTE_MAX_LCORE) || !p || !timer_period_ms)
- return -1;
-
- t = &thread[thread_id];
- if (t->enabled == 0)
- return -1;
-
- if (!thread_is_running(thread_id)) {
- struct thread_data *td = &thread_data[thread_id];
- struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
-
- if (td->n_pipelines >= THREAD_PIPELINES_MAX)
- return -1;
-
- /* Data plane thread */
- td->p[td->n_pipelines] = p;
-
- tdp->p = p;
- tdp->timer_period = (rte_get_tsc_hz() * timer_period_ms) / 1000;
- tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
+ for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
+ struct thread *t = &threads[thread_id];
+ uint32_t i;
- td->n_pipelines++;
+ if (!t->enabled)
+ continue;
- return 0;
+ for (i = 0; i < t->n_pipelines; i++)
+ if (t->pipelines[i] == p)
+ break;
}
- /* Allocate request */
- req = thread_msg_alloc();
- if (req == NULL)
- return -1;
-
- /* Write request */
- req->type = THREAD_REQ_PIPELINE_ENABLE;
- req->pipeline_enable.p = p;
- req->pipeline_enable.timer_period_ms = timer_period_ms;
-
- /* Send request and wait for response */
- rsp = thread_msg_send_recv(thread_id, req);
-
- /* Read response */
- status = rsp->status;
-
- /* Free response */
- thread_msg_free(rsp);
-
- /* Request completion */
- if (status)
- return status;
-
- return 0;
+ return thread_id;
}
+/**
+ * Enable a given pipeline to run on a specific DP thread.
+ *
+ * CP thread:
+ * - Adds a new pipeline to the end of the DP thread pipeline list (t->pipelines[]);
+ * - Increments the DP thread number of pipelines (t->n_pipelines). It is important to make sure
+ * that t->pipelines[] update is completed BEFORE the t->n_pipelines update, hence the memory
+ * write barrier used below.
+ *
+ * DP thread:
+ * - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. It detects
+ * the new pipeline when it sees the updated t->n_pipelines value;
+ * - If somehow the above condition is not met, so t->n_pipelines update is incorrectly taking
+ * place before the t->pipelines[] update is completed, then the DP thread will use an incorrect
+ * handle for the new pipeline, which can result in memory corruption or segmentation fault.
+ */
int
-thread_pipeline_disable(uint32_t thread_id, struct rte_swx_pipeline *p)
+pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id)
{
struct thread *t;
- struct thread_msg_req *req;
- struct thread_msg_rsp *rsp;
- int status;
+ uint64_t n_pipelines;
/* Check input params */
- if ((thread_id >= RTE_MAX_LCORE) || !p)
- return -1;
-
- t = &thread[thread_id];
- if (t->enabled == 0)
- return -1;
-
- if (!thread_is_pipeline_enabled(thread_id, p))
- return 0;
+ if (!p || thread_id >= RTE_MAX_LCORE)
+ return -EINVAL;
- if (!thread_is_running(thread_id)) {
- struct thread_data *td = &thread_data[thread_id];
- uint32_t i;
-
- for (i = 0; i < td->n_pipelines; i++) {
- struct pipeline_data *tdp = &td->pipeline_data[i];
-
- if (tdp->p != p)
- continue;
-
- /* Data plane thread */
- if (i < td->n_pipelines - 1) {
- struct rte_swx_pipeline *pipeline_last =
- td->p[td->n_pipelines - 1];
- struct pipeline_data *tdp_last =
- &td->pipeline_data[td->n_pipelines - 1];
-
- td->p[i] = pipeline_last;
- memcpy(tdp, tdp_last, sizeof(*tdp));
- }
-
- td->n_pipelines--;
-
- break;
- }
+ if (pipeline_find(p) < RTE_MAX_LCORE)
+ return -EEXIST;
- return 0;
- }
-
- /* Allocate request */
- req = thread_msg_alloc();
- if (req == NULL)
- return -1;
-
- /* Write request */
- req->type = THREAD_REQ_PIPELINE_DISABLE;
- req->pipeline_disable.p = p;
-
- /* Send request and wait for response */
- rsp = thread_msg_send_recv(thread_id, req);
+ t = &threads[thread_id];
+ if (!t->enabled)
+ return -EINVAL;
- /* Read response */
- status = rsp->status;
+ n_pipelines = t->n_pipelines;
- /* Free response */
- thread_msg_free(rsp);
+ /* Check there is room for at least one more pipeline. */
+ if (n_pipelines >= THREAD_PIPELINES_MAX)
+ return -ENOSPC;
- /* Request completion */
- if (status)
- return status;
+ /* Install the new pipeline. */
+ t->pipelines[n_pipelines] = p;
+ rte_wmb();
+ t->n_pipelines = n_pipelines + 1;
return 0;
}
/**
- * Data plane threads: message handling
+ * Disable a given pipeline from running on any DP thread.
+ *
+ * CP thread:
+ * - Detects the thread that is running the given pipeline, if any;
+ * - Writes the last pipeline handle (pipeline_last = t->pipelines[t->n_pipelines - 1]) on the
+ * position of the pipeline to be disabled (t->pipelines[i] = pipeline_last) and decrements the
+ * number of pipelines running on the current thread (t->n_pipelines--). This approach makes sure
+ * that no holes with invalid locations are ever developed within the t->pipelines[] array.
+ * - If the memory barrier below is present, then t->n_pipelines update is guaranteed to take place
+ * after the t->pipelines[] update is completed. The possible DP thread behaviors are detailed
+ * below, which are all valid:
+ * - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
+ * exactly one time during the current dispatch loop iteration. This takes place when the DP
+ * thread sees the final value of t->n_pipelines;
+ * - Not run the removed pipeline at all, run all the other pipelines, except pipeline_last,
+ * exactly one time and the pipeline_last exactly two times during the current dispatch loop
+ * iteration. This takes place when the DP thread sees the initial value of t->n_pipelines.
+ * - If the memory barrier below is not present, then the t->n_pipelines update may be reordered by
+ * the CPU, so that it takes place before the t->pipelines[] update. The possible DP thread
+ * behaviors are detailed below, which are all valid:
+ * - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
+ * exactly one time during the current dispatch loop iteration. This takes place when the DP
+ * thread sees the final values of the t->pipeline[] array;
+ * - Run the removed pipeline one last time, run all the other pipelines exactly one time, with
+ * the exception of the pipeline_last, which is not run during the current dispatch loop
+ * iteration. This takes place when the DP thread sees the initial values of t->pipeline[].
+ *
+ * DP thread:
+ * - Reads t->n_pipelines before starting every new iteration through t->pipelines[].
*/
-static inline struct thread_msg_req *
-thread_msg_recv(struct rte_ring *msgq_req)
-{
- struct thread_msg_req *req;
-
- int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
-
- if (status != 0)
- return NULL;
-
- return req;
-}
-
-static inline void
-thread_msg_send(struct rte_ring *msgq_rsp,
- struct thread_msg_rsp *rsp)
-{
- int status;
-
- do {
- status = rte_ring_sp_enqueue(msgq_rsp, rsp);
- } while (status == -ENOBUFS);
-}
-
-static struct thread_msg_rsp *
-thread_msg_handle_pipeline_enable(struct thread_data *t,
- struct thread_msg_req *req)
+void
+pipeline_disable(struct rte_swx_pipeline *p)
{
- struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
- struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
-
- /* Request */
- if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
- rsp->status = -1;
- return rsp;
- }
-
- t->p[t->n_pipelines] = req->pipeline_enable.p;
-
- p->p = req->pipeline_enable.p;
- p->timer_period = (rte_get_tsc_hz() *
- req->pipeline_enable.timer_period_ms) / 1000;
- p->time_next = rte_get_tsc_cycles() + p->timer_period;
+ struct thread *t;
+ uint64_t n_pipelines;
+ uint32_t thread_id, i;
- t->n_pipelines++;
+ /* Check input params */
+ if (!p)
+ return;
- /* Response */
- rsp->status = 0;
- return rsp;
-}
+ /* Find the thread that runs this pipeline. */
+ thread_id = pipeline_find(p);
+ if (thread_id == RTE_MAX_LCORE)
+ return;
-static struct thread_msg_rsp *
-thread_msg_handle_pipeline_disable(struct thread_data *t,
- struct thread_msg_req *req)
-{
- struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
- uint32_t n_pipelines = t->n_pipelines;
- struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
- uint32_t i;
+ t = &threads[thread_id];
+ n_pipelines = t->n_pipelines;
- /* find pipeline */
for (i = 0; i < n_pipelines; i++) {
- struct pipeline_data *p = &t->pipeline_data[i];
+ struct rte_swx_pipeline *pipeline = t->pipelines[i];
- if (p->p != pipeline)
+ if (pipeline != p)
continue;
if (i < n_pipelines - 1) {
- struct rte_swx_pipeline *pipeline_last =
- t->p[n_pipelines - 1];
- struct pipeline_data *p_last =
- &t->pipeline_data[n_pipelines - 1];
+ struct rte_swx_pipeline *pipeline_last = t->pipelines[n_pipelines - 1];
- t->p[i] = pipeline_last;
- memcpy(p, p_last, sizeof(*p));
+ t->pipelines[i] = pipeline_last;
}
- t->n_pipelines--;
+ rte_wmb();
+ t->n_pipelines = n_pipelines - 1;
- rsp->status = 0;
- return rsp;
+ return;
}
- /* should not get here */
- rsp->status = 0;
- return rsp;
-}
-
-static void
-thread_msg_handle(struct thread_data *t)
-{
- for ( ; ; ) {
- struct thread_msg_req *req;
- struct thread_msg_rsp *rsp;
-
- req = thread_msg_recv(t->msgq_req);
- if (req == NULL)
- break;
-
- switch (req->type) {
- case THREAD_REQ_PIPELINE_ENABLE:
- rsp = thread_msg_handle_pipeline_enable(t, req);
- break;
-
- case THREAD_REQ_PIPELINE_DISABLE:
- rsp = thread_msg_handle_pipeline_disable(t, req);
- break;
-
- default:
- rsp = (struct thread_msg_rsp *) req;
- rsp->status = -1;
- }
-
- thread_msg_send(t->msgq_rsp, rsp);
- }
+ return;
}
/**
- * Data plane threads: main
+ * Data plane (DP) threads.
+ *
+ * The t->n_pipelines variable is modified by the CP thread every time changes to the t->pipeline[]
+ * array are operated, so it is therefore very important that the latest value of t->n_pipelines is
+ * read by the DP thread at the beginning of every new dispatch loop iteration, otherwise a stale
+ * t->n_pipelines value may result in new pipelines not being detected, running pipelines that have
+ * been removed and are possibly no longer valid (e.g. when the pipeline_last is removed), running
+ * one pipeline (pipeline_last) twice as frequently than the rest of the pipelines (e.g. when a
+ * pipeline other than pipeline_last is removed), etc. This is the reason why t->n_pipelines is
+ * marked as volatile.
*/
int
thread_main(void *arg __rte_unused)
{
- struct thread_data *t;
- uint32_t thread_id, i;
+ struct thread *t;
+ uint32_t thread_id;
thread_id = rte_lcore_id();
- t = &thread_data[thread_id];
-
- /* Dispatch loop */
- for (i = 0; ; i++) {
- uint32_t j;
-
- /* Data Plane */
- for (j = 0; j < t->n_pipelines; j++)
- rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
+ t = &threads[thread_id];
- /* Control Plane */
- if ((i & 0xF) == 0) {
- uint64_t time = rte_get_tsc_cycles();
- uint64_t time_next_min = UINT64_MAX;
-
- if (time < t->time_next_min)
- continue;
-
- /* Thread message queues */
- {
- uint64_t time_next = t->time_next;
-
- if (time_next <= time) {
- thread_msg_handle(t);
- time_next = time + t->timer_period;
- t->time_next = time_next;
- }
-
- if (time_next < time_next_min)
- time_next_min = time_next;
- }
+ /* Dispatch loop. */
+ for ( ; ; ) {
+ uint32_t i;
- t->time_next_min = time_next_min;
- }
+ /* Pipelines. */
+ for (i = 0; i < t->n_pipelines; i++)
+ rte_swx_pipeline_run(t->pipelines[i], PIPELINE_INSTR_QUANTA);
}
return 0;
@@ -9,18 +9,21 @@
#include <rte_swx_pipeline.h>
+/**
+ * Control plane (CP) thread.
+ */
int
-thread_pipeline_enable(uint32_t thread_id,
- struct rte_swx_pipeline *p,
- uint32_t timer_period_ms);
+thread_init(void);
int
-thread_pipeline_disable(uint32_t thread_id,
- struct rte_swx_pipeline *p);
+pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id);
-int
-thread_init(void);
+void
+pipeline_disable(struct rte_swx_pipeline *p);
+/**
+ * Data plane (DP) threads.
+ */
int
thread_main(void *arg);