On 07-Jun-18 1:38 PM, Qi Zhang wrote:
> This patch covers the multi-process hotplug case where a shared device
> attach/detach request is issued from a secondary process; the
> implementation references malloc_mp.c.
>
> device attach on secondary:
> a) secondary send async request to primary and wait on a condition
> which will be released by matched response from primary.
> b) primary receive the request and attach the new device if failed
> goto i).
> c) primary forward attach request to all secondary as async request
> (because this in mp thread context, use sync request will deadlock)
> d) secondary receive request and attach device and send reply.
> e) primary check the reply if all success go to j).
> f) primary send attach rollback async request to all secondary.
> g) secondary receive the request and detach device and send reply.
> h) primary receive the reply and detach device as rollback action.
> i) send fail response to secondary, goto k).
> j) send success response to secondary.
> k) secondary process receive response and return.
>
> device detach on secondary:
> a) secondary send async request to primary and wait on a condition
> which will be released by matched response from primary.
> b) primary receive the request and perform pre-detach check, if device
> is locked, goto j).
> c) primary send pre-detach async request to all secondary.
> d) secondary perform pre-detach check and send reply.
> e) primary check the reply if any fail goto j).
> f) primary send detach async request to all secondary
> g) secondary detach the device and send reply
> h) primary detach the device.
> i) send success response to secondary, goto k).
> j) send fail response to secondary.
> k) secondary process receive response and return.
>
> Signed-off-by: Qi Zhang <qi.z.zhang@intel.com>
> ---
<snip>
> +TAILQ_HEAD(mp_request_list, mp_request);
> +static struct {
> + struct mp_request_list list;
> + pthread_mutex_t lock;
> +} mp_request_list = {
> + .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
> + .lock = PTHREAD_MUTEX_INITIALIZER
> +};
> +
> +#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
Patch number 4 should've used this #define to set up its timeout.
> +
> +static struct mp_request *
> +find_request_by_id(uint64_t id)
> +{
> + struct mp_request *req;
> +
> + TAILQ_FOREACH(req, &mp_request_list.list, next) {
> + if (req->user_req.id == id)
> + break;
> + }
> + return req;
> +}
> +
<snip>
> + if (resp->result)
> + return resp->result;
> + }
> +
> + return 0;
> +}
> +
> +static int
> +send_response_to_secondary(const struct eth_dev_mp_req *req, int result)
> +{
> + struct rte_mp_msg resp_msg = {0};
I've been bitten by this in the past - some compilers (*cough* clang
*cough*) don't like this kind of zero-initialization depending on which
type of member comes first in the structure, so I would refrain from
using it and use memset() instead.
> + struct eth_dev_mp_req *resp =
> + (struct eth_dev_mp_req *)resp_msg.param;
> + int ret = 0;
> +
> + resp_msg.len_param = sizeof(*resp);
> + strcpy(resp_msg.name, ETH_DEV_MP_ACTION_RESPONSE);
here and in other places - strlcpy()?
> + memcpy(resp, req, sizeof(*req));
> + resp->result = result;
> +
> + ret = rte_mp_sendmsg(&resp_msg);
> + if (ret)
> + ethdev_log(ERR, "failed to send response to secondary\n");
> +
> + return ret;
> +}
> +
> +static int
> +handle_async_attach_response(const struct rte_mp_msg *request,
> + const struct rte_mp_reply *reply)
> +{
<snip>
> + else
> + return -1;
> + do {
> + ret = rte_mp_request_async(&mp_req, &ts, clb);
> + } while (ret != 0 && rte_errno == EEXIST);
> +
> + if (ret)
> + ethdev_log(ERR, "couldn't send async request\n");
> + entry = find_request_by_id(req->id);
> + (void)entry;
Why did you look up entry and then marked it as used without checking
the return value? Leftover? Some code missing?
> + return ret;
> +}
> +
> +static int handle_secondary_request(const struct rte_mp_msg *msg,
> + const void *peer __rte_unused)
> +{
> + const struct eth_dev_mp_req *req =
> + (const struct eth_dev_mp_req *)msg->param;
> + struct eth_dev_mp_req tmp_req;
<snip>
> @@ -124,10 +490,101 @@ static int handle_primary_request(const struct rte_mp_msg *msg, const void *peer
> return 0;
> }
>
> +/**
> + * secondary to primary request.
> + *
> + * device attach:
> + * a) secondary send request to primary.
> + * b) primary attach the new device if failed goto i).
> + * c) primary forward attach request to all secondary.
> + * d) secondary receive request and attach device and send reply.
> + * e) primary check the reply if all success go to j).
> + * f) primary send attach rollback request to all secondary.
> + * g) secondary receive the request and detach device and send reply.
> + * h) primary receive the reply and detach device as rollback action.
> + * i) send fail response to secondary, goto k).
> + * j) send success response to secondary.
> + * k) end.
> +
> + * device detach:
> + * a) secondary send request to primary.
> + * b) primary perform pre-detach check, if device is locked, goto j).
> + * c) primary send pre-detach check request to all secondary.
> + * d) secondary perform pre-detach check and send reply.
> + * e) primary check the reply if any fail goto j).
> + * f) primary send detach request to all secondary
> + * g) secondary detach the device and send reply
> + * h) primary detach the device.
> + * i) send success response to secondary, goto k).
> + * j) send fail response to secondary.
> + * k) end.
> + */
I think this comment should be at the top of this file.
> -----Original Message-----
> From: Burakov, Anatoly
> Sent: Monday, June 18, 2018 4:51 PM
> To: Zhang, Qi Z <qi.z.zhang@intel.com>; thomas@monjalon.net
> Cc: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org;
> Richardson, Bruce <bruce.richardson@intel.com>; Yigit, Ferruh
> <ferruh.yigit@intel.com>; Shelton, Benjamin H
> <benjamin.h.shelton@intel.com>; Vangati, Narender
> <narender.vangati@intel.com>
> Subject: Re: [PATCH 06/22] ethdev: support attach or detach share device
> from secondary
>
>
> > + else
> > + return -1;
> > + do {
> > + ret = rte_mp_request_async(&mp_req, &ts, clb);
> > + } while (ret != 0 && rte_errno == EEXIST);
> > +
> > + if (ret)
> > + ethdev_log(ERR, "couldn't send async request\n");
> > + entry = find_request_by_id(req->id);
> > + (void)entry;
>
> Why did you look up entry and then marked it as used without checking the
> return value? Leftover? Some code missing?
Some debug code forgot be removed :)
BTW, also accept all other comments
Thanks
Qi
@@ -2,10 +2,69 @@
* Copyright(c) 2010-2018 Intel Corporation
*/
+#include <sys/time.h>
+
#include "rte_ethdev_driver.h"
#include "rte_ethdev_mp.h"
#include "rte_ethdev_lock.h"
/* Lifecycle of a secondary-to-primary request entry. */
enum req_state {
	REQ_STATE_INACTIVE = 0, /**< entry allocated but not yet issued */
	REQ_STATE_ACTIVE,       /**< request sent, waiting for the response */
	REQ_STATE_COMPLETE      /**< response received, result field is valid */
};
+
/*
 * Book-keeping for one in-flight request, linked into mp_request_list.
 * On the secondary side the requester sleeps on @cond until
 * handle_primary_response() marks the entry REQ_STATE_COMPLETE and
 * signals it; on the primary side @cond is unused and the entry only
 * tracks the forwarded request between async-reply callbacks.
 */
struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct eth_dev_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
};
+
/*
 * We could've used just a single request, but it may be possible for
 * secondaries to timeout earlier than the primary, and send a new request while
 * primary is still expecting replies to the old one. Therefore, each new
 * request will get assigned a new ID, which is how we will distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list; /* all in-flight requests */
	pthread_mutex_t lock; /* protects the list and every entry's state */
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
+
+static struct mp_request *
+find_request_by_id(uint64_t id)
+{
+ struct mp_request *req;
+
+ TAILQ_FOREACH(req, &mp_request_list.list, next) {
+ if (req->user_req.id == id)
+ break;
+ }
+ return req;
+}
+
+static uint64_t
+get_unique_id(void)
+{
+ uint64_t id;
+
+ do {
+ id = rte_rand();
+ } while (find_request_by_id(id) != NULL);
+ return id;
+}
+
+static int
+send_request_to_secondary_async(const struct eth_dev_mp_req *req);
+
static int detach_on_secondary(uint16_t port_id)
{
struct rte_device *dev;
@@ -72,18 +131,325 @@ static int attach_on_secondary(const char *devargs, uint16_t port_id)
return 0;
}
-static int handle_secondary_request(const struct rte_mp_msg *msg, const void *peer)
+static int
+check_reply(const struct eth_dev_mp_req *req,
+ const struct rte_mp_reply *reply)
+{
+ struct eth_dev_mp_req *resp;
+ int i;
+
+ if (reply->nb_received != reply->nb_sent)
+ return -EINVAL;
+
+ for (i = 0; i < reply->nb_received; i++) {
+ resp = (struct eth_dev_mp_req *)reply->msgs[i].param;
+
+ if (resp->t != req->t) {
+ ethdev_log(ERR, "Unexpected response to async request\n");
+ return -EINVAL;
+ }
+
+ if (resp->id != req->id) {
+ ethdev_log(ERR, "response to wrong async request\n");
+ return -EINVAL;
+ }
+
+ if (resp->result)
+ return resp->result;
+ }
+
+ return 0;
+}
+
+static int
+send_response_to_secondary(const struct eth_dev_mp_req *req, int result)
+{
+ struct rte_mp_msg resp_msg = {0};
+ struct eth_dev_mp_req *resp =
+ (struct eth_dev_mp_req *)resp_msg.param;
+ int ret = 0;
+
+ resp_msg.len_param = sizeof(*resp);
+ strcpy(resp_msg.name, ETH_DEV_MP_ACTION_RESPONSE);
+ memcpy(resp, req, sizeof(*req));
+ resp->result = result;
+
+ ret = rte_mp_sendmsg(&resp_msg);
+ if (ret)
+ ethdev_log(ERR, "failed to send response to secondary\n");
+
+ return ret;
+}
+
+static int
+handle_async_attach_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
+{
+ struct mp_request *entry;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct eth_dev_mp_req tmp_req;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ret = check_reply(req, reply);
+ if (ret) {
+ tmp_req = *req;
+ tmp_req.t = REQ_TYPE_ATTACH_ROLLBACK;
+
+ ret = send_request_to_secondary_async(&tmp_req);
+ if (ret) {
+ ethdev_log(ERR, "couldn't send async request\n");
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+ } else {
+ send_response_to_secondary(req, 0);
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return ret;
+}
+
/*
 * Primary-side callback for replies to the detach request forwarded to
 * all secondaries. If every secondary detached successfully, detach the
 * device locally before answering the requester; otherwise propagate
 * the error. The request entry is always removed and freed here.
 */
static int
handle_async_detach_response(const struct rte_mp_msg *request,
			const struct rte_mp_reply *reply)
{
	struct mp_request *entry;
	const struct eth_dev_mp_req *req =
		(const struct eth_dev_mp_req *)request->param;
	int ret = 0;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(req->id);
	if (!entry) {
		/* response to a request we never queued (e.g. already reaped) */
		ethdev_log(ERR, "wrong request ID\n");
		ret = -EINVAL;
		goto finish;
	}

	ret = check_reply(req, reply);
	if (ret) {
		send_response_to_secondary(req, ret);
	} else {
		/* all secondaries detached - now detach on the primary too */
		do_eth_dev_detach(req->port_id);
		send_response_to_secondary(req, 0);
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

finish:
	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
}
+
+static int
+handle_async_pre_detach_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
+{
+ struct mp_request *entry;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct eth_dev_mp_req tmp_req;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ret = check_reply(req, reply);
+ if (!ret) {
+ tmp_req = *req;
+ tmp_req.t = REQ_TYPE_DETACH;
+
+ ret = send_request_to_secondary_async(&tmp_req);
+ if (ret) {
+ ethdev_log(ERR, "couldn't send async request\n");
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+ } else {
+ send_response_to_secondary(req, ret);
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return 0;
+}
+
/*
 * Primary-side callback for replies to the attach-rollback request.
 * Detach the locally attached device regardless of how the rollback
 * went on the secondaries (there is nothing more we can do if the
 * rollback itself failed), then answer the requesting secondary with
 * the reason the rollback was started (req->result) and drop the
 * request entry.
 */
static int
handle_async_rollback_response(const struct rte_mp_msg *request,
			const struct rte_mp_reply *reply __rte_unused)
{
	struct mp_request *entry;
	const struct eth_dev_mp_req *req =
		(const struct eth_dev_mp_req *)request->param;
	int ret = 0;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(req->id);
	if (!entry) {
		ethdev_log(ERR, "wrong request ID\n");
		ret = -EINVAL;
		goto finish;
	}

	/* we have nothing to do if rollback still fail, just detach */
	do_eth_dev_detach(req->port_id);
	/* send response to secondary with the reason of rollback */
	send_response_to_secondary(req, req->result);
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

finish:
	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
}
+
+static int
+send_request_to_secondary_async(const struct eth_dev_mp_req *req)
{
- (void)msg;
- (void)(peer);
- return -ENOTSUP;
+ struct rte_mp_msg mp_req = {0};
+ struct timespec ts = {.tv_sec = MP_TIMEOUT_S, .tv_nsec = 0};
+ rte_mp_async_reply_t clb;
+ struct mp_request *entry;
+ int ret = 0;
+
+ memcpy(mp_req.param, req, sizeof(*req));
+ mp_req.len_param = sizeof(*req);
+ strcpy(mp_req.name, ETH_DEV_MP_ACTION_REQUEST);
+
+ if (req->t == REQ_TYPE_ATTACH)
+ clb = handle_async_attach_response;
+ else if (req->t == REQ_TYPE_PRE_DETACH)
+ clb = handle_async_pre_detach_response;
+ else if (req->t == REQ_TYPE_DETACH)
+ clb = handle_async_detach_response;
+ else if (req->t == REQ_TYPE_ATTACH_ROLLBACK)
+ clb = handle_async_rollback_response;
+ else
+ return -1;
+ do {
+ ret = rte_mp_request_async(&mp_req, &ts, clb);
+ } while (ret != 0 && rte_errno == EEXIST);
+
+ if (ret)
+ ethdev_log(ERR, "couldn't send async request\n");
+ entry = find_request_by_id(req->id);
+ (void)entry;
+ return ret;
+}
+
/*
 * Handler run in the primary for attach/detach requests arriving from a
 * secondary process.
 *
 * REQ_TYPE_ATTACH: attach the device locally, then forward the request
 * (carrying the newly assigned port id) to all secondaries
 * asynchronously. REQ_TYPE_DETACH: validate the port and run the lock
 * callbacks, then forward a REQ_TYPE_PRE_DETACH check to all
 * secondaries. On success the request entry is queued so the
 * async-reply callbacks can finish the operation later; on any failure
 * a response is sent back to the requester immediately and the entry is
 * freed.
 */
static int handle_secondary_request(const struct rte_mp_msg *msg,
			const void *peer __rte_unused)
{
	const struct eth_dev_mp_req *req =
		(const struct eth_dev_mp_req *)msg->param;
	struct eth_dev_mp_req tmp_req;
	struct mp_request *entry;
	uint16_t port_id;
	int ret = 0;

	pthread_mutex_lock(&mp_request_list.lock);

	/* each request must carry a unique id (see get_unique_id()) */
	entry = find_request_by_id(req->id);
	if (entry) {
		ethdev_log(ERR, "duplicate request id\n");
		ret = -EEXIST;
		goto finish;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		ethdev_log(ERR, "not enough memory to allocate request entry\n");
		ret = -ENOMEM;
		goto finish;
	}

	if (req->t == REQ_TYPE_ATTACH) {
		ret = do_eth_dev_attach(req->devargs, &port_id);
		if (!ret) {
			/* forward with the assigned port id so secondaries
			 * attach to the same port
			 */
			tmp_req = *req;
			tmp_req.port_id = port_id;
			ret = send_request_to_secondary_async(&tmp_req);
			/* NOTE(review): if forwarding fails here, the device
			 * just attached on the primary is not rolled back -
			 * verify this is intended
			 */
		}
	} else if (req->t == REQ_TYPE_DETACH) {
		if (!rte_eth_dev_is_valid_port(req->port_id))
			ret = -EINVAL;
		if (!ret)
			ret = process_lock_callbacks(req->port_id);
		if (!ret) {
			tmp_req = *req;
			tmp_req.t = REQ_TYPE_PRE_DETACH;
			ret = send_request_to_secondary_async(&tmp_req);
		}
	} else {
		ethdev_log(ERR, "unsupported secondary to primary request\n");
		ret = -ENOTSUP;
		goto finish;
	}

	if (ret) {
		/* operation failed: report the error to the requester now */
		ret = send_response_to_secondary(req, ret);
		if (ret) {
			ethdev_log(ERR, "failed to send response to secondary\n");
			goto finish;
		}
	} else {
		/* queue the entry; ownership passes to the async-reply
		 * callbacks, so prevent the free below
		 */
		memcpy(&entry->user_req, req, sizeof(*req));
		entry->state = REQ_STATE_ACTIVE;
		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
		entry = NULL;
	}

finish:
	pthread_mutex_unlock(&mp_request_list.lock);
	if (entry)
		free(entry);
	return ret;
}
-static int handle_primary_response(const struct rte_mp_msg *msg, const void *peer)
/*
 * Handler run in the secondary that issued a request: the final
 * response from the primary has arrived. Publish the result (and the
 * port id, which for attach carries the newly assigned port) into the
 * matching request entry, mark it complete and wake the thread blocked
 * in rte_eth_dev_request_to_primary(). An unknown request id (e.g. the
 * waiter already timed out and reaped the entry) is silently ignored.
 */
static int handle_primary_response(const struct rte_mp_msg *msg,
			const void *peer __rte_unused)
{
	const struct eth_dev_mp_req *req =
		(const struct eth_dev_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(req->id);
	if (entry) {
		/* update state under the lock, then signal the waiter */
		entry->user_req.result = req->result;
		entry->user_req.port_id = req->port_id;
		entry->state = REQ_STATE_COMPLETE;

		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}
static int handle_primary_request(const struct rte_mp_msg *msg, const void *peer)
@@ -124,10 +490,101 @@ static int handle_primary_request(const struct rte_mp_msg *msg, const void *peer
return 0;
}
+/**
+ * secondary to primary request.
+ *
+ * device attach:
+ * a) secondary send request to primary.
+ * b) primary attach the new device if failed goto i).
+ * c) primary forward attach request to all secondary.
+ * d) secondary receive request and attach device and send reply.
+ * e) primary check the reply if all success go to j).
+ * f) primary send attach rollback request to all secondary.
+ * g) secondary receive the request and detach device and send reply.
+ * h) primary receive the reply and detach device as rollback action.
+ * i) send fail response to secondary, goto k).
+ * j) send success response to secondary.
+ * k) end.
+
+ * device detach:
+ * a) secondary send request to primary.
+ * b) primary perform pre-detach check, if device is locked, goto j).
+ * c) primary send pre-detach check request to all secondary.
+ * d) secondary perform pre-detach check and send reply.
+ * e) primary check the reply if any fail goto j).
+ * f) primary send detach request to all secondary
+ * g) secondary detach the device and send reply
+ * h) primary detach the device.
+ * i) send success response to secondary, goto k).
+ * j) send fail response to secondary.
+ * k) end.
+ */
int rte_eth_dev_request_to_primary(struct eth_dev_mp_req *req)
{
- (void)req;
- return -ENOTSUP;
+ struct rte_mp_msg msg = {0};
+ struct eth_dev_mp_req *msg_req = (struct eth_dev_mp_req *)msg.param;
+ struct mp_request *entry;
+ struct timespec ts = {0};
+ struct timeval now;
+ int ret = 0;
+
+ entry = malloc(sizeof(*entry));
+ if (entry == NULL) {
+ ethdev_log(ERR, "not enough memory to allocate request entry\n");
+ return -ENOMEM;
+ }
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ ret = gettimeofday(&now, NULL);
+ if (ret) {
+ ethdev_log(ERR, "cannot get current time\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
+ ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
+ (now.tv_usec * 1000) / 1000000000;
+
+ pthread_cond_init(&entry->cond, NULL);
+
+ msg.len_param = sizeof(*req);
+ strcpy(msg.name, ETH_DEV_MP_ACTION_REQUEST);
+
+ req->id = get_unique_id();
+
+ memcpy(msg_req, req, sizeof(*req));
+
+ ret = rte_mp_sendmsg(&msg);
+ if (ret) {
+ ethdev_log(ERR, "cannot send message to primary");
+ goto finish;
+ }
+
+ memcpy(&entry->user_req, req, sizeof(*req));
+
+ entry->state = REQ_STATE_ACTIVE;
+
+ TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
+
+ do {
+ ret = pthread_cond_timedwait(&entry->cond,
+ &mp_request_list.lock, &ts);
+ } while (ret != 0 && ret != ETIMEDOUT);
+
+ if (entry->state != REQ_STATE_COMPLETE) {
+ RTE_LOG(ERR, EAL, "request time out\n");
+ ret = -ETIMEDOUT;
+ } else {
+ req->result = entry->user_req.result;
+ }
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ free(entry);
+ return ret;
}
/**
@@ -18,6 +18,7 @@ enum eth_dev_req_type {
};
struct eth_dev_mp_req {
+ uint64_t id;
enum eth_dev_req_type t;
char devargs[MAX_DEV_ARGS_LEN];
uint16_t port_id;