@@ -3,12 +3,103 @@
*/
#include <rte_string_fns.h>
+#include <sys/time.h>
+
+#include <rte_alarm.h>
+
#include "rte_ethdev_driver.h"
#include "ethdev_mp.h"
#include "ethdev_lock.h"
+#include "ethdev_private.h"
+
+/**
+ * secondary to primary request.
+ * start from function eth_dev_request_to_primary.
+ *
+ * device attach:
+ * a) seconary send request to primary.
+ * b) primary attach the new device if failed goto i).
+ * c) primary forward attach request to all secondary.
+ * d) secondary receive request and attach device and send reply.
+ * e) primary check the reply if all success go to j).
+ * f) primary send attach rollback request to all secondary.
+ * g) secondary receive the request and detach device and send reply.
+ * h) primary receive the reply and detach device as rollback action.
+ * i) send fail response to secondary, goto k).
+ * j) send success response to secondary.
+ * k) end.
+
+ * device detach:
+ * a) secondary send request to primary.
+ * b) primary perform pre-detach check, if device is locked, got j).
+ * c) primary send pre-detach check request to all secondary.
+ * d) secondary perform pre-detach check and send reply.
+ * e) primary check the reply if any fail goto j).
+ * f) primary send detach request to all secondary
+ * g) secondary detach the device and send reply
+ * h) primary detach the device.
+ * i) send success response to secondary, goto k).
+ * j) send fail response to secondary.
+ * k) end.
+ */
+
+enum req_state {
+ REQ_STATE_INACTIVE = 0,
+ REQ_STATE_ACTIVE,
+ REQ_STATE_COMPLETE
+};
+
+struct mp_request {
+ TAILQ_ENTRY(mp_request) next;
+ struct eth_dev_mp_req user_req; /**< contents of request */
+ pthread_cond_t cond; /**< variable we use to time out on this request */
+ enum req_state state; /**< indicate status of this request */
+};
+
+/*
+ * We could've used just a single request, but it may be possible for
+ * secondaries to timeout earlier than the primary, and send a new request while
+ * primary is still expecting replies to the old one. Therefore, each new
+ * request will get assigned a new ID, which is how we will distinguish between
+ * expected and unexpected messages.
+ */
+TAILQ_HEAD(mp_request_list, mp_request);
+static struct {
+ struct mp_request_list list;
+ pthread_mutex_t lock;
+} mp_request_list = {
+ .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
+ .lock = PTHREAD_MUTEX_INITIALIZER
+};
#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
+static struct mp_request *
+find_request_by_id(uint64_t id)
+{
+ struct mp_request *req;
+
+ TAILQ_FOREACH(req, &mp_request_list.list, next) {
+ if (req->user_req.id == id)
+ break;
+ }
+ return req;
+}
+
+static uint64_t
+get_unique_id(void)
+{
+ uint64_t id;
+
+ do {
+ id = rte_rand();
+ } while (find_request_by_id(id) != NULL);
+ return id;
+}
+
+static int
+send_request_to_secondary_async(const struct eth_dev_mp_req *req);
+
static int detach_on_secondary(uint16_t port_id)
{
struct rte_device *dev;
@@ -78,19 +169,355 @@ static int attach_on_secondary(const char *devargs, uint16_t port_id)
}
static int
-handle_secondary_request(const struct rte_mp_msg *msg, const void *peer)
+check_reply(const struct eth_dev_mp_req *req, const struct rte_mp_reply *reply)
+{
+ struct eth_dev_mp_req *resp;
+ int i;
+
+ if (reply->nb_received != reply->nb_sent)
+ return -EINVAL;
+
+ for (i = 0; i < reply->nb_received; i++) {
+ resp = (struct eth_dev_mp_req *)reply->msgs[i].param;
+
+ if (resp->t != req->t) {
+ ethdev_log(ERR, "Unexpected response to async request\n");
+ return -EINVAL;
+ }
+
+ if (resp->id != req->id) {
+ ethdev_log(ERR, "response to wrong async request\n");
+ return -ENOENT;
+ }
+
+ if (resp->result)
+ return resp->result;
+ }
+
+ return 0;
+}
+
+static int
+send_response_to_secondary(const struct eth_dev_mp_req *req, int result)
+{
+ struct rte_mp_msg resp_msg;
+ struct eth_dev_mp_req *resp =
+ (struct eth_dev_mp_req *)resp_msg.param;
+ int ret = 0;
+
+ memset(&resp_msg, 0, sizeof(resp_msg));
+ resp_msg.len_param = sizeof(*resp);
+ strcpy(resp_msg.name, ETH_DEV_MP_ACTION_RESPONSE);
+ memcpy(resp, req, sizeof(*req));
+ resp->result = result;
+
+ ret = rte_mp_sendmsg(&resp_msg);
+ if (ret)
+ ethdev_log(ERR, "failed to send response to secondary\n");
+
+ return ret;
+}
+
+static int
+handle_async_attach_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
+{
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct mp_request *entry;
+ struct eth_dev_mp_req tmp_req;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ret = check_reply(req, reply);
+ if (ret) {
+ tmp_req = *req;
+ tmp_req.t = REQ_TYPE_ATTACH_ROLLBACK;
+
+ ret = send_request_to_secondary_async(&tmp_req);
+ if (ret) {
+ ethdev_log(ERR, "couldn't send async request\n");
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+ } else {
+ send_response_to_secondary(req, 0);
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return ret;
+}
+
+static int
+handle_async_detach_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
{
- RTE_SET_USED(msg);
- RTE_SET_USED(peer);
- return -ENOTSUP;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct mp_request *entry;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ret = check_reply(req, reply);
+ if (ret) {
+ send_response_to_secondary(req, ret);
+ } else {
+ do_eth_dev_detach(req->port_id);
+ send_response_to_secondary(req, 0);
+ }
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return ret;
}
static int
-handle_primary_response(const struct rte_mp_msg *msg, const void *peer)
+handle_async_pre_detach_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
{
- RTE_SET_USED(msg);
- RTE_SET_USED(peer);
- return -ENOTSUP;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct eth_dev_mp_req tmp_req;
+ struct mp_request *entry;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ret = check_reply(req, reply);
+ if (!ret) {
+ tmp_req = *req;
+ tmp_req.t = REQ_TYPE_DETACH;
+
+ ret = send_request_to_secondary_async(&tmp_req);
+ if (ret) {
+ ethdev_log(ERR, "couldn't send async request\n");
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+ } else {
+ send_response_to_secondary(req, ret);
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ }
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return 0;
+}
+
+static int
+handle_async_rollback_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply __rte_unused)
+{
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)request->param;
+ struct mp_request *entry;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (!entry) {
+ ethdev_log(ERR, "wrong request ID\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ /* we have nothing to do if rollback still fail, just detach */
+ do_eth_dev_detach(req->port_id);
+ /* send response to secondary with the reason of rollback */
+ send_response_to_secondary(req, req->result);
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return ret;
+}
+
+static int
+send_request_to_secondary_async(const struct eth_dev_mp_req *req)
+{
+ struct timespec ts = {.tv_sec = MP_TIMEOUT_S, .tv_nsec = 0};
+ struct rte_mp_msg mp_req;
+ rte_mp_async_reply_t clb;
+ int ret = 0;
+
+ memset(&mp_req, 0, sizeof(mp_req));
+ memcpy(mp_req.param, req, sizeof(*req));
+ mp_req.len_param = sizeof(*req);
+ strcpy(mp_req.name, ETH_DEV_MP_ACTION_REQUEST);
+
+ if (req->t == REQ_TYPE_ATTACH)
+ clb = handle_async_attach_response;
+ else if (req->t == REQ_TYPE_PRE_DETACH)
+ clb = handle_async_pre_detach_response;
+ else if (req->t == REQ_TYPE_DETACH)
+ clb = handle_async_detach_response;
+ else if (req->t == REQ_TYPE_ATTACH_ROLLBACK)
+ clb = handle_async_rollback_response;
+ else
+ return -1;
+ do {
+ ret = rte_mp_request_async(&mp_req, &ts, clb);
+ } while (ret != 0 && rte_errno == EEXIST);
+
+ if (ret)
+ ethdev_log(ERR, "couldn't send async request\n");
+
+ return ret;
+}
+
+static void
+__handle_secondary_request(void *param)
+{
+ struct rte_mp_msg *msg = param;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)msg->param;
+ struct eth_dev_mp_req tmp_req;
+ struct mp_request *entry;
+ uint16_t port_id;
+ int ret = 0;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (entry) {
+ ethdev_log(ERR, "duplicate request id\n");
+ ret = -EEXIST;
+ goto finish;
+ }
+
+ entry = malloc(sizeof(*entry));
+ if (entry == NULL) {
+ ethdev_log(ERR, "not enough memory to allocate request entry\n");
+ ret = -ENOMEM;
+ goto finish;
+ }
+
+ if (req->t == REQ_TYPE_ATTACH) {
+ ret = do_eth_dev_attach(req->devargs, &port_id);
+ if (!ret) {
+ tmp_req = *req;
+ tmp_req.port_id = port_id;
+ ret = send_request_to_secondary_async(&tmp_req);
+ }
+ } else if (req->t == REQ_TYPE_DETACH) {
+ if (!rte_eth_dev_is_valid_port(req->port_id))
+ ret = -EINVAL;
+ if (!ret)
+ ret = process_lock_callbacks(req->port_id);
+ if (!ret) {
+ tmp_req = *req;
+ tmp_req.t = REQ_TYPE_PRE_DETACH;
+ ret = send_request_to_secondary_async(&tmp_req);
+ }
+ } else {
+ ethdev_log(ERR, "unsupported secondary to primary request\n");
+ ret = -ENOTSUP;
+ goto finish;
+ }
+
+ if (ret) {
+ ret = send_response_to_secondary(req, ret);
+ if (ret) {
+ ethdev_log(ERR, "failed to send response to secondary\n");
+ goto finish;
+ }
+ } else {
+ memcpy(&entry->user_req, req, sizeof(*req));
+ entry->state = REQ_STATE_ACTIVE;
+ TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
+ entry = NULL;
+ }
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ if (entry)
+ free(entry);
+ free(msg);
+}
+
+static int
+handle_secondary_request(const struct rte_mp_msg *msg,
+ const void *peer __rte_unused)
+{
+ struct rte_mp_msg *msg_cpy;
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)msg->param;
+ int ret = 0;
+
+ msg_cpy = malloc(sizeof(*msg_cpy));
+ if (msg_cpy == NULL) {
+ ethdev_log(ERR, "not enough memory\n");
+ return send_response_to_secondary(req, -ENOMEM);
+ }
+
+ memcpy(msg_cpy, msg, sizeof(*msg_cpy));
+
+ /**
+ * We can't handle the secondary request in mp callback because
+ * we are running in primary process, we are going to invoke SYNC IPC
+ * in rte_malloc.
+ */
+ ret = rte_eal_alarm_set(1, __handle_secondary_request, msg_cpy);
+ if (ret) {
+ ethdev_log(ERR, "failed to set alarm callback\n");
+ return send_response_to_secondary(req, ret);
+ }
+ return 0;
+}
+
+static int
+handle_primary_response(const struct rte_mp_msg *msg,
+ const void *peer __rte_unused)
+{
+ const struct eth_dev_mp_req *req =
+ (const struct eth_dev_mp_req *)msg->param;
+ struct mp_request *entry;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(req->id);
+ if (entry) {
+ entry->user_req.result = req->result;
+ entry->user_req.port_id = req->port_id;
+ entry->state = REQ_STATE_COMPLETE;
+
+ pthread_cond_signal(&entry->cond);
+ }
+
+ pthread_mutex_unlock(&mp_request_list.lock);
+
+ return 0;
}
static int
@@ -134,8 +561,74 @@ handle_primary_request(const struct rte_mp_msg *msg, const void *peer)
int eth_dev_request_to_primary(struct eth_dev_mp_req *req)
{
- RTE_SET_USED(req);
- return -ENOTSUP;
+ struct rte_mp_msg msg;
+ struct eth_dev_mp_req *msg_req = (struct eth_dev_mp_req *)msg.param;
+ struct mp_request *entry;
+ struct timespec ts;
+ struct timeval now;
+ int ret = 0;
+
+ memset(&msg, 0, sizeof(msg));
+ memset(&ts, 0, sizeof(ts));
+
+ entry = malloc(sizeof(*entry));
+ if (entry == NULL) {
+ ethdev_log(ERR, "not enough memory to allocate request entry\n");
+ return -ENOMEM;
+ }
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ ret = gettimeofday(&now, NULL);
+ if (ret) {
+ ethdev_log(ERR, "cannot get current time\n");
+ ret = -EINVAL;
+ goto finish;
+ }
+
+ ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
+ ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
+ (now.tv_usec * 1000) / 1000000000;
+
+ pthread_cond_init(&entry->cond, NULL);
+
+ msg.len_param = sizeof(*req);
+ strcpy(msg.name, ETH_DEV_MP_ACTION_REQUEST);
+
+ req->id = get_unique_id();
+
+ memcpy(msg_req, req, sizeof(*req));
+
+ ret = rte_mp_sendmsg(&msg);
+ if (ret) {
+ ethdev_log(ERR, "cannot send message to primary");
+ goto finish;
+ }
+
+ memcpy(&entry->user_req, req, sizeof(*req));
+
+ entry->state = REQ_STATE_ACTIVE;
+
+ TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
+
+ do {
+ ret = pthread_cond_timedwait(&entry->cond,
+ &mp_request_list.lock, &ts);
+ } while (ret != 0 && ret != ETIMEDOUT);
+
+ if (entry->state != REQ_STATE_COMPLETE) {
+ RTE_LOG(ERR, EAL, "request time out\n");
+ ret = -ETIMEDOUT;
+ } else {
+ req->port_id = entry->user_req.port_id;
+ req->result = entry->user_req.result;
+ }
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+
+finish:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ free(entry);
+ return ret;
}
/**
@@ -18,6 +18,7 @@ enum eth_dev_req_type {
};
struct eth_dev_mp_req {
+ uint64_t id;
enum eth_dev_req_type t;
char devargs[MAX_DEV_ARGS_LEN];
uint16_t port_id;