get:
Show a patch.

patch:
Update a patch.

put:
Update a patch.

GET /api/patches/54953/?format=api
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 54953,
    "url": "http://patches.dpdk.org/api/patches/54953/?format=api",
    "web_url": "http://patches.dpdk.org/project/dpdk/patch/1560957270-56531-2-git-send-email-erik.g.carrillo@intel.com/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/?format=api",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk",
        "list_archive_url": "https://inbox.dpdk.org/dev",
        "list_archive_url_format": "https://inbox.dpdk.org/dev/{}",
        "commit_url_format": ""
    },
    "msgid": "<1560957270-56531-2-git-send-email-erik.g.carrillo@intel.com>",
    "list_archive_url": "https://inbox.dpdk.org/dev/1560957270-56531-2-git-send-email-erik.g.carrillo@intel.com",
    "date": "2019-06-19T15:14:30",
    "name": "[v7,1/1] eventdev: add new software event timer adapter",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "4c27d1cf35142c4d0d4bfad0aff5c690b9dcf53a",
    "submitter": {
        "id": 762,
        "url": "http://patches.dpdk.org/api/people/762/?format=api",
        "name": "Carrillo, Erik G",
        "email": "erik.g.carrillo@intel.com"
    },
    "delegate": null,
    "mbox": "http://patches.dpdk.org/project/dpdk/patch/1560957270-56531-2-git-send-email-erik.g.carrillo@intel.com/mbox/",
    "series": [
        {
            "id": 5081,
            "url": "http://patches.dpdk.org/api/series/5081/?format=api",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=5081",
            "date": "2019-06-19T15:14:29",
            "name": "New software event timer adapter",
            "version": 7,
            "mbox": "http://patches.dpdk.org/series/5081/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/54953/comments/",
    "check": "fail",
    "checks": "http://patches.dpdk.org/api/patches/54953/checks/",
    "tags": {},
    "related": [],
    "headers": {
        "Return-Path": "<dev-bounces@dpdk.org>",
        "X-Original-To": "patchwork@dpdk.org",
        "Delivered-To": "patchwork@dpdk.org",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id 63A471C34E;\n\tWed, 19 Jun 2019 17:14:25 +0200 (CEST)",
            "from mga09.intel.com (mga09.intel.com [134.134.136.24])\n\tby dpdk.org (Postfix) with ESMTP id 963692B99\n\tfor <dev@dpdk.org>; Wed, 19 Jun 2019 17:14:22 +0200 (CEST)",
            "from orsmga008.jf.intel.com ([10.7.209.65])\n\tby orsmga102.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;\n\t19 Jun 2019 08:14:20 -0700",
            "from wcpqa1.an.intel.com ([10.123.72.207])\n\tby orsmga008.jf.intel.com with ESMTP; 19 Jun 2019 08:14:19 -0700"
        ],
        "X-Amp-Result": "SKIPPED(no attachment in message)",
        "X-Amp-File-Uploaded": "False",
        "X-ExtLoop1": "1",
        "X-IronPort-AV": "E=Sophos;i=\"5.63,392,1557212400\"; d=\"scan'208\";a=\"153833090\"",
        "From": "Erik Gabriel Carrillo <erik.g.carrillo@intel.com>",
        "To": "jerin.jacob@caviumnetworks.com",
        "Cc": "mattias.ronnblom@ericsson.com, pbhagavatula@caviumnetworks.com,\n\tHonnappa.Nagarahalli@arm.com, dev@dpdk.org",
        "Date": "Wed, 19 Jun 2019 10:14:30 -0500",
        "Message-Id": "<1560957270-56531-2-git-send-email-erik.g.carrillo@intel.com>",
        "X-Mailer": "git-send-email 1.7.10",
        "In-Reply-To": "<1560957270-56531-1-git-send-email-erik.g.carrillo@intel.com>",
        "References": "<1556291660-12373-1-git-send-email-erik.g.carrillo@intel.com>\n\t<1560957270-56531-1-git-send-email-erik.g.carrillo@intel.com>",
        "Subject": "[dpdk-dev] [PATCH v7 1/1] eventdev: add new software event timer\n\tadapter",
        "X-BeenThere": "dev@dpdk.org",
        "X-Mailman-Version": "2.1.15",
        "Precedence": "list",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "List-Post": "<mailto:dev@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "Errors-To": "dev-bounces@dpdk.org",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>"
    },
    "content": "This patch introduces a new version of the event timer adapter software\nPMD. In the original design, timer event producer lcores in the primary\nand secondary processes enqueued event timers into a ring, and a\nservice core in the primary process dequeued them and processed them\nfurther.  To improve performance, this version does away with the ring\nand lets lcores insert timers directly into timer skiplist data\nstructures; the service core directly accesses the lists as well, when\nlooking for timers that have expired.\n\nTo compare the burst and non-burst performance of the original and new\nversions of the software event timer adapter, I ran the following\ncommands:\n\n$ sudo ./build/app/dpdk-test-eventdev -c 0xFFE -s 0xC --vdev=event_sw0 \\\n-- --test=perf_queue --plcores=4,5,6 --wlcore=7,8,9 --stlist=p \\\n--prod_type_timerdev --worker_deq_depth=32\n\n$ sudo ./build/app/dpdk-test-eventdev -c 0xFFE -s 0xC --vdev=event_sw0 \\\n-- --test=perf_queue --plcores=4,5,6 --wlcore=7,8,9 --stlist=p \\\n--prod_type_timerdev_burst --worker_deq_depth=32\n\nWith the new version, I see a 151% improvement in throughput for the\nnon-burst case, and a 270% improvement in throughput for the burst case.\nI also see a 53% improvement in arm latency in the non-burst case and a\n65% improvement in arm latency in the burst case.\n\nNote: To perform the test,  I commented out a check in the original\nversion that checks the adapter tick interval against a minimum value.\n\nSigned-off-by: Erik Gabriel Carrillo <erik.g.carrillo@intel.com>\n---\n lib/librte_eventdev/rte_event_timer_adapter.c | 741 +++++++++++---------------\n 1 file changed, 322 insertions(+), 419 deletions(-)",
    "diff": "diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c\nindex 2f7a760..d525cb3 100644\n--- a/lib/librte_eventdev/rte_event_timer_adapter.c\n+++ b/lib/librte_eventdev/rte_event_timer_adapter.c\n@@ -34,7 +34,7 @@ static int evtim_buffer_logtype;\n \n static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];\n \n-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;\n+static const struct rte_event_timer_adapter_ops swtim_ops;\n \n #define EVTIM_LOG(level, logtype, ...) \\\n \trte_log(RTE_LOG_ ## level, logtype, \\\n@@ -211,7 +211,7 @@ rte_event_timer_adapter_create_ext(\n \t * implementation.\n \t */\n \tif (adapter->ops == NULL)\n-\t\tadapter->ops = &sw_event_adapter_timer_ops;\n+\t\tadapter->ops = &swtim_ops;\n \n \t/* Allow driver to do some setup */\n \tFUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);\n@@ -340,7 +340,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)\n \t * implementation.\n \t */\n \tif (adapter->ops == NULL)\n-\t\tadapter->ops = &sw_event_adapter_timer_ops;\n+\t\tadapter->ops = &swtim_ops;\n \n \t/* Set fast-path function pointers */\n \tadapter->arm_burst = adapter->ops->arm_burst;\n@@ -426,10 +426,11 @@ rte_event_timer_adapter_stats_reset(struct rte_event_timer_adapter *adapter)\n #define EVENT_BUFFER_SZ 4096\n #define EVENT_BUFFER_BATCHSZ 32\n #define EVENT_BUFFER_MASK (EVENT_BUFFER_SZ - 1)\n+#define EXP_TIM_BUFFER_SZ 128\n \n struct event_buffer {\n-\tuint16_t head;\n-\tuint16_t tail;\n+\tsize_t head;\n+\tsize_t tail;\n \tstruct rte_event events[EVENT_BUFFER_SZ];\n } __rte_cache_aligned;\n \n@@ -455,7 +456,7 @@ event_buffer_init(struct event_buffer *bufp)\n static int\n event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)\n {\n-\tuint16_t head_idx;\n+\tsize_t head_idx;\n \tstruct rte_event *buf_eventp;\n \n \tif (event_buffer_full(bufp))\n@@ -477,13 +478,16 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,\n \t\t   uint16_t *nb_events_flushed,\n \t\t   uint16_t *nb_events_inv)\n {\n-\tuint16_t head_idx, tail_idx, n = 0;\n \tstruct rte_event *events = bufp->events;\n+\tsize_t head_idx, tail_idx;\n+\tuint16_t n = 0;\n \n \t/* Instead of modulus, bitwise AND with mask to get index. */\n \thead_idx = bufp->head & EVENT_BUFFER_MASK;\n \ttail_idx = bufp->tail & EVENT_BUFFER_MASK;\n \n+\tRTE_ASSERT(head_idx < EVENT_BUFFER_SZ && tail_idx < EVENT_BUFFER_SZ);\n+\n \t/* Determine the largest contigous run we can attempt to enqueue to the\n \t * event device.\n \t */\n@@ -491,150 +495,166 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,\n \t\tn = head_idx - tail_idx;\n \telse if (head_idx < tail_idx)\n \t\tn = EVENT_BUFFER_SZ - tail_idx;\n+\telse if (event_buffer_full(bufp))\n+\t\tn = EVENT_BUFFER_SZ - tail_idx;\n \telse {\n \t\t*nb_events_flushed = 0;\n \t\treturn;\n \t}\n \n+\tn = RTE_MIN(EVENT_BUFFER_BATCHSZ, n);\n \t*nb_events_inv = 0;\n+\n \t*nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,\n \t\t\t\t\t\t     &events[tail_idx], n);\n-\tif (*nb_events_flushed != n && rte_errno == -EINVAL) {\n-\t\tEVTIM_LOG_ERR(\"failed to enqueue invalid event - dropping it\");\n-\t\t(*nb_events_inv)++;\n+\tif (*nb_events_flushed != n) {\n+\t\tif (rte_errno == -EINVAL) {\n+\t\t\tEVTIM_LOG_ERR(\"failed to enqueue invalid event - \"\n+\t\t\t\t      \"dropping it\");\n+\t\t\t(*nb_events_inv)++;\n+\t\t} else if (rte_errno == -ENOSPC)\n+\t\t\trte_pause();\n \t}\n \n+\tif (*nb_events_flushed > 0)\n+\t\tEVTIM_BUF_LOG_DBG(\"enqueued %\"PRIu16\" timer events to event \"\n+\t\t\t\t  \"device\", *nb_events_flushed);\n+\n \tbufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;\n }\n \n /*\n  * Software event timer adapter implementation\n  */\n-\n-struct rte_event_timer_adapter_sw_data {\n-\t/* List of messages for outstanding timers */\n-\tTAILQ_HEAD(, msg) msgs_tailq_head;\n-\t/* Lock to guard tailq and armed count */\n-\trte_spinlock_t msgs_tailq_sl;\n+struct swtim {\n \t/* Identifier of service executing timer management logic. */\n \tuint32_t service_id;\n \t/* The cycle count at which the adapter should next tick */\n \tuint64_t next_tick_cycles;\n-\t/* Incremented as the service moves through phases of an iteration */\n-\tvolatile int service_phase;\n \t/* The tick resolution used by adapter instance. May have been\n \t * adjusted from what user requested\n \t */\n \tuint64_t timer_tick_ns;\n \t/* Maximum timeout in nanoseconds allowed by adapter instance. */\n \tuint64_t max_tmo_ns;\n-\t/* Ring containing messages to arm or cancel event timers */\n-\tstruct rte_ring *msg_ring;\n-\t/* Mempool containing msg objects */\n-\tstruct rte_mempool *msg_pool;\n \t/* Buffered timer expiry events to be enqueued to an event device. */\n \tstruct event_buffer buffer;\n \t/* Statistics */\n \tstruct rte_event_timer_adapter_stats stats;\n-\t/* The number of threads currently adding to the message ring */\n-\trte_atomic16_t message_producer_count;\n+\t/* Mempool of timer objects */\n+\tstruct rte_mempool *tim_pool;\n+\t/* Back pointer for convenience */\n+\tstruct rte_event_timer_adapter *adapter;\n+\t/* Identifier of timer data instance */\n+\tuint32_t timer_data_id;\n+\t/* Track which cores have actually armed a timer */\n+\tstruct {\n+\t\trte_atomic16_t v;\n+\t} __rte_cache_aligned in_use[RTE_MAX_LCORE];\n+\t/* Track which cores' timer lists should be polled */\n+\tunsigned int poll_lcores[RTE_MAX_LCORE];\n+\t/* The number of lists that should be polled */\n+\tint n_poll_lcores;\n+\t/* Lock to atomically access the above two variables */\n+\trte_spinlock_t poll_lcores_sl;\n+\n+\tstruct rte_timer *expired_timers[EXP_TIM_BUFFER_SZ];\n+\tsize_t expired_timers_idx;\n };\n \n-enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};\n-\n-struct msg {\n-\tenum msg_type type;\n-\tstruct rte_event_timer *evtim;\n-\tstruct rte_timer tim;\n-\tTAILQ_ENTRY(msg) msgs;\n-};\n+static inline struct swtim *\n+swtim_pmd_priv(const struct rte_event_timer_adapter *adapter)\n+{\n+\treturn adapter->data->adapter_priv;\n+}\n \n static void\n-sw_event_timer_cb(struct rte_timer *tim, void *arg)\n+swtim_callback(struct rte_timer *tim)\n {\n-\tint ret;\n+\tstruct rte_event_timer *evtim = tim->arg;\n+\tstruct rte_event_timer_adapter *adapter;\n+\tunsigned int lcore = rte_lcore_id();\n+\tstruct swtim *sw;\n \tuint16_t nb_evs_flushed = 0;\n \tuint16_t nb_evs_invalid = 0;\n \tuint64_t opaque;\n-\tstruct rte_event_timer *evtim;\n-\tstruct rte_event_timer_adapter *adapter;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n+\tint ret;\n \n-\tevtim = arg;\n \topaque = evtim->impl_opaque[1];\n \tadapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque;\n-\tsw_data = adapter->data->adapter_priv;\n+\tsw = swtim_pmd_priv(adapter);\n \n-\tret = event_buffer_add(&sw_data->buffer, &evtim->ev);\n+\tret = event_buffer_add(&sw->buffer, &evtim->ev);\n \tif (ret < 0) {\n \t\t/* If event buffer is full, put timer back in list with\n \t\t * immediate expiry value, so that we process it again on the\n \t\t * next iteration.\n \t\t */\n-\t\trte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),\n-\t\t\t\t     sw_event_timer_cb, evtim);\n+\t\tret = rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE,\n+\t\t\t\t\t  lcore, NULL, evtim);\n+\t\tif (ret < 0) {\n+\t\t\tEVTIM_LOG_DBG(\"event buffer full, failed to reset \"\n+\t\t\t\t      \"timer with immediate expiry value\");\n+\t\t} else {\n+\t\t\tsw->stats.evtim_retry_count++;\n+\t\t\tEVTIM_LOG_DBG(\"event buffer full, resetting rte_timer \"\n+\t\t\t\t      \"with immediate expiry value\");\n+\t\t}\n \n-\t\tsw_data->stats.evtim_retry_count++;\n-\t\tEVTIM_LOG_DBG(\"event buffer full, resetting rte_timer with \"\n-\t\t\t      \"immediate expiry value\");\n+\t\tif (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore].v)))\n+\t\t\tsw->poll_lcores[sw->n_poll_lcores++] = lcore;\n \t} else {\n-\t\tstruct msg *m = container_of(tim, struct msg, tim);\n-\t\tTAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);\n \t\tEVTIM_BUF_LOG_DBG(\"buffered an event timer expiry event\");\n-\t\tevtim->state = RTE_EVENT_TIMER_NOT_ARMED;\n \n-\t\t/* Free the msg object containing the rte_timer now that\n-\t\t * we've buffered its event successfully.\n+\t\t/* Empty the buffer here, if necessary, to free older expired\n+\t\t * timers only\n \t\t */\n-\t\trte_mempool_put(sw_data->msg_pool, m);\n+\t\tif (unlikely(sw->expired_timers_idx == EXP_TIM_BUFFER_SZ)) {\n+\t\t\trte_mempool_put_bulk(sw->tim_pool,\n+\t\t\t\t\t     (void **)sw->expired_timers,\n+\t\t\t\t\t     sw->expired_timers_idx);\n+\t\t\tsw->expired_timers_idx = 0;\n+\t\t}\n \n-\t\t/* Bump the count when we successfully add an expiry event to\n-\t\t * the buffer.\n-\t\t */\n-\t\tsw_data->stats.evtim_exp_count++;\n+\t\tsw->expired_timers[sw->expired_timers_idx++] = tim;\n+\t\tsw->stats.evtim_exp_count++;\n+\n+\t\tevtim->state = RTE_EVENT_TIMER_NOT_ARMED;\n \t}\n \n-\tif (event_buffer_batch_ready(&sw_data->buffer)) {\n-\t\tevent_buffer_flush(&sw_data->buffer,\n+\tif (event_buffer_batch_ready(&sw->buffer)) {\n+\t\tevent_buffer_flush(&sw->buffer,\n \t\t\t\t   adapter->data->event_dev_id,\n \t\t\t\t   adapter->data->event_port_id,\n \t\t\t\t   &nb_evs_flushed,\n \t\t\t\t   &nb_evs_invalid);\n \n-\t\tsw_data->stats.ev_enq_count += nb_evs_flushed;\n-\t\tsw_data->stats.ev_inv_count += nb_evs_invalid;\n+\t\tsw->stats.ev_enq_count += nb_evs_flushed;\n+\t\tsw->stats.ev_inv_count += nb_evs_invalid;\n \t}\n }\n \n static __rte_always_inline uint64_t\n get_timeout_cycles(struct rte_event_timer *evtim,\n-\t\t   struct rte_event_timer_adapter *adapter)\n+\t\t   const struct rte_event_timer_adapter *adapter)\n {\n-\tuint64_t timeout_ns;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\n-\tsw_data = adapter->data->adapter_priv;\n-\ttimeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n+\tuint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns;\n \treturn timeout_ns * rte_get_timer_hz() / NSECPERSEC;\n-\n }\n \n /* This function returns true if one or more (adapter) ticks have occurred since\n  * the last time it was called.\n  */\n static inline bool\n-adapter_did_tick(struct rte_event_timer_adapter *adapter)\n+swtim_did_tick(struct swtim *sw)\n {\n \tuint64_t cycles_per_adapter_tick, start_cycles;\n \tuint64_t *next_tick_cyclesp;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n \n-\tsw_data = adapter->data->adapter_priv;\n-\tnext_tick_cyclesp = &sw_data->next_tick_cycles;\n-\n-\tcycles_per_adapter_tick = sw_data->timer_tick_ns *\n+\tnext_tick_cyclesp = &sw->next_tick_cycles;\n+\tcycles_per_adapter_tick = sw->timer_tick_ns *\n \t\t\t(rte_get_timer_hz() / NSECPERSEC);\n-\n \tstart_cycles = rte_get_timer_cycles();\n \n \t/* Note: initially, *next_tick_cyclesp == 0, so the clause below will\n@@ -646,7 +666,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter)\n \t\t * boundary.\n \t\t */\n \t\tstart_cycles -= start_cycles % cycles_per_adapter_tick;\n-\n \t\t*next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;\n \n \t\treturn true;\n@@ -661,15 +680,12 @@ check_timeout(struct rte_event_timer *evtim,\n \t      const struct rte_event_timer_adapter *adapter)\n {\n \tuint64_t tmo_nsec;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\n-\tsw_data = adapter->data->adapter_priv;\n-\ttmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \n-\tif (tmo_nsec > sw_data->max_tmo_ns)\n+\ttmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns;\n+\tif (tmo_nsec > sw->max_tmo_ns)\n \t\treturn -1;\n-\n-\tif (tmo_nsec < sw_data->timer_tick_ns)\n+\tif (tmo_nsec < sw->timer_tick_ns)\n \t\treturn -2;\n \n \treturn 0;\n@@ -697,110 +713,41 @@ check_destination_event_queue(struct rte_event_timer *evtim,\n \treturn 0;\n }\n \n-#define NB_OBJS 32\n static int\n-sw_event_timer_adapter_service_func(void *arg)\n+swtim_service_func(void *arg)\n {\n-\tint i, num_msgs;\n-\tuint64_t cycles, opaque;\n+\tstruct rte_event_timer_adapter *adapter = arg;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \tuint16_t nb_evs_flushed = 0;\n \tuint16_t nb_evs_invalid = 0;\n-\tstruct rte_event_timer_adapter *adapter;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tstruct rte_event_timer *evtim = NULL;\n-\tstruct rte_timer *tim = NULL;\n-\tstruct msg *msg, *msgs[NB_OBJS];\n-\n-\tadapter = arg;\n-\tsw_data = adapter->data->adapter_priv;\n-\n-\tsw_data->service_phase = 1;\n-\trte_smp_wmb();\n-\n-\twhile (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||\n-\t       !rte_ring_empty(sw_data->msg_ring)) {\n-\n-\t\tnum_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,\n-\t\t\t\t\t\t  (void **)msgs, NB_OBJS, NULL);\n-\n-\t\tfor (i = 0; i < num_msgs; i++) {\n-\t\t\tint ret = 0;\n-\n-\t\t\tRTE_SET_USED(ret);\n-\n-\t\t\tmsg = msgs[i];\n-\t\t\tevtim = msg->evtim;\n-\n-\t\t\tswitch (msg->type) {\n-\t\t\tcase MSG_TYPE_ARM:\n-\t\t\t\tEVTIM_SVC_LOG_DBG(\"dequeued ARM message from \"\n-\t\t\t\t\t\t  \"ring\");\n-\t\t\t\ttim = &msg->tim;\n-\t\t\t\trte_timer_init(tim);\n-\t\t\t\tcycles = get_timeout_cycles(evtim,\n-\t\t\t\t\t\t\t    adapter);\n-\t\t\t\tret = rte_timer_reset(tim, cycles, SINGLE,\n-\t\t\t\t\t\t      rte_lcore_id(),\n-\t\t\t\t\t\t      sw_event_timer_cb,\n-\t\t\t\t\t\t      evtim);\n-\t\t\t\tRTE_ASSERT(ret == 0);\n-\n-\t\t\t\tevtim->impl_opaque[0] = (uintptr_t)tim;\n-\t\t\t\tevtim->impl_opaque[1] = (uintptr_t)adapter;\n-\n-\t\t\t\tTAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,\n-\t\t\t\t\t\t  msg,\n-\t\t\t\t\t\t  msgs);\n-\t\t\t\tbreak;\n-\t\t\tcase MSG_TYPE_CANCEL:\n-\t\t\t\tEVTIM_SVC_LOG_DBG(\"dequeued CANCEL message \"\n-\t\t\t\t\t\t  \"from ring\");\n-\t\t\t\topaque = evtim->impl_opaque[0];\n-\t\t\t\ttim = (struct rte_timer *)(uintptr_t)opaque;\n-\t\t\t\tRTE_ASSERT(tim != NULL);\n-\n-\t\t\t\tret = rte_timer_stop(tim);\n-\t\t\t\tRTE_ASSERT(ret == 0);\n-\n-\t\t\t\t/* Free the msg object for the original arm\n-\t\t\t\t * request.\n-\t\t\t\t */\n-\t\t\t\tstruct msg *m;\n-\t\t\t\tm = container_of(tim, struct msg, tim);\n-\t\t\t\tTAILQ_REMOVE(&sw_data->msgs_tailq_head, m,\n-\t\t\t\t\t     msgs);\n-\t\t\t\trte_mempool_put(sw_data->msg_pool, m);\n-\n-\t\t\t\t/* Free the msg object for the current msg */\n-\t\t\t\trte_mempool_put(sw_data->msg_pool, msg);\n-\n-\t\t\t\tevtim->impl_opaque[0] = 0;\n-\t\t\t\tevtim->impl_opaque[1] = 0;\n-\n-\t\t\t\tbreak;\n-\t\t\t}\n-\t\t}\n-\t}\n \n-\tsw_data->service_phase = 2;\n-\trte_smp_wmb();\n+\tif (swtim_did_tick(sw)) {\n+\t\t/* This lock is seldom acquired on the arm side */\n+\t\trte_spinlock_lock(&sw->poll_lcores_sl);\n+\n+\t\trte_timer_alt_manage(sw->timer_data_id,\n+\t\t\t\t     sw->poll_lcores,\n+\t\t\t\t     sw->n_poll_lcores,\n+\t\t\t\t     swtim_callback);\n \n-\tif (adapter_did_tick(adapter)) {\n-\t\trte_timer_manage();\n+\t\trte_spinlock_unlock(&sw->poll_lcores_sl);\n \n-\t\tevent_buffer_flush(&sw_data->buffer,\n+\t\t/* Return expired timer objects back to mempool */\n+\t\trte_mempool_put_bulk(sw->tim_pool, (void **)sw->expired_timers,\n+\t\t\t\t     sw->expired_timers_idx);\n+\t\tsw->expired_timers_idx = 0;\n+\n+\t\tevent_buffer_flush(&sw->buffer,\n \t\t\t\t   adapter->data->event_dev_id,\n \t\t\t\t   adapter->data->event_port_id,\n-\t\t\t\t   &nb_evs_flushed, &nb_evs_invalid);\n+\t\t\t\t   &nb_evs_flushed,\n+\t\t\t\t   &nb_evs_invalid);\n \n-\t\tsw_data->stats.ev_enq_count += nb_evs_flushed;\n-\t\tsw_data->stats.ev_inv_count += nb_evs_invalid;\n-\t\tsw_data->stats.adapter_tick_count++;\n+\t\tsw->stats.ev_enq_count += nb_evs_flushed;\n+\t\tsw->stats.ev_inv_count += nb_evs_invalid;\n+\t\tsw->stats.adapter_tick_count++;\n \t}\n \n-\tsw_data->service_phase = 0;\n-\trte_smp_wmb();\n-\n \treturn 0;\n }\n \n@@ -820,7 +767,7 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)\n \tint size;\n \tint cache_size = 0;\n \n-\tfor (i = 0; ; i++) {\n+\tfor (i = 0;; i++) {\n \t\tsize = 1 << i;\n \n \t\tif (RTE_MAX_LCORE * size < (int)(nb_actual - nb_requested) &&\n@@ -834,168 +781,145 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)\n \treturn cache_size;\n }\n \n-#define SW_MIN_INTERVAL 1E5\n-\n static int\n-sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)\n+swtim_init(struct rte_event_timer_adapter *adapter)\n {\n-\tint ret;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tuint64_t nb_timers;\n+\tint i, ret;\n+\tstruct swtim *sw;\n \tunsigned int flags;\n \tstruct rte_service_spec service;\n-\tstatic bool timer_subsystem_inited; // static initialized to false\n \n-\t/* Allocate storage for SW implementation data */\n-\tchar priv_data_name[RTE_RING_NAMESIZE];\n-\tsnprintf(priv_data_name, RTE_RING_NAMESIZE, \"sw_evtim_adap_priv_%\"PRIu8,\n-\t\t adapter->data->id);\n-\tadapter->data->adapter_priv = rte_zmalloc_socket(\n-\t\t\t\tpriv_data_name,\n-\t\t\t\tsizeof(struct rte_event_timer_adapter_sw_data),\n-\t\t\t\tRTE_CACHE_LINE_SIZE,\n-\t\t\t\tadapter->data->socket_id);\n-\tif (adapter->data->adapter_priv == NULL) {\n+\t/* Allocate storage for private data area */\n+#define SWTIM_NAMESIZE 32\n+\tchar swtim_name[SWTIM_NAMESIZE];\n+\tsnprintf(swtim_name, SWTIM_NAMESIZE, \"swtim_%\"PRIu8,\n+\t\t\tadapter->data->id);\n+\tsw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE,\n+\t\t\tadapter->data->socket_id);\n+\tif (sw == NULL) {\n \t\tEVTIM_LOG_ERR(\"failed to allocate space for private data\");\n \t\trte_errno = ENOMEM;\n \t\treturn -1;\n \t}\n \n-\tif (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) {\n-\t\tEVTIM_LOG_ERR(\"failed to create adapter with requested tick \"\n-\t\t\t      \"interval\");\n-\t\trte_errno = EINVAL;\n-\t\treturn -1;\n-\t}\n-\n-\tsw_data = adapter->data->adapter_priv;\n-\n-\tsw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns;\n-\tsw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns;\n-\n-\tTAILQ_INIT(&sw_data->msgs_tailq_head);\n-\trte_spinlock_init(&sw_data->msgs_tailq_sl);\n-\trte_atomic16_init(&sw_data->message_producer_count);\n+\t/* Connect storage to adapter instance */\n+\tadapter->data->adapter_priv = sw;\n+\tsw->adapter = adapter;\n \n-\t/* Rings require power of 2, so round up to next such value */\n-\tnb_timers = rte_align64pow2(adapter->data->conf.nb_timers);\n+\tsw->timer_tick_ns = adapter->data->conf.timer_tick_ns;\n+\tsw->max_tmo_ns = adapter->data->conf.max_tmo_ns;\n \n-\tchar msg_ring_name[RTE_RING_NAMESIZE];\n-\tsnprintf(msg_ring_name, RTE_RING_NAMESIZE,\n-\t\t \"sw_evtim_adap_msg_ring_%\"PRIu8, adapter->data->id);\n-\tflags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?\n-\t\tRING_F_SP_ENQ | RING_F_SC_DEQ :\n-\t\tRING_F_SC_DEQ;\n-\tsw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,\n-\t\t\t\t\t    adapter->data->socket_id, flags);\n-\tif (sw_data->msg_ring == NULL) {\n-\t\tEVTIM_LOG_ERR(\"failed to create message ring\");\n-\t\trte_errno = ENOMEM;\n-\t\tgoto free_priv_data;\n-\t}\n-\n-\tchar pool_name[RTE_RING_NAMESIZE];\n-\tsnprintf(pool_name, RTE_RING_NAMESIZE, \"sw_evtim_adap_msg_pool_%\"PRIu8,\n+\t/* Create a timer pool */\n+\tchar pool_name[SWTIM_NAMESIZE];\n+\tsnprintf(pool_name, SWTIM_NAMESIZE, \"swtim_pool_%\"PRIu8,\n \t\t adapter->data->id);\n-\n-\t/* Both the arming/canceling thread and the service thread will do puts\n-\t * to the mempool, but if the SP_PUT flag is enabled, we can specify\n-\t * single-consumer get for the mempool.\n-\t */\n-\tflags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?\n-\t\tMEMPOOL_F_SC_GET : 0;\n-\n-\t/* The usable size of a ring is count - 1, so subtract one here to\n-\t * make the counts agree.\n-\t */\n+\t/* Optimal mempool size is a power of 2 minus one */\n+\tuint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);\n \tint pool_size = nb_timers - 1;\n \tint cache_size = compute_msg_mempool_cache_size(\n \t\t\t\tadapter->data->conf.nb_timers, nb_timers);\n-\tsw_data->msg_pool = rte_mempool_create(pool_name, pool_size,\n-\t\t\t\t\t       sizeof(struct msg), cache_size,\n-\t\t\t\t\t       0, NULL, NULL, NULL, NULL,\n-\t\t\t\t\t       adapter->data->socket_id, flags);\n-\tif (sw_data->msg_pool == NULL) {\n-\t\tEVTIM_LOG_ERR(\"failed to create message object mempool\");\n+\tflags = 0; /* pool is multi-producer, multi-consumer */\n+\tsw->tim_pool = rte_mempool_create(pool_name, pool_size,\n+\t\t\tsizeof(struct rte_timer), cache_size, 0, NULL, NULL,\n+\t\t\tNULL, NULL, adapter->data->socket_id, flags);\n+\tif (sw->tim_pool == NULL) {\n+\t\tEVTIM_LOG_ERR(\"failed to create timer object mempool\");\n \t\trte_errno = ENOMEM;\n-\t\tgoto free_msg_ring;\n+\t\tgoto free_alloc;\n \t}\n \n-\tevent_buffer_init(&sw_data->buffer);\n+\t/* Initialize the variables that track in-use timer lists */\n+\trte_spinlock_init(&sw->poll_lcores_sl);\n+\tfor (i = 0; i < RTE_MAX_LCORE; i++)\n+\t\trte_atomic16_init(&sw->in_use[i].v);\n+\n+\t/* Initialize the timer subsystem and allocate timer data instance */\n+\tret = rte_timer_subsystem_init();\n+\tif (ret < 0) {\n+\t\tif (ret != -EALREADY) {\n+\t\t\tEVTIM_LOG_ERR(\"failed to initialize timer subsystem\");\n+\t\t\trte_errno = ret;\n+\t\t\tgoto free_mempool;\n+\t\t}\n+\t}\n+\n+\tret = rte_timer_data_alloc(&sw->timer_data_id);\n+\tif (ret < 0) {\n+\t\tEVTIM_LOG_ERR(\"failed to allocate timer data instance\");\n+\t\trte_errno = ret;\n+\t\tgoto free_mempool;\n+\t}\n+\n+\t/* Initialize timer event buffer */\n+\tevent_buffer_init(&sw->buffer);\n+\n+\tsw->adapter = adapter;\n \n \t/* Register a service component to run adapter logic */\n \tmemset(&service, 0, sizeof(service));\n \tsnprintf(service.name, RTE_SERVICE_NAME_MAX,\n-\t\t \"sw_evimer_adap_svc_%\"PRIu8, adapter->data->id);\n+\t\t \"swtim_svc_%\"PRIu8, adapter->data->id);\n \tservice.socket_id = adapter->data->socket_id;\n-\tservice.callback = sw_event_timer_adapter_service_func;\n+\tservice.callback = swtim_service_func;\n \tservice.callback_userdata = adapter;\n \tservice.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);\n-\tret = rte_service_component_register(&service, &sw_data->service_id);\n+\tret = rte_service_component_register(&service, &sw->service_id);\n \tif (ret < 0) {\n \t\tEVTIM_LOG_ERR(\"failed to register service %s with id %\"PRIu32\n-\t\t\t      \": err = %d\", service.name, sw_data->service_id,\n+\t\t\t      \": err = %d\", service.name, sw->service_id,\n \t\t\t      ret);\n \n \t\trte_errno = ENOSPC;\n-\t\tgoto free_msg_pool;\n+\t\tgoto free_mempool;\n \t}\n \n \tEVTIM_LOG_DBG(\"registered service %s with id %\"PRIu32, service.name,\n-\t\t      sw_data->service_id);\n+\t\t      sw->service_id);\n \n-\tadapter->data->service_id = sw_data->service_id;\n+\tadapter->data->service_id = sw->service_id;\n \tadapter->data->service_inited = 1;\n \n-\tif (!timer_subsystem_inited) {\n-\t\trte_timer_subsystem_init();\n-\t\ttimer_subsystem_inited = true;\n-\t}\n-\n \treturn 0;\n-\n-free_msg_pool:\n-\trte_mempool_free(sw_data->msg_pool);\n-free_msg_ring:\n-\trte_ring_free(sw_data->msg_ring);\n-free_priv_data:\n-\trte_free(sw_data);\n+free_mempool:\n+\trte_mempool_free(sw->tim_pool);\n+free_alloc:\n+\trte_free(sw);\n \treturn -1;\n }\n \n-static int\n-sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)\n+static void\n+swtim_free_tim(struct rte_timer *tim, void *arg)\n {\n-\tint ret;\n-\tstruct msg *m1, *m2;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data =\n-\t\t\t\t\t\tadapter->data->adapter_priv;\n-\n-\trte_spinlock_lock(&sw_data->msgs_tailq_sl);\n+\tstruct swtim *sw = arg;\n \n-\t/* Cancel outstanding rte_timers and free msg objects */\n-\tm1 = TAILQ_FIRST(&sw_data->msgs_tailq_head);\n-\twhile (m1 != NULL) {\n-\t\tEVTIM_LOG_DBG(\"freeing outstanding timer\");\n-\t\tm2 = TAILQ_NEXT(m1, msgs);\n-\n-\t\trte_timer_stop_sync(&m1->tim);\n-\t\trte_mempool_put(sw_data->msg_pool, m1);\n+\trte_mempool_put(sw->tim_pool, tim);\n+}\n \n-\t\tm1 = m2;\n-\t}\n+/* Traverse the list of outstanding timers and put them back in the mempool\n+ * before freeing the adapter to avoid leaking the memory.\n+ */\n+static int\n+swtim_uninit(struct rte_event_timer_adapter *adapter)\n+{\n+\tint ret;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \n-\trte_spinlock_unlock(&sw_data->msgs_tailq_sl);\n+\t/* Free outstanding timers */\n+\trte_timer_stop_all(sw->timer_data_id,\n+\t\t\t   sw->poll_lcores,\n+\t\t\t   sw->n_poll_lcores,\n+\t\t\t   swtim_free_tim,\n+\t\t\t   sw);\n \n-\tret = rte_service_component_unregister(sw_data->service_id);\n+\tret = rte_service_component_unregister(sw->service_id);\n \tif (ret < 0) {\n \t\tEVTIM_LOG_ERR(\"failed to unregister service component\");\n \t\treturn ret;\n \t}\n \n-\trte_ring_free(sw_data->msg_ring);\n-\trte_mempool_free(sw_data->msg_pool);\n-\trte_free(adapter->data->adapter_priv);\n+\trte_mempool_free(sw->tim_pool);\n+\trte_free(sw);\n+\tadapter->data->adapter_priv = NULL;\n \n \treturn 0;\n }\n@@ -1016,88 +940,79 @@ get_mapped_count_for_service(uint32_t service_id)\n }\n \n static int\n-sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)\n+swtim_start(const struct rte_event_timer_adapter *adapter)\n {\n \tint mapped_count;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\n-\tsw_data = adapter->data->adapter_priv;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \n \t/* Mapping the service to more than one service core can introduce\n \t * delays while one thread is waiting to acquire a lock, so only allow\n \t * one core to be mapped to the service.\n+\t *\n+\t * Note: the service could be modified such that it spreads cores to\n+\t * poll over multiple service instances.\n \t */\n-\tmapped_count = get_mapped_count_for_service(sw_data->service_id);\n+\tmapped_count = get_mapped_count_for_service(sw->service_id);\n \n-\tif (mapped_count == 1)\n-\t\treturn rte_service_component_runstate_set(sw_data->service_id,\n-\t\t\t\t\t\t\t  1);\n+\tif (mapped_count != 1)\n+\t\treturn mapped_count < 1 ? -ENOENT : -ENOTSUP;\n \n-\treturn mapped_count < 1 ? -ENOENT : -ENOTSUP;\n+\treturn rte_service_component_runstate_set(sw->service_id, 1);\n }\n \n static int\n-sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)\n+swtim_stop(const struct rte_event_timer_adapter *adapter)\n {\n \tint ret;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data =\n-\t\t\t\t\t\tadapter->data->adapter_priv;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \n-\tret = rte_service_component_runstate_set(sw_data->service_id, 0);\n+\tret = rte_service_component_runstate_set(sw->service_id, 0);\n \tif (ret < 0)\n \t\treturn ret;\n \n-\t/* Wait for the service to complete its final iteration before\n-\t * stopping.\n-\t */\n-\twhile (sw_data->service_phase != 0)\n+\t/* Wait for the service to complete its final iteration */\n+\twhile (rte_service_may_be_active(sw->service_id))\n \t\trte_pause();\n \n-\trte_smp_rmb();\n-\n \treturn 0;\n }\n \n static void\n-sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,\n+swtim_get_info(const struct rte_event_timer_adapter *adapter,\n \t\tstruct rte_event_timer_adapter_info *adapter_info)\n {\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tsw_data = adapter->data->adapter_priv;\n-\n-\tadapter_info->min_resolution_ns = sw_data->timer_tick_ns;\n-\tadapter_info->max_tmo_ns = sw_data->max_tmo_ns;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n+\tadapter_info->min_resolution_ns = sw->timer_tick_ns;\n+\tadapter_info->max_tmo_ns = sw->max_tmo_ns;\n }\n \n static int\n-sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,\n-\t\t\t\t struct rte_event_timer_adapter_stats *stats)\n+swtim_stats_get(const struct rte_event_timer_adapter *adapter,\n+\t\tstruct rte_event_timer_adapter_stats *stats)\n {\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tsw_data = adapter->data->adapter_priv;\n-\t*stats = sw_data->stats;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n+\t*stats = sw->stats; /* structure copy */\n \treturn 0;\n }\n \n static int\n-sw_event_timer_adapter_stats_reset(\n-\t\t\t\tconst struct rte_event_timer_adapter *adapter)\n+swtim_stats_reset(const struct rte_event_timer_adapter *adapter)\n {\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tsw_data = adapter->data->adapter_priv;\n-\tmemset(&sw_data->stats, 0, sizeof(sw_data->stats));\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n+\tmemset(&sw->stats, 0, sizeof(sw->stats));\n \treturn 0;\n }\n \n-static __rte_always_inline uint16_t\n-__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,\n-\t\t\t  struct rte_event_timer **evtims,\n-\t\t\t  uint16_t nb_evtims)\n+static uint16_t\n+__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,\n+\t\tstruct rte_event_timer **evtims,\n+\t\tuint16_t nb_evtims)\n {\n-\tuint16_t i;\n-\tint ret;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tstruct msg *msgs[nb_evtims];\n+\tint i, ret;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n+\tuint32_t lcore_id = rte_lcore_id();\n+\tstruct rte_timer *tim, *tims[nb_evtims];\n+\tuint64_t cycles;\n \n #ifdef RTE_LIBRTE_EVENTDEV_DEBUG\n \t/* Check that the service is running. */\n@@ -1107,101 +1022,104 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,\n \t}\n #endif\n \n-\tsw_data = adapter->data->adapter_priv;\n+\t/* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of\n+\t * the highest lcore to insert such timers into\n+\t */\n+\tif (lcore_id == LCORE_ID_ANY)\n+\t\tlcore_id = RTE_MAX_LCORE - 1;\n+\n+\t/* If this is the first time we're arming an event timer on this lcore,\n+\t * mark this lcore as \"in use\"; this will cause the service\n+\t * function to process the timer list that corresponds to this lcore.\n+\t */\n+\tif (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {\n+\t\trte_spinlock_lock(&sw->poll_lcores_sl);\n+\t\tEVTIM_LOG_DBG(\"Adding lcore id = %u to list of lcores to poll\",\n+\t\t\t      lcore_id);\n+\t\tsw->poll_lcores[sw->n_poll_lcores++] = lcore_id;\n+\t\trte_spinlock_unlock(&sw->poll_lcores_sl);\n+\t}\n \n-\tret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);\n+\tret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,\n+\t\t\t\t   nb_evtims);\n \tif (ret < 0) {\n \t\trte_errno = ENOSPC;\n \t\treturn 0;\n \t}\n \n-\t/* Let the service know we're producing messages for it to process */\n-\trte_atomic16_inc(&sw_data->message_producer_count);\n-\n-\t/* If the service is managing timers, wait for it to finish */\n-\twhile (sw_data->service_phase == 2)\n-\t\trte_pause();\n-\n-\trte_smp_rmb();\n-\n \tfor (i = 0; i < nb_evtims; i++) {\n \t\t/* Don't modify the event timer state in these cases */\n \t\tif (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {\n \t\t\trte_errno = EALREADY;\n \t\t\tbreak;\n \t\t} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||\n-\t\t    evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {\n+\t\t\t     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {\n \t\t\trte_errno = EINVAL;\n \t\t\tbreak;\n \t\t}\n \n \t\tret = check_timeout(evtims[i], adapter);\n-\t\tif (ret == -1) {\n+\t\tif (unlikely(ret == -1)) {\n \t\t\tevtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;\n \t\t\trte_errno = EINVAL;\n \t\t\tbreak;\n-\t\t}\n-\t\tif (ret == -2) {\n+\t\t} else if (unlikely(ret == -2)) {\n \t\t\tevtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;\n \t\t\trte_errno = EINVAL;\n \t\t\tbreak;\n \t\t}\n \n-\t\tif (check_destination_event_queue(evtims[i], adapter) < 0) {\n+\t\tif (unlikely(check_destination_event_queue(evtims[i],\n+\t\t\t\t\t\t\t   adapter) < 0)) {\n \t\t\tevtims[i]->state = RTE_EVENT_TIMER_ERROR;\n \t\t\trte_errno = EINVAL;\n \t\t\tbreak;\n \t\t}\n \n-\t\t/* Checks passed, set up a message to enqueue */\n-\t\tmsgs[i]->type = MSG_TYPE_ARM;\n-\t\tmsgs[i]->evtim = evtims[i];\n+\t\ttim = tims[i];\n+\t\trte_timer_init(tim);\n \n-\t\t/* Set the payload pointer if not set. */\n-\t\tif (evtims[i]->ev.event_ptr == NULL)\n-\t\t\tevtims[i]->ev.event_ptr = evtims[i];\n+\t\tevtims[i]->impl_opaque[0] = (uintptr_t)tim;\n+\t\tevtims[i]->impl_opaque[1] = (uintptr_t)adapter;\n \n-\t\t/* msg objects that get enqueued successfully will be freed\n-\t\t * either by a future cancel operation or by the timer\n-\t\t * expiration callback.\n-\t\t */\n-\t\tif (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {\n-\t\t\trte_errno = ENOSPC;\n+\t\tcycles = get_timeout_cycles(evtims[i], adapter);\n+\t\tret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,\n+\t\t\t\t\t  SINGLE, lcore_id, NULL, evtims[i]);\n+\t\tif (ret < 0) {\n+\t\t\t/* tim was in RUNNING or CONFIG state */\n+\t\t\tevtims[i]->state = RTE_EVENT_TIMER_ERROR;\n \t\t\tbreak;\n \t\t}\n \n-\t\tEVTIM_LOG_DBG(\"enqueued ARM message to ring\");\n-\n+\t\trte_smp_wmb();\n+\t\tEVTIM_LOG_DBG(\"armed an event timer\");\n \t\tevtims[i]->state = RTE_EVENT_TIMER_ARMED;\n \t}\n \n-\t/* Let the service know we're done producing messages */\n-\trte_atomic16_dec(&sw_data->message_producer_count);\n-\n \tif (i < nb_evtims)\n-\t\trte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],\n-\t\t\t\t     nb_evtims - i);\n+\t\trte_mempool_put_bulk(sw->tim_pool,\n+\t\t\t\t     (void **)&tims[i], nb_evtims - i);\n \n \treturn i;\n }\n \n static uint16_t\n-sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,\n-\t\t\t struct rte_event_timer **evtims,\n-\t\t\t uint16_t nb_evtims)\n+swtim_arm_burst(const struct rte_event_timer_adapter *adapter,\n+\t\tstruct rte_event_timer **evtims,\n+\t\tuint16_t nb_evtims)\n {\n-\treturn __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);\n+\treturn __swtim_arm_burst(adapter, evtims, nb_evtims);\n }\n \n static uint16_t\n-sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,\n-\t\t\t    struct rte_event_timer **evtims,\n-\t\t\t    uint16_t nb_evtims)\n+swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,\n+\t\t   struct rte_event_timer **evtims,\n+\t\t   uint16_t nb_evtims)\n {\n-\tuint16_t i;\n-\tint ret;\n-\tstruct rte_event_timer_adapter_sw_data *sw_data;\n-\tstruct msg *msgs[nb_evtims];\n+\tint i, ret;\n+\tstruct rte_timer *timp;\n+\tuint64_t opaque;\n+\tstruct swtim *sw = swtim_pmd_priv(adapter);\n \n #ifdef RTE_LIBRTE_EVENTDEV_DEBUG\n \t/* Check that the service is running. */\n@@ -1211,23 +1129,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,\n \t}\n #endif\n \n-\tsw_data = adapter->data->adapter_priv;\n-\n-\tret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);\n-\tif (ret < 0) {\n-\t\trte_errno = ENOSPC;\n-\t\treturn 0;\n-\t}\n-\n-\t/* Let the service know we're producing messages for it to process */\n-\trte_atomic16_inc(&sw_data->message_producer_count);\n-\n-\t/* If the service could be modifying event timer states, wait */\n-\twhile (sw_data->service_phase == 2)\n-\t\trte_pause();\n-\n-\trte_smp_rmb();\n-\n \tfor (i = 0; i < nb_evtims; i++) {\n \t\t/* Don't modify the event timer state in these cases */\n \t\tif (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {\n@@ -1238,54 +1139,56 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,\n \t\t\tbreak;\n \t\t}\n \n-\t\tmsgs[i]->type = MSG_TYPE_CANCEL;\n-\t\tmsgs[i]->evtim = evtims[i];\n+\t\trte_smp_rmb();\n+\n+\t\topaque = evtims[i]->impl_opaque[0];\n+\t\ttimp = (struct rte_timer *)(uintptr_t)opaque;\n+\t\tRTE_ASSERT(timp != NULL);\n \n-\t\tif (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {\n-\t\t\trte_errno = ENOSPC;\n+\t\tret = rte_timer_alt_stop(sw->timer_data_id, timp);\n+\t\tif (ret < 0) {\n+\t\t\t/* Timer is running or being configured */\n+\t\t\trte_errno = EAGAIN;\n \t\t\tbreak;\n \t\t}\n \n-\t\tEVTIM_LOG_DBG(\"enqueued CANCEL message to ring\");\n+\t\trte_mempool_put(sw->tim_pool, (void **)timp);\n \n \t\tevtims[i]->state = RTE_EVENT_TIMER_CANCELED;\n-\t}\n+\t\tevtims[i]->impl_opaque[0] = 0;\n+\t\tevtims[i]->impl_opaque[1] = 0;\n \n-\t/* Let the service know we're done producing messages */\n-\trte_atomic16_dec(&sw_data->message_producer_count);\n-\n-\tif (i < nb_evtims)\n-\t\trte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],\n-\t\t\t\t     nb_evtims - i);\n+\t\trte_smp_wmb();\n+\t}\n \n \treturn i;\n }\n \n static uint16_t\n-sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,\n-\t\t\t\t  struct rte_event_timer **evtims,\n-\t\t\t\t  uint64_t timeout_ticks,\n-\t\t\t\t  uint16_t nb_evtims)\n+swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,\n+\t\t\t struct rte_event_timer **evtims,\n+\t\t\t uint64_t timeout_ticks,\n+\t\t\t uint16_t nb_evtims)\n {\n \tint i;\n \n \tfor (i = 0; i < nb_evtims; i++)\n \t\tevtims[i]->timeout_ticks = timeout_ticks;\n \n-\treturn __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);\n+\treturn __swtim_arm_burst(adapter, evtims, nb_evtims);\n }\n \n-static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {\n-\t.init = sw_event_timer_adapter_init,\n-\t.uninit = sw_event_timer_adapter_uninit,\n-\t.start = sw_event_timer_adapter_start,\n-\t.stop = sw_event_timer_adapter_stop,\n-\t.get_info = sw_event_timer_adapter_get_info,\n-\t.stats_get = sw_event_timer_adapter_stats_get,\n-\t.stats_reset = sw_event_timer_adapter_stats_reset,\n-\t.arm_burst = sw_event_timer_arm_burst,\n-\t.arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,\n-\t.cancel_burst = sw_event_timer_cancel_burst,\n+static const struct rte_event_timer_adapter_ops swtim_ops = {\n+\t.init\t\t\t= swtim_init,\n+\t.uninit\t\t\t= swtim_uninit,\n+\t.start\t\t\t= swtim_start,\n+\t.stop\t\t\t= swtim_stop,\n+\t.get_info\t\t= swtim_get_info,\n+\t.stats_get\t\t= swtim_stats_get,\n+\t.stats_reset\t\t= swtim_stats_reset,\n+\t.arm_burst\t\t= swtim_arm_burst,\n+\t.arm_tmo_tick_burst\t= swtim_arm_tmo_tick_burst,\n+\t.cancel_burst\t\t= swtim_cancel_burst,\n };\n \n RTE_INIT(event_timer_adapter_init_log)\n",
    "prefixes": [
        "v7",
        "1/1"
    ]
}