From patchwork Fri Dec  2 19:45:56 2016
X-Patchwork-Submitter: "Eads, Gage" <gage.eads@intel.com>
X-Patchwork-Id: 17502
X-Patchwork-Delegate: jerinj@marvell.com
From: Gage Eads <gage.eads@intel.com>
To: jerin.jacob@caviumnetworks.com
Cc: dev@dpdk.org, bruce.richardson@intel.com, harry.van.haaren@intel.com,
	hemant.agrawal@nxp.com, Gage Eads <gage.eads@intel.com>
Date: Fri, 2 Dec 2016 13:45:56 -0600
Message-Id: <1480707956-17187-2-git-send-email-gage.eads@intel.com>
X-Mailer: git-send-email 1.9.1
In-Reply-To: <1480707956-17187-1-git-send-email-gage.eads@intel.com>
References: <1480707956-17187-1-git-send-email-gage.eads@intel.com>
Subject: [dpdk-dev] [RFC PATCH] eventdev: add buffered enqueue and flush APIs

This commit adds buffered enqueue functionality to the eventdev API. It is
conceptually similar to the ethdev API's tx buffering, but with a smaller
API surface and no dropping of events.
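For illustration, a worker loop might use the proposed API as follows. This
is a minimal sketch, not part of the patch: the dev_id/port_id values, the
done flag, and the process_event() helper are placeholders, and the burst
dequeue call's shape is assumed from the accompanying eventdev RFC series.

	uint8_t dev_id = 0, port_id = 0;	/* placeholder identifiers */
	volatile int done = 0;			/* placeholder exit flag */
	struct rte_event ev;

	while (!done) {
		/* Poll for one event (a wait of 0 returns immediately). */
		if (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0) == 0) {
			/* Idle: drain anything still sitting in the port's
			 * enqueue buffer so events are not stranded. */
			rte_event_enqueue_buffer_flush(dev_id, port_id);
			continue;
		}

		process_event(&ev);	/* application stage, placeholder */

		/* Buffer the event; a full buffer is flushed internally
		 * before the event is added. */
		if (rte_event_enqueue_buffer(dev_id, port_id, &ev) < 0) {
			/* -ENOSPC: the flush made no progress (e.g. the
			 * output queue is backpressured); retry later. */
		}
	}

Note that, unlike rte_eth_tx_buffer(), a failed flush does not invoke a drop
callback; the events simply stay buffered until a later flush succeeds.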
Signed-off-by: Gage Eads <gage.eads@intel.com>
---
 lib/librte_eventdev/rte_eventdev.c |  29 ++++++++++
 lib/librte_eventdev/rte_eventdev.h | 106 +++++++++++++++++++++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index 17ce5c3..564573f 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -219,6 +219,7 @@
 	uint16_t *links_map;
 	uint8_t *ports_dequeue_depth;
 	uint8_t *ports_enqueue_depth;
+	struct rte_eventdev_enqueue_buffer *port_buffers;
 	unsigned int i;
 
 	EDEV_LOG_DEBUG("Setup %d ports on device %u", nb_ports,
@@ -272,6 +273,19 @@
 				"nb_ports %u", nb_ports);
 			return -(ENOMEM);
 		}
+
+		/* Allocate memory to store port enqueue buffers */
+		dev->data->port_buffers =
+			rte_zmalloc_socket("eventdev->port_buffers",
+			sizeof(dev->data->port_buffers[0]) * nb_ports,
+			RTE_CACHE_LINE_SIZE, dev->data->socket_id);
+		if (dev->data->port_buffers == NULL) {
+			dev->data->nb_ports = 0;
+			EDEV_LOG_ERR("failed to get memory for port enq"
+					" buffers, nb_ports %u", nb_ports);
+			return -(ENOMEM);
+		}
+
 	} else if (dev->data->ports != NULL && nb_ports != 0) {/* re-config */
 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
 
@@ -279,6 +293,7 @@
 		ports_dequeue_depth = dev->data->ports_dequeue_depth;
 		ports_enqueue_depth = dev->data->ports_enqueue_depth;
 		links_map = dev->data->links_map;
+		port_buffers = dev->data->port_buffers;
 
 		for (i = nb_ports; i < old_nb_ports; i++)
 			(*dev->dev_ops->port_release)(ports[i]);
@@ -324,6 +339,17 @@
 			return -(ENOMEM);
 		}
 
+		/* Realloc memory to store port enqueue buffers */
+		port_buffers = rte_realloc(dev->data->port_buffers,
+				sizeof(dev->data->port_buffers[0]) * nb_ports,
+				RTE_CACHE_LINE_SIZE);
+		if (port_buffers == NULL) {
+			dev->data->nb_ports = 0;
+			EDEV_LOG_ERR("failed to realloc mem for port enq"
+					" buffers, nb_ports %u", nb_ports);
+			return -(ENOMEM);
+		}
+
 		if (nb_ports > old_nb_ports) {
 			uint8_t new_ps = nb_ports - old_nb_ports;
 
@@ -336,12 +362,15 @@
 			memset(links_map +
 				(old_nb_ports * RTE_EVENT_MAX_QUEUES_PER_DEV),
 				0, sizeof(ports_enqueue_depth[0]) * new_ps);
+			memset(port_buffers + old_nb_ports, 0,
+				sizeof(port_buffers[0]) * new_ps);
 		}
 
 		dev->data->ports = ports;
 		dev->data->ports_dequeue_depth = ports_dequeue_depth;
 		dev->data->ports_enqueue_depth = ports_enqueue_depth;
 		dev->data->links_map = links_map;
+		dev->data->port_buffers = port_buffers;
 	} else if (dev->data->ports != NULL && nb_ports == 0) {
 		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->port_release, -ENOTSUP);
 
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index 778d6dc..3f24342 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -246,6 +246,7 @@
 #include
 #include
 #include
+#include <rte_memcpy.h>
 
 #define EVENTDEV_NAME_SKELETON_PMD event_skeleton
 /**< Skeleton event device PMD name */
@@ -965,6 +966,26 @@ typedef uint16_t (*event_dequeue_burst_t)(void *port, struct rte_event ev[],
 #define RTE_EVENTDEV_NAME_MAX_LEN (64)
 /**< @internal Max length of name of event PMD */
 
+#define RTE_EVENT_BUF_MAX 16
+/**< Maximum number of events in an enqueue buffer. */
+
+/**
+ * @internal
+ * An enqueue buffer for each port.
+ *
+ * This struct is defined in the header so that the enqueue and flush calls
+ * can be inlined; a function call per event would incur significant
+ * performance overhead.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_buffer_flush()
+ */
+struct rte_eventdev_enqueue_buffer {
+	/** Count of events in this buffer */
+	uint16_t count;
+	/** Array of events in this buffer */
+	struct rte_event events[RTE_EVENT_BUF_MAX];
+} __rte_cache_aligned;
+
 /**
  * @internal
  * The data part, with no function pointers, associated with each device.
@@ -983,6 +1004,8 @@ struct rte_eventdev_data {
 	/**< Number of event ports. */
 	void **ports;
 	/**< Array of pointers to ports. */
+	struct rte_eventdev_enqueue_buffer *port_buffers;
+	/**< Array of port enqueue buffers. */
 	uint8_t *ports_dequeue_depth;
 	/**< Array of port dequeue depth. */
 	uint8_t *ports_enqueue_depth;
@@ -1132,6 +1155,89 @@ struct rte_eventdev {
 }
 
 /**
+ * Flush the enqueue buffer of the event port specified by *port_id*, in the
+ * event device specified by *dev_id*.
+ *
+ * This function attempts to flush as many of the buffered events as possible,
+ * and returns the number of flushed events. Any unflushed events remain in
+ * the buffer.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ *
+ * @return
+ *   The number of event objects actually flushed to the event device.
+ *
+ * \see rte_event_enqueue_buffer(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer_flush(uint8_t dev_id, uint8_t port_id)
+{
+	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
+	struct rte_eventdev_enqueue_buffer *buf =
+		&dev->data->port_buffers[port_id];
+	int n;
+
+	n = rte_event_enqueue_burst(dev_id, port_id, buf->events, buf->count);
+
+	/* Shift any unflushed events to the front of the buffer; the
+	 * memmove size is in bytes. */
+	if (n != buf->count)
+		memmove(buf->events, &buf->events[n],
+			(buf->count - n) * sizeof(buf->events[0]));
+
+	buf->count -= n;
+
+	return n;
+}
+
+/**
+ * Buffer an event object supplied in *rte_event* structure for future
+ * enqueueing on an event device designated by its *dev_id* through the event
+ * port specified by *port_id*.
+ *
+ * This function takes a single event and buffers it for later enqueueing to
+ * the queue specified in the event structure. If the buffer is full, the
+ * function will attempt to flush the buffer before buffering the event.
+ * If the flush operation fails, the previously buffered events remain in the
+ * buffer and an error is returned to the user to indicate that *ev* was not
+ * buffered.
+ *
+ * @param dev_id
+ *   The identifier of the device.
+ * @param port_id
+ *   The identifier of the event port.
+ * @param ev
+ *   Pointer to struct rte_event
+ *
+ * @return
+ *  - 0 on success
+ *  - <0 on failure. Failure can occur if the event port's output queue is
+ *    backpressured, for instance.
+ *
+ * \see rte_event_enqueue_buffer_flush(), rte_event_enqueue_burst()
+ * \see rte_event_port_enqueue_depth()
+ */
+static inline int
+rte_event_enqueue_buffer(uint8_t dev_id, uint8_t port_id, struct rte_event *ev)
+{
+	struct rte_eventdev *dev = &rte_eventdevs[dev_id];
+	struct rte_eventdev_enqueue_buffer *buf =
+		&dev->data->port_buffers[port_id];
+	int ret;
+
+	/* If necessary, flush the enqueue buffer to make space for ev. */
+	if (buf->count == RTE_EVENT_BUF_MAX) {
+		ret = rte_event_enqueue_buffer_flush(dev_id, port_id);
+		if (ret == 0)
+			return -ENOSPC;
+	}
+
+	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
+	return 0;
+}
+
+/**
  * Converts nanoseconds to *wait* value for rte_event_dequeue()
  *
  * If the device is configured with RTE_EVENT_DEV_CFG_PER_DEQUEUE_WAIT flag then
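Since the buffer never drops events, an application should flush explicitly
before quiescing a port. A minimal drain sketch, assuming only the flush
semantics above (the retry bound is a placeholder policy, not part of the
patch):

	/* Flush until the buffer empties or no forward progress is made;
	 * the flush call returns the number of events it enqueued, so 0
	 * means either an empty buffer or a backpressured device. */
	int budget = 1000;	/* placeholder retry bound */

	while (budget-- > 0) {
		if (rte_event_enqueue_buffer_flush(dev_id, port_id) == 0)
			break;
	}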