From patchwork Fri Dec 1 14:48:00 2017
X-Patchwork-Id: 31866
From: Konstantin Ananyev
To: dev@dpdk.org
Cc: Konstantin Ananyev
Date: Fri, 1 Dec 2017 14:48:00 +0000
Message-Id: <1512139682-11114-2-git-send-email-konstantin.ananyev@intel.com>
In-Reply-To: <1512139682-11114-1-git-send-email-konstantin.ananyev@intel.com>
References: <1512139682-11114-1-git-send-email-konstantin.ananyev@intel.com>
Subject: [dpdk-dev] [RFC PATCH 1/3] ethdev: introduce eth_queue_local

Introduce a separate data structure (rte_eth_queue_local) to store
process-local (i.e. non-shareable) information for each configured
rx/tx queue. Memory for that structure is allocated/freed dynamically
during rte_eth_dev_configure().
Put placeholder pointers for queue-specific (rx|tx)_pkt_burst() and
tx_pkt_prepare() functions inside that structure.
Move rx/tx callback related information into that structure.

This introduces a change in current behavior: all callbacks for
unconfigured queues are automatically removed. For example:
rte_eth_dev_configure(port, 0, 0, ...);
would wipe out all installed callbacks for that port.

Signed-off-by: Konstantin Ananyev
---
 lib/librte_ether/rte_ethdev.c | 228 +++++++++++++++++++++++++++++-------------
 lib/librte_ether/rte_ethdev.h |  85 +++++++++++-----
 2 files changed, 216 insertions(+), 97 deletions(-)
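To illustrate the behavior change above, here is a minimal sketch (not
part of the diff): a hypothetical counting callback built on the
existing rte_eth_add_rx_callback()/rte_eth_dev_configure() API, with
port_id and port_conf as in a typical init sequence:

#include <rte_common.h>
#include <rte_ethdev.h>

/* hypothetical RX callback: counts packets seen on the queue */
static uint16_t
rx_count_cb(uint16_t port, uint16_t queue, struct rte_mbuf **pkts,
	uint16_t nb_pkts, uint16_t max_pkts, void *user_param)
{
	uint64_t *cnt = user_param;

	RTE_SET_USED(port);
	RTE_SET_USED(queue);
	RTE_SET_USED(max_pkts);
	*cnt += nb_pkts;
	return nb_pkts;
}

	...
	static uint64_t cnt;
	void *cb = rte_eth_add_rx_callback(port_id, 0, rx_count_cb, &cnt);

	/* Before this patch 'cb' stayed registered across re-configure.
	 * Now the call below frees it together with the queue's local
	 * data, so 'cb' must not be used afterwards (e.g. passed to
	 * rte_eth_remove_rx_callback()). */
	rte_eth_dev_configure(port_id, 0, 0, &port_conf);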
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 318af2869..ff8571d60 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -241,6 +241,14 @@ rte_eth_dev_allocate(const char *name)
 	return eth_dev;
 }
 
+static struct rte_eth_queue_local *
+alloc_queue_local(uint16_t nb_queues)
+{
+	struct rte_eth_queue_local *ql;
+	ql = rte_zmalloc(NULL, sizeof(ql[0]) * nb_queues, RTE_CACHE_LINE_SIZE);
+	return ql;
+}
+
 /*
  * Attach to a port already registered by the primary process, which
  * makes sure that the same device would have the same port id both
@@ -269,6 +277,16 @@ rte_eth_dev_attach_secondary(const char *name)
 	eth_dev = eth_dev_get(i);
 	RTE_ASSERT(eth_dev->data->port_id == i);
 
+	/*
+	 * Obviously this is quite error prone:
+	 * if the primary process decides to (re)configure the device later,
+	 * there is no way for the secondary one to notice that and update
+	 * its local data (queue_local, rx|tx_pkt_burst, etc.).
+	 * We probably need an eth_dev_configure_secondary() or so.
+	 */
+	eth_dev->rx_ql = alloc_queue_local(eth_dev->data->nb_rx_queues);
+	eth_dev->tx_ql = alloc_queue_local(eth_dev->data->nb_tx_queues);
+
 	return eth_dev;
 }
 
@@ -444,52 +462,92 @@ rte_eth_dev_detach(uint16_t port_id, char *name)
 	return ret;
 }
 
+/*
+ * Helper routine - removes all registered RX/TX queue callbacks
+ * from the FIFO list.
+ * It is the caller's responsibility to make sure no actual RX/TX happens
+ * simultaneously on that queue.
+ */
+static void
+free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
+{
+	struct rte_eth_rxtx_callback *cb, *next;
+
+	for (cb = cbs; cb != NULL; cb = next) {
+		next = cb->next;
+		rte_free(cb);
+	}
+}
+
+static void
+reset_queue_local(struct rte_eth_queue_local *ql)
+{
+	free_rxtx_cbs(ql->cbs);
+	memset(ql, 0, sizeof(*ql));
+}
+
 static int
 rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-	uint16_t old_nb_queues = dev->data->nb_rx_queues;
+	uint32_t diff, i, old_nb_queues;
+	struct rte_eth_queue_local *rlq;
 	void **rxq;
-	unsigned i;
 
-	if (dev->data->rx_queues == NULL && nb_queues != 0) { /* first time configuration */
-		dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
-				sizeof(dev->data->rx_queues[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (dev->data->rx_queues == NULL) {
-			dev->data->nb_rx_queues = 0;
-			return -(ENOMEM);
-		}
-	} else if (dev->data->rx_queues != NULL && nb_queues != 0) { /* re-configure */
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+	old_nb_queues = dev->data->nb_rx_queues;
+
+	/* same # of queues, nothing needs to be done */
+	if (nb_queues == old_nb_queues)
+		return 0;
+
+	rxq = dev->data->rx_queues;
+	rlq = dev->rx_ql;
 
-		rxq = dev->data->rx_queues;
+	/* reduce # of queues */
+	if (old_nb_queues > nb_queues) {
 
-		for (i = nb_queues; i < old_nb_queues; i++)
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
+			-ENOTSUP);
+
+		dev->data->nb_rx_queues = nb_queues;
+
+		/* free resources used by the queues we are going to remove */
+		for (i = nb_queues; i < old_nb_queues; i++) {
+			reset_queue_local(rlq + i);
 			(*dev->dev_ops->rx_queue_release)(rxq[i]);
-		rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (rxq == NULL)
-			return -(ENOMEM);
-		if (nb_queues > old_nb_queues) {
-			uint16_t new_qs = nb_queues - old_nb_queues;
-
-			memset(rxq + old_nb_queues, 0,
-				sizeof(rxq[0]) * new_qs);
+			rxq[i] = 0;
 		}
 
-		dev->data->rx_queues = rxq;
+		if (nb_queues == 0) {
+			dev->data->rx_queues = NULL;
+			dev->rx_ql = NULL;
+			rte_free(rxq);
+			rte_free(rlq);
+		}
+
+		return 0;
+	}
 
-	} else if (dev->data->rx_queues != NULL && nb_queues == 0) {
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
+	/* increase # of queues */
 
-		rxq = dev->data->rx_queues;
+	diff = nb_queues - old_nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
-			(*dev->dev_ops->rx_queue_release)(rxq[i]);
+	rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
+	rlq = rte_realloc(rlq, sizeof(rlq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
 
-		rte_free(dev->data->rx_queues);
-		dev->data->rx_queues = NULL;
+	if (rxq != NULL) {
+		memset(rxq + old_nb_queues, 0, sizeof(rxq[0]) * diff);
+		dev->data->rx_queues = rxq;
+	}
+	if (rlq != NULL) {
+		memset(rlq + old_nb_queues, 0, sizeof(rlq[0]) * diff);
+		dev->rx_ql = rlq;
 	}
+
+	if (rxq == NULL || rlq == NULL)
+		return -ENOMEM;
+
 	dev->data->nb_rx_queues = nb_queues;
 
 	return 0;
 }
@@ -601,49 +659,65 @@ rte_eth_dev_tx_queue_stop(uint16_t port_id, uint16_t tx_queue_id)
 
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-	uint16_t old_nb_queues = dev->data->nb_tx_queues;
+	uint32_t diff, i, old_nb_queues;
+	struct rte_eth_queue_local *tlq;
 	void **txq;
-	unsigned i;
 
-	if (dev->data->tx_queues == NULL && nb_queues != 0) { /* first time configuration */
-		dev->data->tx_queues = rte_zmalloc("ethdev->tx_queues",
-				sizeof(dev->data->tx_queues[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (dev->data->tx_queues == NULL) {
-			dev->data->nb_tx_queues = 0;
-			return -(ENOMEM);
-		}
-	} else if (dev->data->tx_queues != NULL && nb_queues != 0) { /* re-configure */
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+	old_nb_queues = dev->data->nb_tx_queues;
+
+	/* same # of queues, nothing needs to be done */
+	if (nb_queues == old_nb_queues)
+		return 0;
+
+	txq = dev->data->tx_queues;
+	tlq = dev->tx_ql;
+
+	/* reduce # of queues */
+	if (old_nb_queues > nb_queues) {
+
+		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release,
+			-ENOTSUP);
 
-		txq = dev->data->tx_queues;
+		dev->data->nb_tx_queues = nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
+		/* free resources used by the queues we are going to remove */
+		for (i = nb_queues; i < old_nb_queues; i++) {
+			reset_queue_local(tlq + i);
 			(*dev->dev_ops->tx_queue_release)(txq[i]);
-		txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
-				RTE_CACHE_LINE_SIZE);
-		if (txq == NULL)
-			return -ENOMEM;
-		if (nb_queues > old_nb_queues) {
-			uint16_t new_qs = nb_queues - old_nb_queues;
-
-			memset(txq + old_nb_queues, 0,
-				sizeof(txq[0]) * new_qs);
+			txq[i] = 0;
 		}
 
-		dev->data->tx_queues = txq;
+		if (nb_queues == 0) {
+			dev->data->tx_queues = NULL;
+			dev->tx_ql = NULL;
+			rte_free(txq);
+			rte_free(tlq);
+		}
+
+		return 0;
+	}
 
-	} else if (dev->data->tx_queues != NULL && nb_queues == 0) {
-		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
+	/* increase # of queues */
 
-		txq = dev->data->tx_queues;
+	diff = nb_queues - old_nb_queues;
 
-		for (i = nb_queues; i < old_nb_queues; i++)
-			(*dev->dev_ops->tx_queue_release)(txq[i]);
+	txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
+	tlq = rte_realloc(tlq, sizeof(tlq[0]) * nb_queues,
+			RTE_CACHE_LINE_SIZE);
 
-		rte_free(dev->data->tx_queues);
-		dev->data->tx_queues = NULL;
+	if (txq != NULL) {
+		memset(txq + old_nb_queues, 0, sizeof(txq[0]) * diff);
+		dev->data->tx_queues = txq;
+	}
+	if (tlq != NULL) {
+		memset(tlq + old_nb_queues, 0, sizeof(tlq[0]) * diff);
+		dev->tx_ql = tlq;
 	}
+
+	if (txq == NULL || tlq == NULL)
+		return -ENOMEM;
+
 	dev->data->nb_tx_queues = nb_queues;
 
 	return 0;
 }
@@ -1084,6 +1158,7 @@ void
 rte_eth_dev_close(uint16_t port_id)
 {
 	struct rte_eth_dev *dev;
+	uint32_t i;
 
 	RTE_ETH_VALID_PORTID_OR_RET(port_id);
 	dev = &rte_eth_devices[port_id];
@@ -1092,12 +1167,25 @@ rte_eth_dev_close(uint16_t port_id)
 	dev->data->dev_started = 0;
 	(*dev->dev_ops->dev_close)(dev);
 
+	/* free local resources for RX/TX queues */
+
+	for (i = 0; i != dev->data->nb_rx_queues; i++)
+		reset_queue_local(dev->rx_ql + i);
+
+	for (i = 0; i != dev->data->nb_tx_queues; i++)
+		reset_queue_local(dev->tx_ql + i);
+
 	dev->data->nb_rx_queues = 0;
 	rte_free(dev->data->rx_queues);
 	dev->data->rx_queues = NULL;
+	rte_free(dev->rx_ql);
+	dev->rx_ql = NULL;
+
 	dev->data->nb_tx_queues = 0;
 	rte_free(dev->data->tx_queues);
 	dev->data->tx_queues = NULL;
+	rte_free(dev->tx_ql);
+	dev->tx_ql = NULL;
 }
 
 int
@@ -3111,10 +3199,10 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+		rte_eth_devices[port_id].rx_ql[queue_id].cbs;
 
 	if (!tail) {
-		rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+		rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
 
 	} else {
 		while (tail->next)
@@ -3153,9 +3241,9 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
 
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
 	/* Add the callbacks at first position */
-	cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+	cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
 	rte_smp_wmb();
-	rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+	rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
 	rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
 	return cb;
@@ -3189,10 +3277,10 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
 	rte_spinlock_lock(&rte_eth_tx_cb_lock);
 	/* Add the callbacks in fifo order. */
 	struct rte_eth_rxtx_callback *tail =
-		rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
+		rte_eth_devices[port_id].tx_ql[queue_id].cbs;
 
 	if (!tail) {
-		rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
+		rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
 
 	} else {
 		while (tail->next)
@@ -3223,7 +3311,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
 	int ret = -EINVAL;
 
 	rte_spinlock_lock(&rte_eth_rx_cb_lock);
-	prev_cb = &dev->post_rx_burst_cbs[queue_id];
+	prev_cb = &dev->rx_ql[queue_id].cbs;
 	for (; *prev_cb != NULL; prev_cb = &cb->next) {
 		cb = *prev_cb;
 		if (cb == user_cb) {
@@ -3257,7 +3345,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
 	struct rte_eth_rxtx_callback **prev_cb;
 
 	rte_spinlock_lock(&rte_eth_tx_cb_lock);
-	prev_cb = &dev->pre_tx_burst_cbs[queue_id];
+	prev_cb = &dev->tx_ql[queue_id].cbs;
 	for (; *prev_cb != NULL; prev_cb = &cb->next) {
 		cb = *prev_cb;
 		if (cb == user_cb) {
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 341c2d624..d62e1bcc3 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1694,6 +1694,10 @@ typedef uint16_t (*rte_tx_callback_fn)(uint16_t port, uint16_t queue,
  * @internal
  * Structure used to hold information about the callbacks to be called for a
  * queue on RX and TX.
+ * On RX: user-supplied functions called from rte_eth_rx_burst to post-process
+ * received packets before passing them to the user.
+ * On TX: user-supplied functions called from rte_eth_tx_burst to pre-process
+ * packets before passing them to the driver for transmission.
  */
 struct rte_eth_rxtx_callback {
 	struct rte_eth_rxtx_callback *next;
@@ -1715,6 +1719,24 @@ enum rte_eth_dev_state {
 
 /**
  * @internal
+ * Structure to hold process non-shareable information
+ * about an RX/TX queue of the ethernet device.
+ */
+struct rte_eth_queue_local {
+	/**
+	 * placeholder for queue-specific rx/tx and tx_prepare
+	 * function pointers
+	 */
+	eth_rx_burst_t rx_pkt_burst; /**< receive function pointer. */
+	eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
+	eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
+
+	struct rte_eth_rxtx_callback *cbs;
+	/**< list of user supplied callbacks */
+} __rte_cache_aligned;
+
+/**
+ * @internal
  * The generic data structure associated with each ethernet device.
  *
  * Pointers to burst-oriented packet receive and transmit functions are
@@ -1730,19 +1752,13 @@ struct rte_eth_dev {
 	struct rte_eth_dev_data *data;  /**< Pointer to device data */
 	const struct eth_dev_ops *dev_ops; /**< Functions exported by PMD */
 	struct rte_device *device; /**< Backing device */
+
+	struct rte_eth_queue_local *rx_ql; /**< RX queues local data */
+	struct rte_eth_queue_local *tx_ql; /**< TX queues local data */
+
 	struct rte_intr_handle *intr_handle; /**< Device interrupt handle */
 	/** User application callbacks for NIC interrupts */
 	struct rte_eth_dev_cb_list link_intr_cbs;
-	/**
-	 * User-supplied functions called from rx_burst to post-process
-	 * received packets before passing them to the user
-	 */
-	struct rte_eth_rxtx_callback *post_rx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
-	/**
-	 * User-supplied functions called from tx_burst to pre-process
-	 * received packets before passing them to the driver for transmission.
-	 */
-	struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
 	enum rte_eth_dev_state state; /**< Flag indicating the port state */
 	void *security_ctx; /**< Context for security ops */
 } __rte_cache_aligned;
@@ -2883,29 +2899,37 @@ static inline uint16_t
 rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
 		 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
 {
+	uint16_t nb_rx;
 	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
 
 #ifdef RTE_LIBRTE_ETHDEV_DEBUG
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
 	RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_rx_queues) {
+	if (queue_id >= dev->data->nb_rx_queues || dev->rx_ql == NULL) {
 		RTE_PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", queue_id);
 		return 0;
 	}
 #endif
-	int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+	nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
 			rx_pkts, nb_pkts);
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
-
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
-					nb_pkts, cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
+	{
+		struct rte_eth_queue_local *ql;
+		struct rte_eth_rxtx_callback *cb;
+
+		ql = dev->rx_ql + queue_id;
+		cb = ql->cbs;
+
+		if (unlikely(cb != NULL)) {
+			do {
+				nb_rx = cb->fn.rx(port_id, queue_id,
+					rx_pkts, nb_rx,
+					nb_pkts, cb->param);
+				cb = cb->next;
+			} while (cb != NULL);
+		}
 	}
 #endif
@@ -3151,21 +3175,28 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 	RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
 	RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
 
-	if (queue_id >= dev->data->nb_tx_queues) {
+	if (queue_id >= dev->data->nb_tx_queues || dev->tx_ql == NULL) {
 		RTE_PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", queue_id);
 		return 0;
 	}
 #endif
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-	struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
-
-	if (unlikely(cb != NULL)) {
-		do {
-			nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
-					cb->param);
-			cb = cb->next;
-		} while (cb != NULL);
+	{
+		struct rte_eth_queue_local *ql;
+		struct rte_eth_rxtx_callback *cb;
+
+		ql = dev->tx_ql + queue_id;
+		cb = ql->cbs;
+
+		if (unlikely(cb != NULL)) {
+			do {
+				nb_pkts = cb->fn.tx(port_id, queue_id,
+					tx_pkts, nb_pkts,
+					cb->param);
+				cb = cb->next;
+			} while (cb != NULL);
+		}
 	}
 #endif
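A note on the placeholders: this patch only reserves the per-queue
rx_pkt_burst/tx_pkt_burst/tx_pkt_prepare fields; nothing sets or uses
them yet. A follow-up could let rte_eth_rx_burst() prefer a
queue-specific function when the PMD provides one, along these purely
hypothetical lines (not part of the diff):

	/* hypothetical dispatch through the per-queue placeholder:
	 * fall back to the per-device function when the PMD did not
	 * install a queue-specific one at queue setup time. */
	struct rte_eth_queue_local *ql = dev->rx_ql + queue_id;
	eth_rx_burst_t rx_burst = (ql->rx_pkt_burst != NULL) ?
		ql->rx_pkt_burst : dev->rx_pkt_burst;

	nb_rx = rx_burst(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);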