From patchwork Mon Dec 11 17:56:31 2017
X-Patchwork-Submitter: "Eads, Gage"
X-Patchwork-Id: 32097
X-Patchwork-Delegate: jerinj@marvell.com
From: Gage Eads
To: dev@dpdk.org
Cc: jerin.jacob@caviumnetworks.com, harry.van.haaren@intel.com,
 bruce.richardson@intel.com, hemant.agrawal@nxp.com, nipun.gupta@nxp.com,
 santosh.shukla@caviumnetworks.com
Date: Mon, 11 Dec 2017 11:56:31 -0600
Message-Id: <1513014992-10861-1-git-send-email-gage.eads@intel.com>
In-Reply-To: <1512015636-31878-2-git-send-email-gage.eads@intel.com>
References: <1512015636-31878-2-git-send-email-gage.eads@intel.com>
Subject: [dpdk-dev] [PATCH v2 1/2] eventdev: add implicit release disable capability

This commit introduces a capability for disabling a port's "implicit"
release functionality. When implicit releases are disabled, the eventdev
PMD does not issue outstanding releases for previously dequeued events
when the port dequeues a new batch of events.

If a PMD does not support this capability, the application will receive
an error if it attempts to set up a port with implicit releases disabled.
Otherwise, if the port is configured with implicit releases disabled, the
application must release each dequeued event by invoking
rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads
Acked-by: Harry van Haaren
Acked-by: Jerin Jacob
---
v2:
- Merged two lines into one in dpaa2_eventdev_port_def_conf().
- Used '0' instead of 'false' in skeleton_eventdev_port_def_conf().
- Replaced instances of 'bool' with 'uint8_t' in
  eventdev_pipeline_sw_pmd/main.c.
- Put both worker_loopback() invocations in the same if block in
  test_sw_eventdev().
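For reviewers, a minimal sketch of how an application might use the new
capability. This is not part of the patch: the setup_port_and_release()
helper, the device/port IDs, and the burst size of 16 are illustrative
assumptions, and most error handling is trimmed.

#include <rte_eventdev.h>

/* Hypothetical helper: request implicit-release disable when the PMD
 * supports it, then explicitly release one dequeued burst. Assumes
 * rte_event_dev_configure() has already been called for dev_id.
 */
static int
setup_port_and_release(uint8_t dev_id, uint8_t port_id)
{
	struct rte_event_dev_info info;
	struct rte_event_port_conf conf;
	struct rte_event ev[16];
	uint16_t i, n, sent;

	rte_event_dev_info_get(dev_id, &info);
	rte_event_port_default_conf_get(dev_id, port_id, &conf);

	/* Requesting the feature without the capability would make
	 * rte_event_port_setup() return -EINVAL under this patch. */
	if (info.event_dev_cap & RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)
		conf.disable_implicit_release = 1;

	if (rte_event_port_setup(dev_id, port_id, &conf) < 0)
		return -1;

	/* ... rte_event_dev_start() is called elsewhere ... */

	n = rte_event_dequeue_burst(dev_id, port_id, ev, 16, 0);

	/* With implicit release disabled, every dequeued event must be
	 * handed back with RTE_EVENT_OP_RELEASE (or RTE_EVENT_OP_FORWARD). */
	for (i = 0; i < n; i++)
		ev[i].op = RTE_EVENT_OP_RELEASE;

	sent = 0;
	while (sent < n)
		sent += rte_event_enqueue_burst(dev_id, port_id,
						ev + sent, n - sent);
	return 0;
}

The retry loop mirrors the consumer() change below:
rte_event_enqueue_burst() may accept fewer than the requested number of
events, so the release operations are resubmitted until all are enqueued.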
 drivers/event/dpaa2/dpaa2_eventdev.c       |  1 +
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++++----
 11 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c b/drivers/event/dpaa2/dpaa2_eventdev.c
index eeeb231..13e7122 100644
--- a/drivers/event/dpaa2/dpaa2_eventdev.c
+++ b/drivers/event/dpaa2/dpaa2_eventdev.c
@@ -440,6 +440,7 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 		DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
 	port_conf->enqueue_depth =
 		DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/octeontx/ssovf_evdev.c b/drivers/event/octeontx/ssovf_evdev.c
index 117b145..b80a6c0 100644
--- a/drivers/event/octeontx/ssovf_evdev.c
+++ b/drivers/event/octeontx/ssovf_evdev.c
@@ -252,6 +252,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = edev->max_num_events;
 	port_conf->dequeue_depth = 1;
 	port_conf->enqueue_depth = 1;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/skeleton/skeleton_eventdev.c b/drivers/event/skeleton/skeleton_eventdev.c
index bb554c3..3b448b2 100644
--- a/drivers/event/skeleton/skeleton_eventdev.c
+++ b/drivers/event/skeleton/skeleton_eventdev.c
@@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 32 * 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 5b4a208..1ef6340 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -191,6 +191,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
 	}
 
 	p->inflight_max = conf->new_event_threshold;
+	p->implicit_release = !conf->disable_implicit_release;
 
 	/* check if ring exists, same as rx_worker above */
 	snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -413,6 +414,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
 	port_conf->new_event_threshold = 1024;
 	port_conf->dequeue_depth = 16;
 	port_conf->enqueue_depth = 16;
+	port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -482,9 +484,11 @@ sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
 			.max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
 			.max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
 			.max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-			.event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-					RTE_EVENT_DEV_CAP_BURST_MODE |
-					RTE_EVENT_DEV_CAP_EVENT_QOS),
+			.event_dev_cap = (
+				RTE_EVENT_DEV_CAP_QUEUE_QOS |
+				RTE_EVENT_DEV_CAP_BURST_MODE |
+				RTE_EVENT_DEV_CAP_EVENT_QOS |
+				RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
 	};
 
 	*info = evdev_sw_info;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 0859388..d08f7d0 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -213,6 +213,7 @@ struct sw_port {
 	uint16_t outstanding_releases __rte_cache_aligned;
 	uint16_t inflight_max; /* app requested max inflights for this port */
 	uint16_t inflight_credits; /* num credits this port has right now */
+	uint8_t implicit_release; /* release events before dequeueing */
 
 	uint16_t last_dequeue_burst_sz; /* how big the burst was */
 	uint64_t last_dequeue_ticks; /* used to track burst processing time */
diff --git a/drivers/event/sw/sw_evdev_worker.c b/drivers/event/sw/sw_evdev_worker.c
index b3b3b17..93cd29b 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -109,7 +109,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		return 0;
 	}
 
-	uint32_t forwards = 0;
+	uint32_t completions = 0;
 	for (i = 0; i < num; i++) {
 		int op = ev[i].op;
 		int outstanding = p->outstanding_releases > 0;
@@ -118,7 +118,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 
 		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
 		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * outstanding;
-		forwards += (op == RTE_EVENT_OP_FORWARD);
 
 		new_ops[i] = sw_qe_flag_map[op];
 		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -127,8 +126,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		 * correct usage of the API), providing very high correct
 		 * prediction rate.
 		 */
-		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
 			p->outstanding_releases--;
+			completions++;
+		}
 
 		/* error case: branch to avoid touching p->stats */
 		if (unlikely(invalid_qid)) {
@@ -137,8 +138,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
 		}
 	}
 
-	/* handle directed port forward credits */
-	p->inflight_credits -= forwards * p->is_directed;
+	/* handle directed port forward and release credits */
+	p->inflight_credits -= completions * p->is_directed;
 
 	/* returns number of events actually enqueued */
 	uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -172,7 +173,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	uint32_t credit_update_quanta = sw->credit_update_quanta;
 
 	/* check that all previous dequeues have been released */
-	if (!p->is_directed) {
+	if (p->implicit_release && !p->is_directed) {
 		uint16_t out_rels = p->outstanding_releases;
 		uint16_t i;
 		for (i = 0; i < out_rels; i++)
@@ -182,7 +183,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 	/* returns number of events actually dequeued */
 	uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
 	if (unlikely(ndeq == 0)) {
-		p->outstanding_releases = 0;
 		p->zero_polls++;
 		p->total_polls++;
 		goto end;
@@ -190,7 +190,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
 
 	/* only add credits for directed ports - LB ports send RELEASEs */
 	p->inflight_credits += ndeq * p->is_directed;
-	p->outstanding_releases = ndeq;
+	p->outstanding_releases += ndeq;
 	p->last_dequeue_burst_sz = ndeq;
 	p->last_dequeue_ticks = rte_get_timer_cycles();
 	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 5f431d8..2e9a6d2 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -62,6 +62,7 @@ struct prod_data {
 struct cons_data {
 	uint8_t dev_id;
 	uint8_t port_id;
+	uint8_t release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -167,6 +168,18 @@ consumer(void)
 			uint8_t outport = packets[i].mbuf->port;
 			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
 					packets[i].mbuf);
+
+			packets[i].op = RTE_EVENT_OP_RELEASE;
+		}
+
+		if (cons_data.release) {
+			uint16_t nb_tx;
+
+			nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+			while (nb_tx < n)
+				nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+						packets + nb_tx,
+						n - nb_tx);
 		}
 
 		/* Print out mpps every 1<22 packets */
@@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
 	};
 
 	struct port_link worker_queues[MAX_NUM_STAGES];
+	uint8_t disable_implicit_release;
 	struct port_link tx_queue;
 	unsigned int i;
 
@@ -715,6 +729,12 @@ setup_eventdev(struct prod_data *prod_data,
 	ret = rte_event_dev_info_get(dev_id, &dev_info);
 	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+	disable_implicit_release = (dev_info.event_dev_cap &
+			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+	wkr_p_conf.disable_implicit_release = disable_implicit_release;
+	tx_p_conf.disable_implicit_release = disable_implicit_release;
+
 	if (dev_info.max_event_port_dequeue_depth <
 			config.nb_event_port_dequeue_depth)
 		config.nb_event_port_dequeue_depth =
@@ -823,6 +843,7 @@ setup_eventdev(struct prod_data *prod_data,
 		.dequeue_depth = 8,
 		.enqueue_depth = 8,
 		.new_event_threshold = 1200,
+		.disable_implicit_release = disable_implicit_release,
 	};
 
 	if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -839,7 +860,8 @@ setup_eventdev(struct prod_data *prod_data,
 			.port_id = i + 1,
 			.qid = cdata.qid[0] };
 	*cons_data = (struct cons_data){.dev_id = dev_id,
-			.port_id = i };
+			.port_id = i,
+			.release = disable_implicit_release };
 
 	ret = rte_event_dev_service_id_get(dev_id,
 			&fdata->evdev_service_id);
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index e17f8fc..3e93c03 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -686,6 +686,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
 		return -EINVAL;
 	}
 
+	if (port_conf && port_conf->disable_implicit_release &&
+	    !(dev->data->event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		RTE_EDEV_LOG_ERR(
+		   "dev%d port%d Implicit release disable not supported",
+		    dev_id, port_id);
+		return -EINVAL;
+	}
+
 	if (dev->data->dev_started) {
 		RTE_EDEV_LOG_ERR(
 		    "device %d must be stopped to allow port setup", dev_id);
diff --git a/lib/librte_eventdev/rte_eventdev.h b/lib/librte_eventdev/rte_eventdev.h
index f1949ff..8377b3f 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -282,6 +282,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST 0
@@ -686,6 +696,13 @@ struct rte_event_port_conf {
 	 * which previously supplied to rte_event_dev_configure().
 	 * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
 	 */
+	uint8_t disable_implicit_release;
+	/**< Configure the port not to release outstanding events in
+	 * rte_event_dev_dequeue_burst(). If true, all events received through
+	 * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+	 * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+	 * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+	 */
 };
 
 /**
@@ -1381,9 +1398,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t ns,
 *
 * The number of events dequeued is the number of scheduler contexts held by
 * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
 *
 * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
 * enqueued to the same port that their associated events were dequeued from.
diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c
index 1ed2a1d..f8ee1be 100644
--- a/test/test/test_eventdev.c
+++ b/test/test/test_eventdev.c
@@ -581,6 +581,15 @@ test_eventdev_port_setup(void)
 	ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+	if (!(info.event_dev_cap &
+	      RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+		pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+		pconf.disable_implicit_release = 1;
+		ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+		TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+		pconf.disable_implicit_release = 0;
+	}
+
 	ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
 					&pconf);
 	TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index 96ed920..4108b00 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -200,6 +200,7 @@ create_ports(struct test *t, int num_ports)
 			.new_event_threshold = 1024,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (num_ports > MAX_PORTS)
 		return -1;
@@ -1254,6 +1255,7 @@ port_reconfig_credits(struct test *t)
 			.new_event_threshold = 128,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -1343,6 +1345,7 @@ port_single_lb_reconfig(struct test *t)
 			.new_event_threshold = 128,
 			.dequeue_depth = 32,
 			.enqueue_depth = 64,
+			.disable_implicit_release = 0,
 	};
 	if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
 		printf("%d Error setting up port\n", __LINE__);
@@ -2935,7 +2938,7 @@ worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
 	/* use a single producer core, and a worker core to see what happens
 	 * if the worker loops packets back multiple times
@@ -2961,6 +2964,7 @@ worker_loopback(struct test *t)
 	 * only be initialized once - and this needs to be set for multiple runs
 	 */
 	conf.new_event_threshold = 512;
+	conf.disable_implicit_release = disable_implicit_release;
 
 	if (rte_event_port_setup(evdev, 0, &conf) < 0) {
 		printf("Error setting up RX port\n");
@@ -3235,15 +3239,23 @@ test_sw_eventdev(void)
 	}
 	if (rte_lcore_count() >= 3) {
 		printf("*** Running Worker loopback test...\n");
-		ret = worker_loopback(t);
+		ret = worker_loopback(t, 0);
+		if (ret != 0) {
+			printf("ERROR - Worker loopback test FAILED.\n");
+			return ret;
+		}
+
+		printf("*** Running Worker loopback test (implicit release disabled)...\n");
+		ret = worker_loopback(t, 1);
 		if (ret != 0) {
 			printf("ERROR - Worker loopback test FAILED.\n");
 			return ret;
 		}
 	} else {
-		printf("### Not enough cores for worker loopback test.\n");
-		printf("### Need at least 3 cores for test.\n");
+		printf("### Not enough cores for worker loopback tests.\n");
+		printf("### Need at least 3 cores for the tests.\n");
 	}
+
 	/*
 	 * Free test instance, leaving mempool initialized, and a pointer to it
 	 * in static eventdev_func_mempool, as it is re-used on re-runs