get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/36075/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 36075,
    "url": "https://patches.dpdk.org/api/patches/36075/?format=api",
    "web_url": "https://patches.dpdk.org/project/dpdk/patch/7f5496e8b5fd43dcbf10fe7059ed832107be0720.1520961844.git.anatoly.burakov@intel.com/",
    "project": {
        "id": 1,
        "url": "https://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<7f5496e8b5fd43dcbf10fe7059ed832107be0720.1520961844.git.anatoly.burakov@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/7f5496e8b5fd43dcbf10fe7059ed832107be0720.1520961844.git.anatoly.burakov@intel.com",
    "date": "2018-03-13T17:42:51",
    "name": "[dpdk-dev,v4] eal: add asynchronous request API to DPDK IPC",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "13971028ae07a1e178b18793437c7550357a5764",
    "submitter": {
        "id": 4,
        "url": "https://patches.dpdk.org/api/people/4/?format=api",
        "name": "Burakov, Anatoly",
        "email": "anatoly.burakov@intel.com"
    },
    "delegate": {
        "id": 1,
        "url": "https://patches.dpdk.org/api/users/1/?format=api",
        "username": "tmonjalo",
        "first_name": "Thomas",
        "last_name": "Monjalon",
        "email": "thomas@monjalon.net"
    },
    "mbox": "https://patches.dpdk.org/project/dpdk/patch/7f5496e8b5fd43dcbf10fe7059ed832107be0720.1520961844.git.anatoly.burakov@intel.com/mbox/",
    "series": [],
    "comments": "https://patches.dpdk.org/api/patches/36075/comments/",
    "check": "fail",
    "checks": "https://patches.dpdk.org/api/patches/36075/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 483596CA3;\n\tTue, 13 Mar 2018 18:43:05 +0100 (CET)",
            "from mga11.intel.com (mga11.intel.com [192.55.52.93])\n\tby dpdk.org (Postfix) with ESMTP id 0F85A5F32\n\tfor <dev@dpdk.org>; Tue, 13 Mar 2018 18:42:54 +0100 (CET)",
            "from fmsmga004.fm.intel.com ([10.253.24.48])\n\tby fmsmga102.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;\n\t13 Mar 2018 10:42:53 -0700",
            "from irvmail001.ir.intel.com ([163.33.26.43])\n\tby fmsmga004.fm.intel.com with ESMTP; 13 Mar 2018 10:42:52 -0700",
            "from sivswdev01.ir.intel.com (sivswdev01.ir.intel.com\n\t[10.237.217.45])\n\tby irvmail001.ir.intel.com (8.14.3/8.13.6/MailSET/Hub) with ESMTP id\n\tw2DHgp3m003100; Tue, 13 Mar 2018 17:42:51 GMT",
            "from sivswdev01.ir.intel.com (localhost [127.0.0.1])\n\tby sivswdev01.ir.intel.com with ESMTP id w2DHgpwL025417;\n\tTue, 13 Mar 2018 17:42:51 GMT",
            "(from aburakov@localhost)\n\tby sivswdev01.ir.intel.com with LOCAL id w2DHgpb7025413;\n\tTue, 13 Mar 2018 17:42:51 GMT"
        ],
        "X-Amp-Result": "SKIPPED(no attachment in message)",
        "X-Amp-File-Uploaded": "False",
        "X-ExtLoop1": "1",
        "X-IronPort-AV": "E=Sophos;i=\"5.47,465,1515484800\"; d=\"scan'208\";a=\"37015662\"",
        "From": "Anatoly Burakov <anatoly.burakov@intel.com>",
        "To": "dev@dpdk.org",
        "Cc": "jianfeng.tan@intel.com, konstantin.ananyev@intel.com",
        "Date": "Tue, 13 Mar 2018 17:42:51 +0000",
        "Message-Id": "<7f5496e8b5fd43dcbf10fe7059ed832107be0720.1520961844.git.anatoly.burakov@intel.com>",
        "X-Mailer": "git-send-email 1.7.0.7",
        "In-Reply-To": "<f3ec7463a5e48643a93c2f23c072ae52be69409a.1520426895.git.anatoly.burakov@intel.com>",
        "References": "<f3ec7463a5e48643a93c2f23c072ae52be69409a.1520426895.git.anatoly.burakov@intel.com>",
        "Subject": "[dpdk-dev] [PATCH v4] eal: add asynchronous request API to DPDK IPC",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://dpdk.org/ml/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://dpdk.org/ml/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://dpdk.org/ml/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "This API is similar to the blocking API that is already present,\nbut reply will be received in a separate callback by the caller.\n\nUnder the hood, we create a separate thread to deal with replies to\nasynchronous requests, that will just wait to be notified by the\nmain thread, or woken up on a timer (it'll wake itself up every\nminute regardless of whether it was called, but if there are no\nrequests in the queue, nothing will be done and it'll go to sleep\nfor another minute).\n\nSigned-off-by: Anatoly Burakov <anatoly.burakov@intel.com>\n---\n\nNotes:\n    v4:\n      - rebase on top of latest IPC Improvements patchset [2]\n    v3:\n      - added support for MP_IGN messages introduced in\n        IPC improvements v5 patchset\n    v2:\n      - fixed deadlocks and race conditions by not calling\n        callbacks while iterating over sync request list\n      - fixed use-after-free by making a copy of request\n      - changed API to also give user a copy of original\n        request, so that they know to which message the\n        callback is a reply to\n      - fixed missing .map file entries\n    \n    This patch is dependent upon previously published patchsets\n    for IPC fixes [1] and improvements [2].\n    \n    rte_mp_action_unregister and rte_mp_async_reply_unregister\n    do the same thing - should we perhaps make it one function?\n    \n    [1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/\n    [2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/\n\n lib/librte_eal/common/eal_common_proc.c | 563 ++++++++++++++++++++++++++++++--\n lib/librte_eal/common/include/rte_eal.h |  72 ++++\n lib/librte_eal/rte_eal_version.map      |   3 +\n 3 files changed, 607 insertions(+), 31 deletions(-)",
    "diff": "diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c\nindex 4131b67..50d6506 100644\n--- a/lib/librte_eal/common/eal_common_proc.c\n+++ b/lib/librte_eal/common/eal_common_proc.c\n@@ -26,6 +26,7 @@\n #include <rte_errno.h>\n #include <rte_lcore.h>\n #include <rte_log.h>\n+#include <rte_tailq.h>\n \n #include \"eal_private.h\"\n #include \"eal_filesystem.h\"\n@@ -39,7 +40,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;\n struct action_entry {\n \tTAILQ_ENTRY(action_entry) next;\n \tchar action_name[RTE_MP_MAX_NAME_LEN];\n-\trte_mp_t action;\n+\tRTE_STD_C11\n+\tunion {\n+\t\trte_mp_t action;\n+\t\trte_mp_async_reply_t reply;\n+\t};\n };\n \n /** Double linked list of actions. */\n@@ -60,13 +65,37 @@ struct mp_msg_internal {\n \tstruct rte_mp_msg msg;\n };\n \n+enum mp_request_type {\n+\tREQUEST_TYPE_SYNC,\n+\tREQUEST_TYPE_ASYNC\n+};\n+\n+struct async_request_shared_param {\n+\tstruct rte_mp_reply *user_reply;\n+\tstruct timespec *end;\n+\tint n_requests_processed;\n+};\n+\n+struct async_request_param {\n+\tstruct async_request_shared_param *param;\n+};\n+\n+struct sync_request_param {\n+\tpthread_cond_t cond;\n+};\n+\n struct sync_request {\n \tTAILQ_ENTRY(sync_request) next;\n-\tint reply_received;\n+\tenum mp_request_type type;\n \tchar dst[PATH_MAX];\n \tstruct rte_mp_msg *request;\n-\tstruct rte_mp_msg *reply;\n-\tpthread_cond_t cond;\n+\tstruct rte_mp_msg *reply_msg;\n+\tint reply_received;\n+\tRTE_STD_C11\n+\tunion {\n+\t\tstruct sync_request_param sync;\n+\t\tstruct async_request_param async;\n+\t};\n };\n \n TAILQ_HEAD(sync_request_list, sync_request);\n@@ -74,9 +103,12 @@ TAILQ_HEAD(sync_request_list, sync_request);\n static struct {\n \tstruct sync_request_list requests;\n \tpthread_mutex_t lock;\n+\tpthread_cond_t async_cond;\n } sync_requests = {\n \t.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),\n-\t.lock = PTHREAD_MUTEX_INITIALIZER\n+\t.lock = PTHREAD_MUTEX_INITIALIZER,\n+\t.async_cond = PTHREAD_COND_INITIALIZER\n+\t/**< used in async requests only */\n };\n \n /* forward declarations */\n@@ -164,53 +196,97 @@ validate_action_name(const char *name)\n \treturn 0;\n }\n \n-int __rte_experimental\n-rte_mp_action_register(const char *name, rte_mp_t action)\n+static struct action_entry *\n+action_register(const char *name)\n {\n \tstruct action_entry *entry;\n \n \tif (validate_action_name(name))\n-\t\treturn -1;\n+\t\treturn NULL;\n \n \tentry = malloc(sizeof(struct action_entry));\n \tif (entry == NULL) {\n \t\trte_errno = ENOMEM;\n-\t\treturn -1;\n+\t\treturn NULL;\n \t}\n \tstrcpy(entry->action_name, name);\n-\tentry->action = action;\n \n-\tpthread_mutex_lock(&mp_mutex_action);\n \tif (find_action_entry_by_name(name) != NULL) {\n \t\tpthread_mutex_unlock(&mp_mutex_action);\n \t\trte_errno = EEXIST;\n \t\tfree(entry);\n-\t\treturn -1;\n+\t\treturn NULL;\n \t}\n \tTAILQ_INSERT_TAIL(&action_entry_list, entry, next);\n-\tpthread_mutex_unlock(&mp_mutex_action);\n-\treturn 0;\n+\n+\t/* async and sync replies are handled by different threads, so even\n+\t * though they a share pointer in a union, one will never trigger in\n+\t * place of the other.\n+\t */\n+\n+\treturn entry;\n }\n \n-void __rte_experimental\n-rte_mp_action_unregister(const char *name)\n+static void\n+action_unregister(const char *name)\n {\n \tstruct action_entry *entry;\n \n \tif (validate_action_name(name))\n \t\treturn;\n \n-\tpthread_mutex_lock(&mp_mutex_action);\n \tentry = find_action_entry_by_name(name);\n \tif (entry == NULL) {\n-\t\tpthread_mutex_unlock(&mp_mutex_action);\n \t\treturn;\n \t}\n \tTAILQ_REMOVE(&action_entry_list, entry, next);\n-\tpthread_mutex_unlock(&mp_mutex_action);\n \tfree(entry);\n }\n \n+int __rte_experimental\n+rte_mp_action_register(const char *name, rte_mp_t action)\n+{\n+\tstruct action_entry *entry;\n+\n+\tpthread_mutex_lock(&mp_mutex_action);\n+\n+\tentry = action_register(name);\n+\tif (entry != NULL)\n+\t\tentry->action = action;\n+\tpthread_mutex_unlock(&mp_mutex_action);\n+\n+\treturn entry == NULL ? -1 : 0;\n+}\n+\n+void __rte_experimental\n+rte_mp_action_unregister(const char *name)\n+{\n+\tpthread_mutex_lock(&mp_mutex_action);\n+\taction_unregister(name);\n+\tpthread_mutex_unlock(&mp_mutex_action);\n+}\n+\n+int __rte_experimental\n+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)\n+{\n+\tstruct action_entry *entry;\n+\n+\tpthread_mutex_lock(&mp_mutex_action);\n+\n+\tentry = action_register(name);\n+\tif (entry != NULL)\n+\t\tentry->reply = reply;\n+\tpthread_mutex_unlock(&mp_mutex_action);\n+\n+\treturn entry == NULL ? -1 : 0;\n+}\n+\n+void __rte_experimental\n+rte_mp_async_reply_unregister(const char *name)\n+{\n+\trte_mp_action_unregister(name);\n+}\n+\n static int\n read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)\n {\n@@ -270,10 +346,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)\n \t\tpthread_mutex_lock(&sync_requests.lock);\n \t\tsync_req = find_sync_request(s->sun_path, msg->name);\n \t\tif (sync_req) {\n-\t\t\tmemcpy(sync_req->reply, msg, sizeof(*msg));\n+\t\t\tmemcpy(sync_req->reply_msg, msg, sizeof(*msg));\n \t\t\t/* -1 indicates that we've been asked to ignore */\n \t\t\tsync_req->reply_received = m->type == MP_REP ? 1 : -1;\n-\t\t\tpthread_cond_signal(&sync_req->cond);\n+\n+\t\t\tif (sync_req->type == REQUEST_TYPE_SYNC)\n+\t\t\t\tpthread_cond_signal(&sync_req->sync.cond);\n+\t\t\telse if (sync_req->type == REQUEST_TYPE_ASYNC)\n+\t\t\t\tpthread_cond_signal(&sync_requests.async_cond);\n \t\t} else\n \t\t\tRTE_LOG(ERR, EAL, \"Drop mp reply: %s\\n\", msg->name);\n \t\tpthread_mutex_unlock(&sync_requests.lock);\n@@ -320,6 +400,204 @@ mp_handle(void *arg __rte_unused)\n }\n \n static int\n+timespec_cmp(const struct timespec *a, const struct timespec *b)\n+{\n+\tif (a->tv_sec < b->tv_sec)\n+\t\treturn -1;\n+\tif (a->tv_sec > b->tv_sec)\n+\t\treturn 1;\n+\tif (a->tv_nsec < b->tv_nsec)\n+\t\treturn -1;\n+\tif (a->tv_nsec > b->tv_nsec)\n+\t\treturn 1;\n+\treturn 0;\n+}\n+\n+enum async_action {\n+\tACTION_NONE, /**< don't do anything */\n+\tACTION_FREE, /**< free the action entry, but don't trigger callback */\n+\tACTION_TRIGGER /**< trigger callback, then free action entry */\n+};\n+\n+static enum async_action\n+process_async_request(struct sync_request *sr, const struct timespec *now)\n+{\n+\tstruct async_request_shared_param *param;\n+\tstruct rte_mp_reply *reply;\n+\tbool timeout, received, last_msg;\n+\n+\tparam = sr->async.param;\n+\treply = param->user_reply;\n+\n+\t/* did we timeout? */\n+\ttimeout = timespec_cmp(param->end, now) <= 0;\n+\n+\t/* did we receive a response? */\n+\treceived = sr->reply_received != 0;\n+\n+\t/* if we didn't time out, and we didn't receive a response, ignore */\n+\tif (!timeout && !received)\n+\t\treturn ACTION_NONE;\n+\n+\t/* if we received a response, adjust relevant data and copy mesasge. */\n+\tif (sr->reply_received == 1 && sr->reply_msg) {\n+\t\tstruct rte_mp_msg *msg, *user_msgs, *tmp;\n+\n+\t\tmsg = sr->reply_msg;\n+\t\tuser_msgs = reply->msgs;\n+\n+\t\ttmp = realloc(user_msgs, sizeof(*msg) *\n+\t\t\t\t(reply->nb_received + 1));\n+\t\tif (!tmp) {\n+\t\t\tRTE_LOG(ERR, EAL, \"Fail to alloc reply for request %s:%s\\n\",\n+\t\t\t\tsr->dst, sr->request->name);\n+\t\t\t/* this entry is going to be removed and its message\n+\t\t\t * dropped, but we don't want to leak memory, so\n+\t\t\t * continue.\n+\t\t\t */\n+\t\t} else {\n+\t\t\tuser_msgs = tmp;\n+\t\t\treply->msgs = user_msgs;\n+\t\t\tmemcpy(&user_msgs[reply->nb_received],\n+\t\t\t\t\tmsg, sizeof(*msg));\n+\t\t\treply->nb_received++;\n+\t\t}\n+\t} else if (sr->reply_received == -1) {\n+\t\t/* we were asked to ignore this process */\n+\t\treply->nb_sent--;\n+\t}\n+\tfree(sr->reply_msg);\n+\n+\t/* mark this request as processed */\n+\tparam->n_requests_processed++;\n+\n+\t/* if number of sent messages is zero, we're short-circuiting */\n+\tlast_msg = param->n_requests_processed == reply->nb_sent ||\n+\t\t\treply->nb_sent == 0;\n+\n+\treturn last_msg ? ACTION_TRIGGER : ACTION_FREE;\n+}\n+\n+static void\n+trigger_async_action(struct sync_request *sr)\n+{\n+\tstruct async_request_shared_param *param;\n+\tstruct rte_mp_reply *reply;\n+\n+\tparam = sr->async.param;\n+\treply = param->user_reply;\n+\n+\tpthread_mutex_lock(&mp_mutex_action);\n+\tstruct action_entry *entry =\n+\t\t\tfind_action_entry_by_name(sr->request->name);\n+\tpthread_mutex_unlock(&mp_mutex_action);\n+\tif (!entry)\n+\t\tRTE_LOG(ERR, EAL, \"Cannot find async request callback for %s\\n\",\n+\t\t\t\tsr->request->name);\n+\telse\n+\t\tentry->reply(sr->request, reply);\n+\t/* clean up */\n+\tfree(sr->async.param->user_reply->msgs);\n+\tfree(sr->async.param->user_reply);\n+\tfree(sr->async.param->end);\n+\tfree(sr->async.param);\n+\tfree(sr->request);\n+}\n+\n+static void *\n+async_reply_handle(void *arg __rte_unused)\n+{\n+\tstruct sync_request *sr;\n+\tstruct timeval now;\n+\tstruct timespec timeout, ts_now;\n+\tdo {\n+\t\tstruct sync_request *trigger = NULL;\n+\t\tint ret;\n+\t\tbool dontwait = false;\n+\n+\t\tpthread_mutex_lock(&sync_requests.lock);\n+\n+\t\tif (gettimeofday(&now, NULL) < 0) {\n+\t\t\tRTE_LOG(ERR, EAL, \"Cannot get current time\\n\");\n+\t\t\tpthread_mutex_unlock(&sync_requests.lock);\n+\t\t\tbreak;\n+\t\t}\n+\n+\t\t/* set a 60 second timeout by default */\n+\t\ttimeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;\n+\t\ttimeout.tv_sec = now.tv_sec + 60 +\n+\t\t\t\t(now.tv_usec * 1000 + 60) / 1000000000;\n+\n+\t\t/* scan through the list and see if there are any timeouts that\n+\t\t * are earlier than our current timeout.\n+\t\t */\n+\t\tTAILQ_FOREACH(sr, &sync_requests.requests, next) {\n+\t\t\tif (sr->type != REQUEST_TYPE_ASYNC)\n+\t\t\t\tcontinue;\n+\t\t\tif (timespec_cmp(sr->async.param->end, &timeout) < 0)\n+\t\t\t\tmemcpy(&timeout, sr->async.param->end,\n+\t\t\t\t\tsizeof(timeout));\n+\n+\t\t\t/* sometimes, we don't even wait */\n+\t\t\tif (sr->reply_received) {\n+\t\t\t\tdontwait = true;\n+\t\t\t\tbreak;\n+\t\t\t}\n+\t\t}\n+\n+\t\t/* now, wait until we either time out or get woken up */\n+\t\tif (!dontwait)\n+\t\t\tret = pthread_cond_timedwait(&sync_requests.async_cond,\n+\t\t\t\t\t&sync_requests.lock, &timeout);\n+\t\telse\n+\t\t\tret = 0;\n+\n+\t\tif (gettimeofday(&now, NULL) < 0) {\n+\t\t\tRTE_LOG(ERR, EAL, \"Cannot get current time\\n\");\n+\t\t\tbreak;\n+\t\t}\n+\t\tts_now.tv_nsec = now.tv_usec * 1000;\n+\t\tts_now.tv_sec = now.tv_sec;\n+\n+\t\tif (ret == 0 || ret == ETIMEDOUT) {\n+\t\t\tstruct sync_request *next;\n+\t\t\t/* we've either been woken up, or we timed out */\n+\n+\t\t\t/* we have still the lock, check if anything needs\n+\t\t\t * processing.\n+\t\t\t */\n+\t\t\tTAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,\n+\t\t\t\t\tnext) {\n+\t\t\t\tenum async_action action;\n+\t\t\t\tif (sr->type != REQUEST_TYPE_ASYNC)\n+\t\t\t\t\tcontinue;\n+\n+\t\t\t\taction = process_async_request(sr, &ts_now);\n+\t\t\t\tif (action == ACTION_FREE) {\n+\t\t\t\t\tTAILQ_REMOVE(&sync_requests.requests,\n+\t\t\t\t\t\t\tsr, next);\n+\t\t\t\t\tfree(sr);\n+\t\t\t\t} else if (action == ACTION_TRIGGER &&\n+\t\t\t\t\t\ttrigger == NULL) {\n+\t\t\t\t\tTAILQ_REMOVE(&sync_requests.requests,\n+\t\t\t\t\t\t\tsr, next);\n+\t\t\t\t\ttrigger = sr;\n+\t\t\t\t}\n+\t\t\t}\n+\t\t}\n+\t\tpthread_mutex_unlock(&sync_requests.lock);\n+\t\tif (trigger) {\n+\t\t\ttrigger_async_action(trigger);\n+\t\t\tfree(trigger);\n+\t\t}\n+\t} while (1);\n+\n+\tRTE_LOG(ERR, EAL, \"ERROR: asynchronous requests disabled\\n\");\n+\n+\treturn NULL;\n+}\n+\n+static int\n open_socket_fd(void)\n {\n \tchar peer_name[PATH_MAX] = {0};\n@@ -382,7 +660,7 @@ rte_mp_channel_init(void)\n \tchar thread_name[RTE_MAX_THREAD_NAME_LEN];\n \tchar path[PATH_MAX];\n \tint dir_fd;\n-\tpthread_t tid;\n+\tpthread_t mp_handle_tid, async_reply_handle_tid;\n \n \t/* create filter path */\n \tcreate_socket_path(\"*\", path, sizeof(path));\n@@ -419,7 +697,16 @@ rte_mp_channel_init(void)\n \t\treturn -1;\n \t}\n \n-\tif (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {\n+\tif (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {\n+\t\tRTE_LOG(ERR, EAL, \"failed to create mp thead: %s\\n\",\n+\t\t\tstrerror(errno));\n+\t\tclose(mp_fd);\n+\t\tmp_fd = -1;\n+\t\treturn -1;\n+\t}\n+\n+\tif (pthread_create(&async_reply_handle_tid, NULL,\n+\t\t\tasync_reply_handle, NULL) < 0) {\n \t\tRTE_LOG(ERR, EAL, \"failed to create mp thead: %s\\n\",\n \t\t\tstrerror(errno));\n \t\tclose(mp_fd);\n@@ -430,7 +717,11 @@ rte_mp_channel_init(void)\n \n \t/* try best to set thread name */\n \tsnprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, \"rte_mp_handle\");\n-\trte_thread_setname(tid, thread_name);\n+\trte_thread_setname(mp_handle_tid, thread_name);\n+\n+\t/* try best to set thread name */\n+\tsnprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, \"rte_mp_async_handle\");\n+\trte_thread_setname(async_reply_handle_tid, thread_name);\n \n \t/* unlock the directory */\n \tflock(dir_fd, LOCK_UN);\n@@ -602,18 +893,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)\n }\n \n static int\n-mp_request_one(const char *dst, struct rte_mp_msg *req,\n+mp_request_async(const char *dst, struct rte_mp_msg *req,\n+\t\tstruct async_request_shared_param *param)\n+{\n+\tstruct rte_mp_msg *reply_msg;\n+\tstruct sync_request *sync_req, *exist;\n+\tint ret;\n+\n+\tsync_req = malloc(sizeof(*sync_req));\n+\treply_msg = malloc(sizeof(*reply_msg));\n+\tif (sync_req == NULL || reply_msg == NULL) {\n+\t\tRTE_LOG(ERR, EAL, \"Could not allocate space for sync request\\n\");\n+\t\trte_errno = ENOMEM;\n+\t\tret = -1;\n+\t\tgoto fail;\n+\t}\n+\n+\tmemset(sync_req, 0, sizeof(*sync_req));\n+\tmemset(reply_msg, 0, sizeof(*reply_msg));\n+\n+\tsync_req->type = REQUEST_TYPE_ASYNC;\n+\tstrcpy(sync_req->dst, dst);\n+\tsync_req->request = req;\n+\tsync_req->reply_msg = reply_msg;\n+\tsync_req->async.param = param;\n+\n+\t/* queue already locked by caller */\n+\n+\texist = find_sync_request(dst, req->name);\n+\tif (!exist)\n+\t\tTAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);\n+\tif (exist) {\n+\t\tRTE_LOG(ERR, EAL, \"A pending request %s:%s\\n\", dst, req->name);\n+\t\trte_errno = EEXIST;\n+\t\tret = -1;\n+\t\tgoto fail;\n+\t}\n+\n+\tret = send_msg(dst, req, MP_REQ);\n+\tif (ret < 0) {\n+\t\tRTE_LOG(ERR, EAL, \"Fail to send request %s:%s\\n\",\n+\t\t\tdst, req->name);\n+\t\tret = -1;\n+\t\tgoto fail;\n+\t} else if (ret == 0) {\n+\t\tret = 0;\n+\t\tgoto fail;\n+\t}\n+\n+\tparam->user_reply->nb_sent++;\n+\n+\treturn 0;\n+fail:\n+\tfree(sync_req);\n+\tfree(reply_msg);\n+\treturn ret;\n+}\n+\n+static int\n+mp_request_sync(const char *dst, struct rte_mp_msg *req,\n \t       struct rte_mp_reply *reply, const struct timespec *ts)\n {\n \tint ret;\n \tstruct rte_mp_msg msg, *tmp;\n \tstruct sync_request sync_req, *exist;\n \n+\tsync_req.type = REQUEST_TYPE_SYNC;\n \tsync_req.reply_received = 0;\n \tstrcpy(sync_req.dst, dst);\n \tsync_req.request = req;\n-\tsync_req.reply = &msg;\n-\tpthread_cond_init(&sync_req.cond, NULL);\n+\tsync_req.reply_msg = &msg;\n+\tpthread_cond_init(&sync_req.sync.cond, NULL);\n \n \tpthread_mutex_lock(&sync_requests.lock);\n \texist = find_sync_request(dst, req->name);\n@@ -637,7 +987,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,\n \treply->nb_sent++;\n \n \tdo {\n-\t\tret = pthread_cond_timedwait(&sync_req.cond,\n+\t\tret = pthread_cond_timedwait(&sync_req.sync.cond,\n \t\t\t\t&sync_requests.lock, ts);\n \t} while (ret != 0 && ret != ETIMEDOUT);\n \n@@ -703,7 +1053,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,\n \n \t/* for secondary process, send request to the primary process only */\n \tif (rte_eal_process_type() == RTE_PROC_SECONDARY)\n-\t\treturn mp_request_one(eal_mp_socket_path(), req, reply, &end);\n+\t\treturn mp_request_sync(eal_mp_socket_path(), req, reply, &end);\n \n \t/* for primary process, broadcast request, and collect reply 1 by 1 */\n \tmp_dir = opendir(mp_dir_path);\n@@ -732,7 +1082,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,\n \t\tsnprintf(path, sizeof(path), \"%s/%s\", mp_dir_path,\n \t\t\t ent->d_name);\n \n-\t\tif (mp_request_one(path, req, reply, &end))\n+\t\tif (mp_request_sync(path, req, reply, &end))\n \t\t\tret = -1;\n \t}\n \t/* unlock the directory */\n@@ -744,9 +1094,160 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,\n }\n \n int __rte_experimental\n-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)\n+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)\n {\n+\tstruct rte_mp_msg *copy;\n+\tstruct sync_request *dummy;\n+\tstruct async_request_shared_param *param = NULL;\n+\tstruct rte_mp_reply *reply = NULL;\n+\tint dir_fd, ret = 0;\n+\tDIR *mp_dir;\n+\tstruct dirent *ent;\n+\tstruct timeval now;\n+\tstruct timespec *end = NULL;\n+\n+\tRTE_LOG(DEBUG, EAL, \"request: %s\\n\", req->name);\n+\n+\tif (check_input(req) == false)\n+\t\treturn -1;\n+\tif (gettimeofday(&now, NULL) < 0) {\n+\t\tRTE_LOG(ERR, EAL, \"Faile to get current time\\n\");\n+\t\trte_errno = errno;\n+\t\treturn -1;\n+\t}\n+\tcopy = malloc(sizeof(*copy));\n+\tdummy = malloc(sizeof(*dummy));\n+\tparam = malloc(sizeof(*param));\n+\treply = malloc(sizeof(*reply));\n+\tend = malloc(sizeof(*end));\n+\tif (copy == NULL || dummy == NULL || param == NULL || reply == NULL ||\n+\t\t\tend == NULL) {\n+\t\tRTE_LOG(ERR, EAL, \"Failed to allocate memory for async reply\\n\");\n+\t\trte_errno = ENOMEM;\n+\t\tgoto fail;\n+\t}\n+\n+\tmemset(copy, 0, sizeof(*copy));\n+\tmemset(dummy, 0, sizeof(*dummy));\n+\tmemset(param, 0, sizeof(*param));\n+\tmemset(reply, 0, sizeof(*reply));\n+\tmemset(end, 0, sizeof(*end));\n+\n+\t/* copy message */\n+\tmemcpy(copy, req, sizeof(*copy));\n+\n+\tparam->n_requests_processed = 0;\n+\tparam->end = end;\n+\tparam->user_reply = reply;\n \n+\tend->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;\n+\tend->tv_sec = now.tv_sec + ts->tv_sec +\n+\t\t\t(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;\n+\treply->nb_sent = 0;\n+\treply->nb_received = 0;\n+\treply->msgs = NULL;\n+\n+\t/* we have to lock the request queue here, as we will be adding a bunch\n+\t * of requests to the queue at once, and some of the replies may arrive\n+\t * before we add all of the requests to the queue.\n+\t */\n+\tpthread_mutex_lock(&sync_requests.lock);\n+\n+\t/* we have to ensure that callback gets triggered even if we don't send\n+\t * anything, therefore earlier we have allocated a dummy request. put it\n+\t * on the queue and fill it. we will remove it once we know we sent\n+\t * something.\n+\t */\n+\tdummy->type = REQUEST_TYPE_ASYNC;\n+\tdummy->request = copy;\n+\tdummy->reply_msg = NULL;\n+\tdummy->async.param = param;\n+\tdummy->reply_received = 1; /* short-circuit the timeout */\n+\n+\tTAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);\n+\n+\t/* for secondary process, send request to the primary process only */\n+\tif (rte_eal_process_type() == RTE_PROC_SECONDARY) {\n+\t\tret = mp_request_async(eal_mp_socket_path(), copy, param);\n+\n+\t\t/* if we sent something, remove dummy request from the queue */\n+\t\tif (reply->nb_sent != 0) {\n+\t\t\tTAILQ_REMOVE(&sync_requests.requests, dummy, next);\n+\t\t\tfree(dummy);\n+\t\t\tdummy = NULL;\n+\t\t}\n+\n+\t\tpthread_mutex_unlock(&sync_requests.lock);\n+\n+\t\t/* if we couldn't send anything, clean up */\n+\t\tif (ret != 0)\n+\t\t\tgoto fail;\n+\t\treturn 0;\n+\t}\n+\n+\t/* for primary process, broadcast request */\n+\tmp_dir = opendir(mp_dir_path);\n+\tif (!mp_dir) {\n+\t\tRTE_LOG(ERR, EAL, \"Unable to open directory %s\\n\", mp_dir_path);\n+\t\trte_errno = errno;\n+\t\tgoto unlock_fail;\n+\t}\n+\tdir_fd = dirfd(mp_dir);\n+\n+\t/* lock the directory to prevent processes spinning up while we send */\n+\tif (flock(dir_fd, LOCK_EX)) {\n+\t\tRTE_LOG(ERR, EAL, \"Unable to lock directory %s\\n\",\n+\t\t\tmp_dir_path);\n+\t\trte_errno = errno;\n+\t\tgoto closedir_fail;\n+\t}\n+\n+\twhile ((ent = readdir(mp_dir))) {\n+\t\tchar path[PATH_MAX];\n+\n+\t\tif (fnmatch(mp_filter, ent->d_name, 0) != 0)\n+\t\t\tcontinue;\n+\n+\t\tsnprintf(path, sizeof(path), \"%s/%s\", mp_dir_path,\n+\t\t\t ent->d_name);\n+\n+\t\tif (mp_request_async(path, copy, param))\n+\t\t\tret = -1;\n+\t}\n+\t/* if we sent something, remove dummy request from the queue */\n+\tif (reply->nb_sent != 0) {\n+\t\tTAILQ_REMOVE(&sync_requests.requests, dummy, next);\n+\t\tfree(dummy);\n+\t\tdummy = NULL;\n+\t}\n+\t/* trigger async request thread wake up */\n+\tpthread_cond_signal(&sync_requests.async_cond);\n+\n+\t/* finally, unlock the queue */\n+\tpthread_mutex_unlock(&sync_requests.lock);\n+\n+\t/* unlock the directory */\n+\tflock(dir_fd, LOCK_UN);\n+\n+\t/* dir_fd automatically closed on closedir */\n+\tclosedir(mp_dir);\n+\treturn ret;\n+closedir_fail:\n+\tclosedir(mp_dir);\n+unlock_fail:\n+\tpthread_mutex_unlock(&sync_requests.lock);\n+fail:\n+\tfree(dummy);\n+\tfree(param);\n+\tfree(reply);\n+\tfree(end);\n+\tfree(copy);\n+\treturn -1;\n+}\n+\n+int __rte_experimental\n+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)\n+{\n \tRTE_LOG(DEBUG, EAL, \"reply: %s\\n\", msg->name);\n \n \tif (check_input(msg) == false)\ndiff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h\nindex 044474e..93ca4cc 100644\n--- a/lib/librte_eal/common/include/rte_eal.h\n+++ b/lib/librte_eal/common/include/rte_eal.h\n@@ -230,6 +230,16 @@ struct rte_mp_reply {\n typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);\n \n /**\n+ * Asynchronous reply function typedef used by other components.\n+ *\n+ * As we create  socket channel for primary/secondary communication, use\n+ * this function typedef to register action for coming responses to asynchronous\n+ * requests.\n+ */\n+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,\n+\t\tconst struct rte_mp_reply *reply);\n+\n+/**\n  * @warning\n  * @b EXPERIMENTAL: this API may change without prior notice\n  *\n@@ -273,6 +283,46 @@ rte_mp_action_unregister(const char *name);\n  * @warning\n  * @b EXPERIMENTAL: this API may change without prior notice\n  *\n+ * Register an asynchronous reply callback for primary/secondary communication.\n+ *\n+ * Call this function to register a callback for asynchronous requests, if the\n+ * calling component wants to receive responses to its own asynchronous requests\n+ * from the corresponding component in its primary or secondary processes.\n+ *\n+ * @param name\n+ *   The name argument plays as a unique key to find the action.\n+ *\n+ * @param reply\n+ *   The reply argument is the function pointer to the reply callback.\n+ *\n+ * @return\n+ *  - 0 on success.\n+ *  - (<0) on failure.\n+ */\n+int __rte_experimental\n+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);\n+\n+/**\n+ * @warning\n+ * @b EXPERIMENTAL: this API may change without prior notice\n+ *\n+ * Unregister an asynchronous reply callback.\n+ *\n+ * Call this function to unregister a callback if the calling component does\n+ * not want responses the messages from the corresponding component in its\n+ * primary process or secondary processes.\n+ *\n+ * @param name\n+ *   The name argument plays as a unique key to find the action.\n+ *\n+ */\n+void __rte_experimental\n+rte_mp_async_reply_unregister(const char *name);\n+\n+/**\n+ * @warning\n+ * @b EXPERIMENTAL: this API may change without prior notice\n+ *\n  * Send a message to the peer process.\n  *\n  * This function will send a message which will be responsed by the action\n@@ -321,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,\n  * @warning\n  * @b EXPERIMENTAL: this API may change without prior notice\n  *\n+ * Send a request to the peer process and expect a reply in a separate callback.\n+ *\n+ * This function sends a request message to the peer process, and will not\n+ * block. Instead, reply will be received in a separate callback.\n+ *\n+ * @param req\n+ *   The req argument contains the customized request message.\n+ *\n+ * @param ts\n+ *   The ts argument specifies how long we can wait for the peer(s) to reply.\n+ *\n+ * @return\n+ *  - On success, return 0.\n+ *  - On failure, return -1, and the reason will be stored in rte_errno.\n+ */\n+int __rte_experimental\n+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);\n+\n+/**\n+ * @warning\n+ * @b EXPERIMENTAL: this API may change without prior notice\n+ *\n  * Send a reply to the peer process.\n  *\n  * This function will send a reply message in response to a request message\ndiff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map\nindex d123602..1d88437 100644\n--- a/lib/librte_eal/rte_eal_version.map\n+++ b/lib/librte_eal/rte_eal_version.map\n@@ -223,8 +223,11 @@ EXPERIMENTAL {\n \trte_eal_mbuf_user_pool_ops;\n \trte_mp_action_register;\n \trte_mp_action_unregister;\n+\trte_mp_async_reply_register;\n+\trte_mp_async_reply_unregister;\n \trte_mp_sendmsg;\n \trte_mp_request;\n+\trte_mp_request_async;\n \trte_mp_reply;\n \trte_service_attr_get;\n \trte_service_attr_reset_all;\n",
    "prefixes": [
        "dpdk-dev",
        "v4"
    ]
}