From patchwork Thu Feb 29 12:25:00 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Maxime Coquelin X-Patchwork-Id: 137484 X-Patchwork-Delegate: maxime.coquelin@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 9762043C36; Thu, 29 Feb 2024 13:25:41 +0100 (CET) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id D4AEC42DCA; Thu, 29 Feb 2024 13:25:21 +0100 (CET) Received: from us-smtp-delivery-124.mimecast.com (us-smtp-delivery-124.mimecast.com [170.10.133.124]) by mails.dpdk.org (Postfix) with ESMTP id 20C2742D72 for ; Thu, 29 Feb 2024 13:25:16 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=redhat.com; s=mimecast20190719; t=1709209515; h=from:from:reply-to:subject:subject:date:date:message-id:message-id: to:to:cc:cc:mime-version:mime-version:content-type:content-type: content-transfer-encoding:content-transfer-encoding: in-reply-to:in-reply-to:references:references; bh=kWOrb3PzXAY7zdFlc7KXowRti6dJPbdTpf1Qdcptsm8=; b=iwInk2Niaw5+QuRMCf8pvFM6H4FYafUIO/hJ2pKjq1H3neYU1pwx97Z4BByVhTMwTb5u0F GYq7wchK9FCt5up11TidGMf/u9rxZMbL6vwkk922UHQCgTqGrc4KpXX69Ht0Qsc8y+haNH eFCCUHSgT1QixCCy9gHmePLPO/lDHQE= Received: from mimecast-mx02.redhat.com (mx-ext.redhat.com [66.187.233.73]) by relay.mimecast.com with ESMTP with STARTTLS (version=TLSv1.3, cipher=TLS_AES_256_GCM_SHA384) id us-mta-584-jFvpeUmAMQ2nmGHx3Eiikw-1; Thu, 29 Feb 2024 07:25:14 -0500 X-MC-Unique: jFvpeUmAMQ2nmGHx3Eiikw-1 Received: from smtp.corp.redhat.com (int-mx03.intmail.prod.int.rdu2.redhat.com [10.11.54.3]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by mimecast-mx02.redhat.com (Postfix) with ESMTPS id D2A18280A9C8; Thu, 29 Feb 2024 12:25:13 +0000 (UTC) Received: from max-p1.redhat.com (unknown [10.39.208.20]) by smtp.corp.redhat.com (Postfix) with ESMTP id 5C0101121312; Thu, 29 Feb 2024 12:25:12 +0000 (UTC) From: Maxime Coquelin To: dev@dpdk.org, david.marchand@redhat.com, chenbox@nvidia.com Cc: Maxime Coquelin Subject: [PATCH 5/7] vhost: improve fdset initialization Date: Thu, 29 Feb 2024 13:25:00 +0100 Message-ID: <20240229122502.2572343-6-maxime.coquelin@redhat.com> In-Reply-To: <20240229122502.2572343-1-maxime.coquelin@redhat.com> References: <20240229122502.2572343-1-maxime.coquelin@redhat.com> MIME-Version: 1.0 X-Scanned-By: MIMEDefang 3.4.1 on 10.11.54.3 X-Mimecast-Spam-Score: 0 X-Mimecast-Originator: redhat.com X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org This patch heavily reworks fdset initialization: - fdsets are now dynamically allocated by the FD manager - the event dispatcher is now created by the FD manager - struct fdset is now opaque to VDUSE and Vhost Signed-off-by: Maxime Coquelin --- lib/vhost/fd_man.c | 177 +++++++++++-- lib/vhost/fd_man.c.orig | 538 ++++++++++++++++++++++++++++++++++++++++ lib/vhost/fd_man.h | 39 +-- lib/vhost/socket.c | 24 +- lib/vhost/vduse.c | 29 +-- 5 files changed, 715 insertions(+), 92 deletions(-) create mode 100644 lib/vhost/fd_man.c.orig diff --git a/lib/vhost/fd_man.c b/lib/vhost/fd_man.c index 0ae481b785..8b47c97d45 100644 --- a/lib/vhost/fd_man.c +++ b/lib/vhost/fd_man.c @@ -3,12 +3,16 @@ */ #include +#include #include #include #include #include #include +#include +#include +#include #include "fd_man.h" @@ -19,6 +23,79 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO); #define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL) +struct fdentry { + int fd; /* -1 indicates this entry is empty */ + fd_cb rcb; /* callback when this fd is readable. */ + fd_cb wcb; /* callback when this fd is writeable.*/ + void *dat; /* fd context */ + int busy; /* whether this entry is being used in cb. */ +}; + +struct fdset { + char name[RTE_THREAD_NAME_SIZE]; + struct pollfd rwfds[MAX_FDS]; + struct fdentry fd[MAX_FDS]; + rte_thread_t tid; + pthread_mutex_t fd_mutex; + pthread_mutex_t fd_polling_mutex; + int num; /* current fd number of this fdset */ + + union pipefds { + struct { + int pipefd[2]; + }; + struct { + int readfd; + int writefd; + }; + } u; + + pthread_mutex_t sync_mutex; + pthread_cond_t sync_cond; + bool sync; + bool destroy; +}; + +static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat); +static uint32_t fdset_event_dispatch(void *arg); + +#define MAX_FDSETS 8 + +static struct fdset *fdsets[MAX_FDSETS]; +pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER; + +static struct fdset * +fdset_lookup(const char *name) +{ + int i; + + for (i = 0; i < MAX_FDSETS; i++) { + struct fdset *fdset = fdsets[i]; + if (fdset == NULL) + continue; + + if (!strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE)) + return fdset; + } + + return NULL; +} + +static int +fdset_insert(struct fdset *fdset) +{ + int i; + + for (i = 0; i < MAX_FDSETS; i++) { + if (fdsets[i] == NULL) { + fdsets[i] = fdset; + return 0; + } + } + + return -1; +} + static void fdset_pipe_read_cb(int readfd, void *dat, int *remove __rte_unused) @@ -63,7 +140,7 @@ fdset_pipe_init(struct fdset *fdset) return -1; } - ret = fdset_add(fdset, fdset->u.readfd, + ret = fdset_add_no_sync(fdset, fdset->u.readfd, fdset_pipe_read_cb, NULL, fdset); if (ret < 0) { VHOST_FDMAN_LOG(ERR, @@ -179,37 +256,82 @@ fdset_add_fd(struct fdset *pfdset, int idx, int fd, pfd->revents = 0; } -void -fdset_uninit(struct fdset *pfdset) -{ - fdset_pipe_uninit(pfdset); -} - -int -fdset_init(struct fdset *pfdset) +struct fdset * +fdset_init(const char *name) { + struct fdset *fdset; + uint32_t val; int i; - if (pfdset == NULL) - return -1; + if (name == NULL) { + VHOST_FDMAN_LOG(ERR, "Invalid name"); + goto err; + } - pthread_mutex_init(&pfdset->fd_mutex, NULL); - pthread_mutex_init(&pfdset->fd_polling_mutex, NULL); + pthread_mutex_lock(&fdsets_mutex); + fdset = fdset_lookup(name); + if (fdset) { + pthread_mutex_unlock(&fdsets_mutex); + return fdset; + } + + fdset = rte_zmalloc(NULL, sizeof(*fdset), 0); + if (!fdset) { + VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name); + goto err_unlock; + } + + rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE); + + pthread_mutex_init(&fdset->fd_mutex, NULL); + pthread_mutex_init(&fdset->fd_polling_mutex, NULL); for (i = 0; i < MAX_FDS; i++) { - pfdset->fd[i].fd = -1; - pfdset->fd[i].dat = NULL; + fdset->fd[i].fd = -1; + fdset->fd[i].dat = NULL; } - pfdset->num = 0; + fdset->num = 0; - return fdset_pipe_init(pfdset); + if (fdset_pipe_init(fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name); + goto err_free; + } + + if (rte_thread_create_internal_control(&fdset->tid, fdset->name, + fdset_event_dispatch, fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread", + fdset->name); + goto err_pipe; + } + + if (fdset_insert(fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name); + goto err_thread; + } + + pthread_mutex_unlock(&fdsets_mutex); + + return fdset; + +err_thread: + fdset->destroy = true; + fdset_sync(fdset); + rte_thread_join(fdset->tid, &val); +err_pipe: + fdset_pipe_uninit(fdset); +err_free: + rte_free(fdset); +err_unlock: + pthread_mutex_unlock(&fdsets_mutex); +err: + return NULL; } /** * Register the fd in the fdset with read/write handler and context. */ -int -fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) +static int +fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) { int i; @@ -232,6 +354,18 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) fdset_add_fd(pfdset, i, fd, rcb, wcb, dat); pthread_mutex_unlock(&pfdset->fd_mutex); + return 0; +} + +int +fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) +{ + int ret; + + ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat); + if (ret < 0) + return ret; + fdset_sync(pfdset); return 0; @@ -315,7 +449,7 @@ fdset_try_del(struct fdset *pfdset, int fd) * will wait until the flag is reset to zero(which indicates the callback is * finished), then it could free the context after fdset_del. */ -uint32_t +static uint32_t fdset_event_dispatch(void *arg) { int i; @@ -404,6 +538,9 @@ fdset_event_dispatch(void *arg) if (need_shrink) fdset_shrink(pfdset); + + if (pfdset->destroy) + break; } return 0; diff --git a/lib/vhost/fd_man.c.orig b/lib/vhost/fd_man.c.orig new file mode 100644 index 0000000000..c0149fbf4e --- /dev/null +++ b/lib/vhost/fd_man.c.orig @@ -0,0 +1,538 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "fd_man.h" + +RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO); +#define RTE_LOGTYPE_VHOST_FDMAN vhost_fdset_logtype +#define VHOST_FDMAN_LOG(level, ...) \ + RTE_LOG_LINE(level, VHOST_FDMAN, "" __VA_ARGS__) + +#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL) + +struct fdentry { + int fd; /* -1 indicates this entry is empty */ + fd_cb rcb; /* callback when this fd is readable. */ + fd_cb wcb; /* callback when this fd is writeable.*/ + void *dat; /* fd context */ + int busy; /* whether this entry is being used in cb. */ +}; + +struct fdset { + char name[RTE_THREAD_NAME_SIZE]; + struct pollfd rwfds[MAX_FDS]; + struct fdentry fd[MAX_FDS]; + rte_thread_t tid; + pthread_mutex_t fd_mutex; + pthread_mutex_t fd_polling_mutex; + int num; /* current fd number of this fdset */ + + int sync_fd; + pthread_mutex_t sync_mutex; + pthread_cond_t sync_cond; + bool sync; + + bool destroy; +}; + +static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat); +static uint32_t fdset_event_dispatch(void *arg); + +#define MAX_FDSETS 8 + +static struct fdset *fdsets[MAX_FDSETS]; +pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER; + +static struct fdset * +fdset_lookup(const char *name) +{ + int i; + + for (i = 0; i < MAX_FDSETS; i++) { + struct fdset *fdset = fdsets[i]; + if (fdset == NULL) + continue; + + if (!strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE)) + return fdset; + } + + return NULL; +} + +static int +fdset_insert(struct fdset *fdset) +{ + int i; + + for (i = 0; i < MAX_FDSETS; i++) { + if (fdsets[i] == NULL) { + fdsets[i] = fdset; + return 0; + } + } + + return -1; +} + +static void +fdset_sync_read_cb(int sync_fd, void *dat, int *remove __rte_unused) +{ + eventfd_t val; + struct fdset *fdset = dat; + int r = eventfd_read(sync_fd, &val); + /* + * Just an optimization, we don't care if read() failed + * so ignore explicitly its return value to make the + * compiler happy + */ + RTE_SET_USED(r); + + pthread_mutex_lock(&fdset->sync_mutex); + fdset->sync = true; + pthread_cond_broadcast(&fdset->sync_cond); + pthread_mutex_unlock(&fdset->sync_mutex); +} + +static void +fdset_sync_uninit(struct fdset *fdset) +{ + fdset_del(fdset, fdset->sync_fd); + close(fdset->sync_fd); + fdset->sync_fd = -1; +} + +static int +fdset_sync_init(struct fdset *fdset) +{ + int ret; + + pthread_mutex_init(&fdset->sync_mutex, NULL); + pthread_cond_init(&fdset->sync_cond, NULL); + + fdset->sync_fd = eventfd(0, 0); + if (fdset->sync_fd < 0) { + VHOST_FDMAN_LOG(ERR, "failed to create eventfd for %s fdset", fdset->name); + return -1; + } + +<<<<<<< HEAD + ret = fdset_add_no_sync(fdset, fdset->u.readfd, + fdset_pipe_read_cb, NULL, fdset); +======= + ret = fdset_add(fdset, fdset->sync_fd, fdset_sync_read_cb, NULL, fdset); +>>>>>>> 3474bf77e2 (vhost: convert fdset sync to eventfd) + if (ret < 0) { + VHOST_FDMAN_LOG(ERR, "failed to add eventfd %d to %s fdset", + fdset->sync_fd, fdset->name); + + fdset_sync_uninit(fdset); + return -1; + } + + return 0; +} + +static void +fdset_sync(struct fdset *fdset) +{ + int ret; + + pthread_mutex_lock(&fdset->sync_mutex); + + fdset->sync = false; + ret = eventfd_write(fdset->sync_fd, (eventfd_t)1); + if (ret < 0) { + VHOST_FDMAN_LOG(ERR, "Failed to write sync eventfd for %s fdset: %s", + fdset->name, strerror(errno)); + goto out_unlock; + } + + while (!fdset->sync) + pthread_cond_wait(&fdset->sync_cond, &fdset->sync_mutex); + +out_unlock: + pthread_mutex_unlock(&fdset->sync_mutex); +} + +static int +get_last_valid_idx(struct fdset *pfdset, int last_valid_idx) +{ + int i; + + for (i = last_valid_idx; i >= 0 && pfdset->fd[i].fd == -1; i--) + ; + + return i; +} + +static void +fdset_move(struct fdset *pfdset, int dst, int src) +{ + pfdset->fd[dst] = pfdset->fd[src]; + pfdset->rwfds[dst] = pfdset->rwfds[src]; +} + +static void +fdset_shrink_nolock(struct fdset *pfdset) +{ + int i; + int last_valid_idx = get_last_valid_idx(pfdset, pfdset->num - 1); + + for (i = 0; i < last_valid_idx; i++) { + if (pfdset->fd[i].fd != -1) + continue; + + fdset_move(pfdset, i, last_valid_idx); + last_valid_idx = get_last_valid_idx(pfdset, last_valid_idx - 1); + } + pfdset->num = last_valid_idx + 1; +} + +/* + * Find deleted fd entries and remove them + */ +static void +fdset_shrink(struct fdset *pfdset) +{ + pthread_mutex_lock(&pfdset->fd_mutex); + fdset_shrink_nolock(pfdset); + pthread_mutex_unlock(&pfdset->fd_mutex); +} + +/** + * Returns the index in the fdset for a given fd. + * @return + * index for the fd, or -1 if fd isn't in the fdset. + */ +static int +fdset_find_fd(struct fdset *pfdset, int fd) +{ + int i; + + for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++) + ; + + return i == pfdset->num ? -1 : i; +} + +static void +fdset_add_fd(struct fdset *pfdset, int idx, int fd, + fd_cb rcb, fd_cb wcb, void *dat) +{ + struct fdentry *pfdentry = &pfdset->fd[idx]; + struct pollfd *pfd = &pfdset->rwfds[idx]; + + pfdentry->fd = fd; + pfdentry->rcb = rcb; + pfdentry->wcb = wcb; + pfdentry->dat = dat; + + pfd->fd = fd; + pfd->events = rcb ? POLLIN : 0; + pfd->events |= wcb ? POLLOUT : 0; + pfd->revents = 0; +} + +struct fdset * +fdset_init(const char *name) +{ + struct fdset *fdset; + uint32_t val; + int i; + + if (name == NULL) { + VHOST_FDMAN_LOG(ERR, "Invalid name"); + goto err; + } + + pthread_mutex_lock(&fdsets_mutex); + fdset = fdset_lookup(name); + if (fdset) { + pthread_mutex_unlock(&fdsets_mutex); + return fdset; + } + + fdset = rte_zmalloc(NULL, sizeof(*fdset), 0); + if (!fdset) { + VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name); + goto err_unlock; + } + + strncpy(fdset->name, name, RTE_THREAD_NAME_SIZE - 1); + + pthread_mutex_init(&fdset->fd_mutex, NULL); + pthread_mutex_init(&fdset->fd_polling_mutex, NULL); + + for (i = 0; i < MAX_FDS; i++) { + fdset->fd[i].fd = -1; + fdset->fd[i].dat = NULL; + } + fdset->num = 0; + + if (fdset_sync_init(fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to init sync for %s", name); + goto err_free; + } + + if (rte_thread_create_internal_control(&fdset->tid, fdset->name, + fdset_event_dispatch, fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread", + fdset->name); + goto err_sync; + } + + if (fdset_insert(fdset)) { + VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name); + goto err_thread; + } + + pthread_mutex_unlock(&fdsets_mutex); + + return fdset; + +err_thread: + fdset->destroy = true; + fdset_sync(fdset); + rte_thread_join(fdset->tid, &val); +err_sync: + fdset_sync_uninit(fdset); +err_free: + rte_free(fdset); +err_unlock: + pthread_mutex_unlock(&fdsets_mutex); +err: + return NULL; +} + +/** + * Register the fd in the fdset with read/write handler and context. + */ +static int +fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) +{ + int i; + + if (pfdset == NULL || fd == -1) + return -1; + + pthread_mutex_lock(&pfdset->fd_mutex); + i = pfdset->num < MAX_FDS ? pfdset->num++ : -1; + if (i == -1) { + pthread_mutex_lock(&pfdset->fd_polling_mutex); + fdset_shrink_nolock(pfdset); + pthread_mutex_unlock(&pfdset->fd_polling_mutex); + i = pfdset->num < MAX_FDS ? pfdset->num++ : -1; + if (i == -1) { + pthread_mutex_unlock(&pfdset->fd_mutex); + return -2; + } + } + + fdset_add_fd(pfdset, i, fd, rcb, wcb, dat); + pthread_mutex_unlock(&pfdset->fd_mutex); + + return 0; +} + +int +fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat) +{ + int ret; + + ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat); + if (ret < 0) + return ret; + + fdset_sync(pfdset); + + return 0; +} + +/** + * Unregister the fd from the fdset. + * Returns context of a given fd or NULL. + */ +void * +fdset_del(struct fdset *pfdset, int fd) +{ + int i; + void *dat = NULL; + + if (pfdset == NULL || fd == -1) + return NULL; + + do { + pthread_mutex_lock(&pfdset->fd_mutex); + + i = fdset_find_fd(pfdset, fd); + if (i != -1 && pfdset->fd[i].busy == 0) { + /* busy indicates r/wcb is executing! */ + dat = pfdset->fd[i].dat; + pfdset->fd[i].fd = -1; + pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL; + pfdset->fd[i].dat = NULL; + i = -1; + } + pthread_mutex_unlock(&pfdset->fd_mutex); + } while (i != -1); + + fdset_sync(pfdset); + + return dat; +} + +/** + * Unregister the fd from the fdset. + * + * If parameters are invalid, return directly -2. + * And check whether fd is busy, if yes, return -1. + * Otherwise, try to delete the fd from fdset and + * return true. + */ +int +fdset_try_del(struct fdset *pfdset, int fd) +{ + int i; + + if (pfdset == NULL || fd == -1) + return -2; + + pthread_mutex_lock(&pfdset->fd_mutex); + i = fdset_find_fd(pfdset, fd); + if (i != -1 && pfdset->fd[i].busy) { + pthread_mutex_unlock(&pfdset->fd_mutex); + return -1; + } + + if (i != -1) { + pfdset->fd[i].fd = -1; + pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL; + pfdset->fd[i].dat = NULL; + } + + pthread_mutex_unlock(&pfdset->fd_mutex); + + fdset_sync(pfdset); + + return 0; +} + +/** + * This functions runs in infinite blocking loop until there is no fd in + * pfdset. It calls corresponding r/w handler if there is event on the fd. + * + * Before the callback is called, we set the flag to busy status; If other + * thread(now rte_vhost_driver_unregister) calls fdset_del concurrently, it + * will wait until the flag is reset to zero(which indicates the callback is + * finished), then it could free the context after fdset_del. + */ +static uint32_t +fdset_event_dispatch(void *arg) +{ + int i; + struct pollfd *pfd; + struct fdentry *pfdentry; + fd_cb rcb, wcb; + void *dat; + int fd, numfds; + int remove1, remove2; + int need_shrink; + struct fdset *pfdset = arg; + int val; + + if (pfdset == NULL) + return 0; + + while (1) { + + /* + * When poll is blocked, other threads might unregister + * listenfds from and register new listenfds into fdset. + * When poll returns, the entries for listenfds in the fdset + * might have been updated. It is ok if there is unwanted call + * for new listenfds. + */ + pthread_mutex_lock(&pfdset->fd_mutex); + numfds = pfdset->num; + pthread_mutex_unlock(&pfdset->fd_mutex); + + pthread_mutex_lock(&pfdset->fd_polling_mutex); + val = poll(pfdset->rwfds, numfds, 1000 /* millisecs */); + pthread_mutex_unlock(&pfdset->fd_polling_mutex); + if (val < 0) + continue; + + need_shrink = 0; + for (i = 0; i < numfds; i++) { + pthread_mutex_lock(&pfdset->fd_mutex); + + pfdentry = &pfdset->fd[i]; + fd = pfdentry->fd; + pfd = &pfdset->rwfds[i]; + + if (fd < 0) { + need_shrink = 1; + pthread_mutex_unlock(&pfdset->fd_mutex); + continue; + } + + if (!pfd->revents) { + pthread_mutex_unlock(&pfdset->fd_mutex); + continue; + } + + remove1 = remove2 = 0; + + rcb = pfdentry->rcb; + wcb = pfdentry->wcb; + dat = pfdentry->dat; + pfdentry->busy = 1; + + pthread_mutex_unlock(&pfdset->fd_mutex); + + if (rcb && pfd->revents & (POLLIN | FDPOLLERR)) + rcb(fd, dat, &remove1); + if (wcb && pfd->revents & (POLLOUT | FDPOLLERR)) + wcb(fd, dat, &remove2); + pfdentry->busy = 0; + /* + * fdset_del needs to check busy flag. + * We don't allow fdset_del to be called in callback + * directly. + */ + /* + * When we are to clean up the fd from fdset, + * because the fd is closed in the cb, + * the old fd val could be reused by when creates new + * listen fd in another thread, we couldn't call + * fdset_del. + */ + if (remove1 || remove2) { + pfdentry->fd = -1; + need_shrink = 1; + } + } + + if (need_shrink) + fdset_shrink(pfdset); + + if (pfdset->destroy) + break; + } + + return 0; +} diff --git a/lib/vhost/fd_man.h b/lib/vhost/fd_man.h index c18e3a435c..079fa0155f 100644 --- a/lib/vhost/fd_man.h +++ b/lib/vhost/fd_man.h @@ -8,50 +8,19 @@ #include #include +struct fdset; + #define MAX_FDS 1024 typedef void (*fd_cb)(int fd, void *dat, int *remove); -struct fdentry { - int fd; /* -1 indicates this entry is empty */ - fd_cb rcb; /* callback when this fd is readable. */ - fd_cb wcb; /* callback when this fd is writeable.*/ - void *dat; /* fd context */ - int busy; /* whether this entry is being used in cb. */ -}; - -struct fdset { - struct pollfd rwfds[MAX_FDS]; - struct fdentry fd[MAX_FDS]; - pthread_mutex_t fd_mutex; - pthread_mutex_t fd_polling_mutex; - int num; /* current fd number of this fdset */ - - union pipefds { - struct { - int pipefd[2]; - }; - struct { - int readfd; - int writefd; - }; - } u; - - pthread_mutex_t sync_mutex; - pthread_cond_t sync_cond; - bool sync; -}; - -void fdset_uninit(struct fdset *pfdset); - -int fdset_init(struct fdset *pfdset); +struct fdset *fdset_init(const char *name); int fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat); void *fdset_del(struct fdset *pfdset, int fd); -int fdset_try_del(struct fdset *pfdset, int fd); -uint32_t fdset_event_dispatch(void *arg); +int fdset_try_del(struct fdset *pfdset, int fd); #endif diff --git a/lib/vhost/socket.c b/lib/vhost/socket.c index 2f93d48c31..9eebc63479 100644 --- a/lib/vhost/socket.c +++ b/lib/vhost/socket.c @@ -76,7 +76,7 @@ struct vhost_user_connection { #define MAX_VHOST_SOCKET 1024 struct vhost_user { struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET]; - struct fdset fdset; + struct fdset *fdset; int vsocket_cnt; pthread_mutex_t mutex; }; @@ -261,7 +261,7 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket) conn->connfd = fd; conn->vsocket = vsocket; conn->vid = vid; - ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb, + ret = fdset_add(vhost_user.fdset, fd, vhost_user_read_cb, NULL, conn); if (ret < 0) { VHOST_CONFIG_LOG(vsocket->path, ERR, @@ -394,7 +394,7 @@ vhost_user_start_server(struct vhost_user_socket *vsocket) if (ret < 0) goto err; - ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection, + ret = fdset_add(vhost_user.fdset, fd, vhost_user_server_new_connection, NULL, vsocket); if (ret < 0) { VHOST_CONFIG_LOG(path, ERR, "failed to add listen fd %d to vhost server fdset", @@ -1079,7 +1079,7 @@ rte_vhost_driver_unregister(const char *path) * mutex lock, and try again since the r/wcb * may use the mutex lock. */ - if (fdset_try_del(&vhost_user.fdset, vsocket->socket_fd) == -1) { + if (fdset_try_del(vhost_user.fdset, vsocket->socket_fd) == -1) { pthread_mutex_unlock(&vhost_user.mutex); goto again; } @@ -1099,7 +1099,7 @@ rte_vhost_driver_unregister(const char *path) * try again since the r/wcb may use the * conn_mutex and mutex locks. */ - if (fdset_try_del(&vhost_user.fdset, + if (fdset_try_del(vhost_user.fdset, conn->connfd) == -1) { pthread_mutex_unlock(&vsocket->conn_mutex); pthread_mutex_unlock(&vhost_user.mutex); @@ -1167,7 +1167,6 @@ int rte_vhost_driver_start(const char *path) { struct vhost_user_socket *vsocket; - static rte_thread_t fdset_tid; pthread_mutex_lock(&vhost_user.mutex); vsocket = find_vhost_user_socket(path); @@ -1179,19 +1178,12 @@ rte_vhost_driver_start(const char *path) if (vsocket->is_vduse) return vduse_device_create(path, vsocket->net_compliant_ol_flags); - if (fdset_tid.opaque_id == 0) { - if (fdset_init(&vhost_user.fdset) < 0) { + if (vhost_user.fdset == NULL) { + vhost_user.fdset = fdset_init("vhost-evt"); + if (vhost_user.fdset == NULL) { VHOST_CONFIG_LOG(path, ERR, "Failed to init Vhost-user fdset"); return -1; } - - int ret = rte_thread_create_internal_control(&fdset_tid, - "vhost-evt", fdset_event_dispatch, &vhost_user.fdset); - if (ret != 0) { - VHOST_CONFIG_LOG(path, ERR, "failed to create fdset handling thread"); - fdset_uninit(&vhost_user.fdset); - return -1; - } } if (vsocket->is_server) diff --git a/lib/vhost/vduse.c b/lib/vhost/vduse.c index 257285a89f..ef2573bdf0 100644 --- a/lib/vhost/vduse.c +++ b/lib/vhost/vduse.c @@ -28,13 +28,11 @@ #define VDUSE_CTRL_PATH "/dev/vduse/control" struct vduse { - struct fdset fdset; + struct fdset *fdset; }; static struct vduse vduse; -static bool vduse_events_thread; - static const char * const vduse_reqs_str[] = { "VDUSE_GET_VQ_STATE", "VDUSE_SET_STATUS", @@ -215,7 +213,7 @@ vduse_vring_setup(struct virtio_net *dev, unsigned int index) } if (vq == dev->cvq) { - ret = fdset_add(&vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev); + ret = fdset_add(vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev); if (ret) { VHOST_CONFIG_LOG(dev->ifname, ERR, "Failed to setup kickfd handler for VQ %u: %s", @@ -238,7 +236,7 @@ vduse_vring_cleanup(struct virtio_net *dev, unsigned int index) int ret; if (vq == dev->cvq && vq->kickfd >= 0) - fdset_del(&vduse.fdset, vq->kickfd); + fdset_del(vduse.fdset, vq->kickfd); vq_efd.index = index; vq_efd.fd = VDUSE_EVENTFD_DEASSIGN; @@ -413,7 +411,6 @@ int vduse_device_create(const char *path, bool compliant_ol_flags) { int control_fd, dev_fd, vid, ret; - rte_thread_t fdset_tid; uint32_t i, max_queue_pairs, total_queues; struct virtio_net *dev; struct virtio_net_config vnet_config = {{ 0 }}; @@ -422,22 +419,12 @@ vduse_device_create(const char *path, bool compliant_ol_flags) struct vduse_dev_config *dev_config = NULL; const char *name = path + strlen("/dev/vduse/"); - /* If first device, create events dispatcher thread */ - if (vduse_events_thread == false) { - if (fdset_init(&vduse.fdset) < 0) { + if (vduse.fdset == NULL) { + vduse.fdset = fdset_init("vduse-evt"); + if (vduse.fdset == NULL) { VHOST_CONFIG_LOG(path, ERR, "Failed to init VDUSE fdset"); return -1; } - - ret = rte_thread_create_internal_control(&fdset_tid, "vduse-evt", - fdset_event_dispatch, &vduse.fdset); - if (ret != 0) { - VHOST_CONFIG_LOG(path, ERR, "failed to create vduse fdset handling thread"); - fdset_uninit(&vduse.fdset); - return -1; - } - - vduse_events_thread = true; } control_fd = open(VDUSE_CTRL_PATH, O_RDWR); @@ -555,7 +542,7 @@ vduse_device_create(const char *path, bool compliant_ol_flags) dev->cvq = dev->virtqueue[max_queue_pairs * 2]; - ret = fdset_add(&vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev); + ret = fdset_add(vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev); if (ret) { VHOST_CONFIG_LOG(name, ERR, "Failed to add fd %d to vduse fdset", dev->vduse_dev_fd); @@ -602,7 +589,7 @@ vduse_device_destroy(const char *path) vduse_device_stop(dev); - fdset_del(&vduse.fdset, dev->vduse_dev_fd); + fdset_del(vduse.fdset, dev->vduse_dev_fd); if (dev->vduse_dev_fd >= 0) { close(dev->vduse_dev_fd);