[dpdk-dev,2/3] eal: don't process IPC messages before init finished

Message ID c88d580626439c2814c0ed539ae35dc708aa2cbf.1519322682.git.anatoly.burakov@intel.com
State Superseded, archived
Headers show

Checks

Context Check Description
ci/Intel-compilation fail apply patch file failure
ci/checkpatch success coding style OK

Commit Message

Burakov, Anatoly Feb. 22, 2018, 6:21 p.m.
It is not possible for a primary process to receive any messages
while initializing, because RTE_MAGIC value is not set in the
shared config, and hence no secondary process can ever spin up
during that time.

However, it is possible for a secondary process to receive messages
from the primary during initialization. We can't just drop the
messages as they may be important, and also we might need to process
replies to our own requests (e.g. VFIO) during initialization.

Therefore, add a tailq for incoming messages, and queue them up
until initialization is complete, and process them in order they
arrived.

Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 50 +++++++++++++++++++++++++++++----
 1 file changed, 45 insertions(+), 5 deletions(-)

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 3a1088e..b4d00c3 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -25,6 +25,7 @@ 
 #include <rte_errno.h>
 #include <rte_lcore.h>
 #include <rte_log.h>
+#include <rte_tailq.h>
 
 #include "eal_private.h"
 #include "eal_filesystem.h"
@@ -58,6 +59,18 @@  struct mp_msg_internal {
 	struct rte_mp_msg msg;
 };
 
+struct message_queue_entry {
+	TAILQ_ENTRY(message_queue_entry) next;
+	struct mp_msg_internal msg;
+	struct sockaddr_un sa;
+};
+
+/** Double linked list of received messages. */
+TAILQ_HEAD(message_queue, message_queue_entry);
+
+static struct message_queue message_queue =
+	TAILQ_HEAD_INITIALIZER(message_queue);
+
 struct sync_request {
 	TAILQ_ENTRY(sync_request) next;
 	int reply_received;
@@ -276,12 +289,39 @@  process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 static void *
 mp_handle(void *arg __rte_unused)
 {
-	struct mp_msg_internal msg;
-	struct sockaddr_un sa;
-
+	struct message_queue_entry *cur_msg, *next_msg, *new_msg = NULL;
 	while (1) {
-		if (read_msg(&msg, &sa) == 0)
-			process_msg(&msg, &sa);
+		/* we want to process all messages in order of their arrival,
+		 * but status of init_complete may change while we're iterating
+		 * the tailq. so, store it here and check once every iteration.
+		 */
+		int init_complete = internal_config.init_complete;
+
+		if (new_msg == NULL)
+			new_msg = malloc(sizeof(*new_msg));
+		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {
+			/* we successfully read the message, so enqueue it */
+			TAILQ_INSERT_TAIL(&message_queue, new_msg, next);
+			new_msg = NULL;
+		} /* reuse new_msg for next message if we couldn't read_msg */
+
+		/* tailq only accessed here, so no locking needed */
+		TAILQ_FOREACH_SAFE(cur_msg, &message_queue, next, next_msg) {
+			/* secondary process should not process any incoming
+			 * requests until its initialization is complete, but
+			 * it is allowed to process replies to its own queries.
+			 */
+			if (rte_eal_process_type() == RTE_PROC_SECONDARY &&
+					!init_complete &&
+					cur_msg->msg.type != MP_REP)
+				continue;
+
+			TAILQ_REMOVE(&message_queue, cur_msg, next);
+
+			process_msg(&cur_msg->msg, &cur_msg->sa);
+
+			free(cur_msg);
+		}
 	}
 
 	return NULL;