[dpdk-dev,RFC,4/8] ipc: remove IPC thread for async request

Message ID 1524150216-3407-5-git-send-email-jianfeng.tan@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail apply patch file failure

Commit Message

Jianfeng Tan April 19, 2018, 3:03 p.m. UTC
  As discussed at http://dpdk.org/dev/patchwork/patch/36579/,
we remove the IPC threads rte_mp_handle and rte_mp_handle_async.
This patch removes the rte_mp_handle_async thread.

Previously, to handle replies for an async request, rte_mp_handle
woke up the rte_mp_handle_async thread through
pending_requests.async_cond and let it process the pending requests.
Now we handle the reply directly in the rte_mp_handle context.

To handle timeouts, we set an alarm for each async request that is
sent. If the reply arrives before the timeout, we cancel the alarm
while handling the reply; otherwise the alarm fires and invokes
async_reply_handle() as its callback.
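
For reference, the timeout path now rides on the EAL alarm API instead
of a dedicated thread. Below is a minimal sketch of that arm/cancel
pattern; rte_eal_alarm_set()/rte_eal_alarm_cancel() are the real EAL
calls, while struct async_req and the helper names are illustrative
stand-ins for struct pending_request and the code in
eal_common_proc.c.

    #include <stdint.h>
    #include <stdlib.h>
    #include <time.h>

    #include <rte_alarm.h>
    #include <rte_errno.h>
    #include <rte_log.h>

    /* illustrative stand-in for struct pending_request */
    struct async_req {
        int reply_received;
    };

    /* Alarm callback; in this patch the equivalent role is played by
     * async_reply_handle(), which either times the request out or
     * processes a reply that already arrived.
     */
    static void
    request_timeout_cb(void *arg)
    {
        struct async_req *req = arg;

        RTE_LOG(INFO, EAL, "request %p timed out or completed\n",
            (void *)req);
        free(req);
    }

    /* on send: arm a one-shot alarm for the request's relative timeout */
    static int
    arm_request_timeout(struct async_req *req, const struct timespec *ts)
    {
        uint64_t us = (uint64_t)ts->tv_sec * 1000000 + ts->tv_nsec / 1000;

        return rte_eal_alarm_set(us, request_timeout_cb, req);
    }

    /* On reply: cancel the pending alarm. EINPROGRESS only means the
     * alarm callback itself is executing right now, which is fine.
     */
    static void
    disarm_request_timeout(struct async_req *req)
    {
        if (rte_eal_alarm_cancel(request_timeout_cb, req) < 0 &&
                rte_errno != EINPROGRESS)
            RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
    }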

Cc: Anatoly Burakov <anatoly.burakov@intel.com>

Suggested-by: Thomas Monjalon <thomas@monjalon.net>
Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 169 ++++++++------------------------
 1 file changed, 43 insertions(+), 126 deletions(-)
  

Comments

Anatoly Burakov April 20, 2018, 9:03 a.m. UTC | #1
On 19-Apr-18 4:03 PM, Jianfeng Tan wrote:
> As discussed at http://dpdk.org/dev/patchwork/patch/36579/,
> we remove the IPC threads rte_mp_handle and rte_mp_handle_async.
> This patch removes the rte_mp_handle_async thread.
> 
> Previously, to handle replies for an async request, rte_mp_handle
> woke up the rte_mp_handle_async thread through
> pending_requests.async_cond and let it process the pending requests.
> Now we handle the reply directly in the rte_mp_handle context.
> 
> To handle timeouts, we set an alarm for each async request that is
> sent. If the reply arrives before the timeout, we cancel the alarm
> while handling the reply; otherwise the alarm fires and invokes
> async_reply_handle() as its callback.
> 
> Cc: Anatoly Burakov <anatoly.burakov@intel.com>
> 
> Suggested-by: Thomas Monjalon <thomas@monjalon.net>
> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
> ---

<...>

> @@ -299,9 +300,11 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
>   
>   			if (pending_req->type == REQUEST_TYPE_SYNC)
>   				pthread_cond_signal(&pending_req->sync.cond);
> -			else if (pending_req->type == REQUEST_TYPE_ASYNC)
> -				pthread_cond_signal(
> -					&pending_requests.async_cond);
> +			else if (pending_req->type == REQUEST_TYPE_ASYNC) {
> +				pthread_mutex_unlock(&pending_requests.lock);
> +				async_reply_handle(pending_req);
> +				pthread_mutex_lock(&pending_requests.lock);

There must be a better way to do this than unlocking the mutex only to
lock it again :) I haven't looked at the implications of this
suggestion yet, but how about having the alarm call a wrapper function
that takes the mutex, and leaving async_reply_handle() without any
locking whatsoever?
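
For illustration only, one possible shape of that (the _unlocked name
is made up here, not something from this patch):

    /* core handler: assumes pending_requests.lock is already held by
     * the caller, so process_msg() could call it with the lock held
     */
    static void
    async_reply_handle_unlocked(struct pending_request *req)
    {
        /* body of async_reply_handle() from this patch, minus the
         * locking around process_async_request()/TAILQ_REMOVE()
         */
    }

    /* thin wrapper registered as the alarm callback; it owns the lock */
    static void
    async_reply_handle(void *arg)
    {
        struct pending_request *req = arg;

        pthread_mutex_lock(&pending_requests.lock);
        async_reply_handle_unlocked(req);
        pthread_mutex_unlock(&pending_requests.lock);
    }

Whether trigger_async_action() and free() can then stay under the lock
or still need to happen after dropping it would of course need a
closer look.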

> +			}
>   		} else
>   			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
>   		pthread_mutex_unlock(&pending_requests.lock);
> @@ -450,115 +453,39 @@ trigger_async_action(struct pending_request *sr)
>   	free(sr->request);
>   }
>   
> -static struct pending_request *
> -check_trigger(struct timespec *ts)

<...>

> +	ts_now.tv_nsec = now.tv_usec * 1000;
> +	ts_now.tv_sec = now.tv_sec;
>   
> -				/* we've triggered a callback, but there may be
> -				 * more, so lock the list and check again.
> -				 */
> -				pthread_mutex_lock(&pending_requests.lock);
> -			}
> -		} while (trigger);
> +	pthread_mutex_lock(&pending_requests.lock);
> +	action = process_async_request(req, &ts_now);
> +	if (action == ACTION_NONE) {
> +		pthread_mutex_unlock(&pending_requests.lock);
> +		return;
>   	}
> +	TAILQ_REMOVE(&pending_requests.requests, req, next);
> +	pthread_mutex_unlock(&pending_requests.lock);
>   
> -	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
> +	if (action == ACTION_TRIGGER)
> +		trigger_async_action(req);
>   
> -	return NULL;
> +	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0 &&
> +	    rte_errno != EINPROGRESS)
> +		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
> +

Perhaps cancel the alarm before triggering the callback?
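
That is, the tail of async_reply_handle() could be reordered roughly
as follows (same symbols as in the hunk above, sketch only):

    /* drop the timeout alarm first, so it can no longer fire while
     * the user callback runs
     */
    if (rte_eal_alarm_cancel(async_reply_handle, req) < 0 &&
            rte_errno != EINPROGRESS)
        RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");

    if (action == ACTION_TRIGGER)
        trigger_async_action(req);

    free(req);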
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 27de16e..4cb460e 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -20,6 +20,7 @@ 
 #include <sys/un.h>
 #include <unistd.h>
 
+#include <rte_alarm.h>
 #include <rte_common.h>
 #include <rte_cycles.h>
 #include <rte_eal.h>
@@ -94,14 +95,14 @@  TAILQ_HEAD(pending_request_list, pending_request);
 static struct {
 	struct pending_request_list requests;
 	pthread_mutex_t lock;
-	pthread_cond_t async_cond;
 } pending_requests = {
 	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
 	.lock = PTHREAD_MUTEX_INITIALIZER,
-	.async_cond = PTHREAD_COND_INITIALIZER
 	/**< used in async requests only */
 };
 
+static void async_reply_handle(void *arg);
+
 /* forward declarations */
 static int
 mp_send(struct rte_mp_msg *msg, const char *peer, int type);
@@ -299,9 +300,11 @@  process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 
 			if (pending_req->type == REQUEST_TYPE_SYNC)
 				pthread_cond_signal(&pending_req->sync.cond);
-			else if (pending_req->type == REQUEST_TYPE_ASYNC)
-				pthread_cond_signal(
-					&pending_requests.async_cond);
+			else if (pending_req->type == REQUEST_TYPE_ASYNC) {
+				pthread_mutex_unlock(&pending_requests.lock);
+				async_reply_handle(pending_req);
+				pthread_mutex_lock(&pending_requests.lock);
+			}
 		} else
 			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
 		pthread_mutex_unlock(&pending_requests.lock);
@@ -450,115 +453,39 @@  trigger_async_action(struct pending_request *sr)
 	free(sr->request);
 }
 
-static struct pending_request *
-check_trigger(struct timespec *ts)
-{
-	struct pending_request *next, *cur, *trigger = NULL;
-
-	TAILQ_FOREACH_SAFE(cur, &pending_requests.requests, next, next) {
-		enum async_action action;
-		if (cur->type != REQUEST_TYPE_ASYNC)
-			continue;
-
-		action = process_async_request(cur, ts);
-		if (action == ACTION_FREE) {
-			TAILQ_REMOVE(&pending_requests.requests, cur, next);
-			free(cur);
-		} else if (action == ACTION_TRIGGER) {
-			TAILQ_REMOVE(&pending_requests.requests, cur, next);
-			trigger = cur;
-			break;
-		}
-	}
-	return trigger;
-}
-
 static void
-wait_for_async_messages(void)
-{
-	struct pending_request *sr;
-	struct timespec timeout;
-	bool timedwait = false;
-	bool nowait = false;
-	int ret;
-
-	/* scan through the list and see if there are any timeouts that
-	 * are earlier than our current timeout.
-	 */
-	TAILQ_FOREACH(sr, &pending_requests.requests, next) {
-		if (sr->type != REQUEST_TYPE_ASYNC)
-			continue;
-		if (!timedwait || timespec_cmp(&sr->async.param->end,
-				&timeout) < 0) {
-			memcpy(&timeout, &sr->async.param->end,
-				sizeof(timeout));
-			timedwait = true;
-		}
-
-		/* sometimes, we don't even wait */
-		if (sr->reply_received) {
-			nowait = true;
-			break;
-		}
-	}
-
-	if (nowait)
-		return;
-
-	do {
-		ret = timedwait ?
-			pthread_cond_timedwait(
-				&pending_requests.async_cond,
-				&pending_requests.lock,
-				&timeout) :
-			pthread_cond_wait(
-				&pending_requests.async_cond,
-				&pending_requests.lock);
-	} while (ret != 0 && ret != ETIMEDOUT);
-
-	/* we've been woken up or timed out */
-}
-
-static void *
-async_reply_handle(void *arg __rte_unused)
+async_reply_handle(void *arg)
 {
 	struct timeval now;
 	struct timespec ts_now;
-	while (1) {
-		struct pending_request *trigger = NULL;
+	enum async_action action;
+	struct pending_request *req = (struct pending_request *)arg;
 
-		pthread_mutex_lock(&pending_requests.lock);
-
-		/* we exit this function holding the lock */
-		wait_for_async_messages();
-
-		if (gettimeofday(&now, NULL) < 0) {
-			RTE_LOG(ERR, EAL, "Cannot get current time\n");
-			break;
-		}
-		ts_now.tv_nsec = now.tv_usec * 1000;
-		ts_now.tv_sec = now.tv_sec;
-
-		do {
-			trigger = check_trigger(&ts_now);
-			/* unlock request list */
-			pthread_mutex_unlock(&pending_requests.lock);
-
-			if (trigger) {
-				trigger_async_action(trigger);
-				free(trigger);
+	if (gettimeofday(&now, NULL) < 0) {
+		/* This could lead to disaster */
+		RTE_LOG(ERR, EAL, "Cannot get current time\n");
+		return;
+	}
+	ts_now.tv_nsec = now.tv_usec * 1000;
+	ts_now.tv_sec = now.tv_sec;
 
-				/* we've triggered a callback, but there may be
-				 * more, so lock the list and check again.
-				 */
-				pthread_mutex_lock(&pending_requests.lock);
-			}
-		} while (trigger);
+	pthread_mutex_lock(&pending_requests.lock);
+	action = process_async_request(req, &ts_now);
+	if (action == ACTION_NONE) {
+		pthread_mutex_unlock(&pending_requests.lock);
+		return;
 	}
+	TAILQ_REMOVE(&pending_requests.requests, req, next);
+	pthread_mutex_unlock(&pending_requests.lock);
 
-	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+	if (action == ACTION_TRIGGER)
+		trigger_async_action(req);
 
-	return NULL;
+	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0 &&
+	    rte_errno != EINPROGRESS)
+		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
+
+	free(req);
 }
 
 static int
@@ -624,7 +551,7 @@  rte_mp_channel_init(void)
 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
 	char path[PATH_MAX];
 	int dir_fd;
-	pthread_t mp_handle_tid, async_reply_handle_tid;
+	pthread_t mp_handle_tid;
 
 	/* create filter path */
 	create_socket_path("*", path, sizeof(path));
@@ -669,24 +596,10 @@  rte_mp_channel_init(void)
 		return -1;
 	}
 
-	if (pthread_create(&async_reply_handle_tid, NULL,
-			async_reply_handle, NULL) < 0) {
-		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
-			strerror(errno));
-		close(mp_fd);
-		close(dir_fd);
-		mp_fd = -1;
-		return -1;
-	}
-
 	/* try best to set thread name */
 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
 	rte_thread_setname(mp_handle_tid, thread_name);
 
-	/* try best to set thread name */
-	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
-	rte_thread_setname(async_reply_handle_tid, thread_name);
-
 	/* unlock the directory */
 	flock(dir_fd, LOCK_UN);
 	close(dir_fd);
@@ -858,7 +771,7 @@  rte_mp_sendmsg(struct rte_mp_msg *msg)
 
 static int
 mp_request_async(const char *dst, struct rte_mp_msg *req,
-		struct async_request_param *param)
+		struct async_request_param *param, const struct timespec *ts)
 {
 	struct rte_mp_msg *reply_msg;
 	struct pending_request *pending_req, *exist;
@@ -903,6 +816,13 @@  mp_request_async(const char *dst, struct rte_mp_msg *req,
 
 	param->user_reply.nb_sent++;
 
+	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 10000,
+				async_reply_handle, pending_req) < 0) {
+		/* TODO: If error happends, turn to busy waiting */
+		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
+			dst, req->name);
+	}
+
 	return 0;
 fail:
 	free(pending_req);
@@ -1124,7 +1044,7 @@  rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 
 	/* for secondary process, send request to the primary process only */
 	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
-		ret = mp_request_async(eal_mp_socket_path(), copy, param);
+		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
 
 		/* if we didn't send anything, put dummy request on the queue */
 		if (ret == 0 && reply->nb_sent == 0) {
@@ -1167,7 +1087,7 @@  rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
 			 ent->d_name);
 
-		if (mp_request_async(path, copy, param))
+		if (mp_request_async(path, copy, param, ts))
 			ret = -1;
 	}
 	/* if we didn't send anything, put dummy request on the queue */
@@ -1176,9 +1096,6 @@  rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
 		dummy_used = true;
 	}
 
-	/* trigger async request thread wake up */
-	pthread_cond_signal(&pending_requests.async_cond);
-
 	/* finally, unlock the queue */
 	pthread_mutex_unlock(&pending_requests.lock);