Show a patch.
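
A response like the one below is plain JSON, so a client can decode it and act on fields such as `state`, `archived`, and `check`. A minimal sketch: `patch` here is a hand-copied subset of the response shown (in practice it would come from something like `requests.get(url).json()`), and `needs_attention` is a hypothetical helper, not part of the Patchwork API.

```python
# Hand-copied subset of the patch object returned by the API below;
# a real client would fetch and decode it over HTTP instead.
patch = {
    "id": 41655,
    "state": "superseded",
    "archived": True,
    "check": "fail",
    "submitter": {"name": "Nikhil Rao", "email": "nikhil.rao@intel.com"},
}

def needs_attention(p):
    """Hypothetical triage rule: a patch still needs review if it is
    neither resolved nor archived and its CI checks failed."""
    active = (p["state"] not in ("accepted", "rejected", "superseded")
              and not p["archived"])
    return active and p["check"] == "fail"

# This patch is superseded (and archived), so it drops out of triage.
print(needs_attention(patch))
```

Since patch 41655 was superseded by a later revision of the series, the helper returns `False` for it even though its `check` field is `"fail"`.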

GET /api/patches/41655/
HTTP 200 OK
Allow: GET, PUT, PATCH, HEAD, OPTIONS
Content-Type: application/json
Vary: Accept

{
    "id": 41655,
    "url": "http://patches.dpdk.org/api/patches/41655/",
    "web_url": "http://patches.dpdk.org/patch/41655/",
    "project": {
        "id": 1,
        "url": "http://patches.dpdk.org/api/projects/1/",
        "name": "DPDK",
        "link_name": "dpdk",
        "list_id": "dev.dpdk.org",
        "list_email": "dev@dpdk.org",
        "web_url": "http://core.dpdk.org",
        "scm_url": "git://dpdk.org/dpdk",
        "webscm_url": "http://git.dpdk.org/dpdk"
    },
    "msgid": "<1530096938-71399-5-git-send-email-nikhil.rao@intel.com>",
    "date": "2018-06-27T10:55:37",
    "name": "[v2,4/5] eventdev: add interrupt driven queues to Rx adapter",
    "commit_ref": null,
    "pull_url": null,
    "state": "superseded",
    "archived": true,
    "hash": "1bf868659be49df49acd21eaeacf7c1c4e20e567",
    "submitter": {
        "id": 528,
        "url": "http://patches.dpdk.org/api/people/528/",
        "name": "Nikhil Rao",
        "email": "nikhil.rao@intel.com"
    },
    "delegate": {
        "id": 310,
        "url": "http://patches.dpdk.org/api/users/310/",
        "username": "jerin",
        "first_name": "Jerin",
        "last_name": "Jacob",
        "email": "jerin.jacob@caviumnetworks.com"
    },
    "mbox": "http://patches.dpdk.org/patch/41655/mbox/",
    "series": [
        {
            "id": 260,
            "url": "http://patches.dpdk.org/api/series/260/",
            "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=260",
            "date": "2018-06-27T10:55:33",
            "name": "eventdev: add interrupt driven queues to Rx adapter",
            "version": 2,
            "mbox": "http://patches.dpdk.org/series/260/mbox/"
        }
    ],
    "comments": "http://patches.dpdk.org/api/patches/41655/comments/",
    "check": "fail",
    "checks": "http://patches.dpdk.org/api/patches/41655/checks/",
    "tags": {},
    "headers": {
        "X-Mailman-Version": "2.1.15",
        "X-ExtLoop1": "1",
        "Errors-To": "dev-bounces@dpdk.org",
        "X-Amp-Result": "SKIPPED(no attachment in message)",
        "X-Mailer": "git-send-email 1.8.3.1",
        "Received": [
            "from [92.243.14.124] (localhost [127.0.0.1])\n\tby dpdk.org (Postfix) with ESMTP id BEF151BEC3;\n\tWed, 27 Jun 2018 12:56:07 +0200 (CEST)",
            "from mga05.intel.com (mga05.intel.com [192.55.52.43])\n\tby dpdk.org (Postfix) with ESMTP id 424DA1BB92\n\tfor <dev@dpdk.org>; Wed, 27 Jun 2018 12:55:56 +0200 (CEST)",
            "from orsmga008.jf.intel.com ([10.7.209.65])\n\tby fmsmga105.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384;\n\t27 Jun 2018 03:55:55 -0700",
            "from unknown (HELO localhost.localdomain.localdomain)\n\t([10.224.122.193])\n\tby orsmga008.jf.intel.com with ESMTP; 27 Jun 2018 03:55:54 -0700"
        ],
        "References": "<1528481718-7241-1-git-send-email-nikhil.rao@intel.com>\n\t<1530096938-71399-1-git-send-email-nikhil.rao@intel.com>",
        "X-Amp-File-Uploaded": "False",
        "X-BeenThere": "dev@dpdk.org",
        "Message-Id": "<1530096938-71399-5-git-send-email-nikhil.rao@intel.com>",
        "X-IronPort-AV": "E=Sophos;i=\"5.51,278,1526367600\"; d=\"scan'208\";a=\"52634802\"",
        "List-Id": "DPDK patches and discussions <dev.dpdk.org>",
        "Precedence": "list",
        "From": "Nikhil Rao <nikhil.rao@intel.com>",
        "X-Original-To": "patchwork@dpdk.org",
        "List-Post": "<mailto:dev@dpdk.org>",
        "Return-Path": "<dev-bounces@dpdk.org>",
        "Sender": "\"dev\" <dev-bounces@dpdk.org>",
        "List-Help": "<mailto:dev-request@dpdk.org?subject=help>",
        "In-Reply-To": "<1530096938-71399-1-git-send-email-nikhil.rao@intel.com>",
        "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n\t<mailto:dev-request@dpdk.org?subject=subscribe>",
        "To": "jerin.jacob@caviumnetworks.com",
        "Delivered-To": "patchwork@dpdk.org",
        "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n\t<mailto:dev-request@dpdk.org?subject=unsubscribe>",
        "Date": "Wed, 27 Jun 2018 16:25:37 +0530",
        "Cc": "nikhil.rao@intel.com,\n\tdev@dpdk.org",
        "List-Archive": "<http://mails.dpdk.org/archives/dev/>",
        "Subject": "[dpdk-dev] [PATCH v2 4/5] eventdev: add interrupt driven queues to\n\tRx adapter"
    },
    "content": "Add support for interrupt driven queues when eth device is\nconfigured for rxq interrupts and servicing weight for the\nqueue is configured to be zero.\n\nA interrupt driven packet received counter has been added to\nrte_event_eth_rx_adapter_stats.\n\nSigned-off-by: Nikhil Rao <nikhil.rao@intel.com>\n---\n config/rte_config.h                                |   1 +\n lib/librte_eventdev/rte_event_eth_rx_adapter.h     |   5 +-\n lib/librte_eventdev/rte_event_eth_rx_adapter.c     | 923 ++++++++++++++++++++-\n .../prog_guide/event_ethernet_rx_adapter.rst       |  24 +\n config/common_base                                 |   1 +\n lib/librte_eventdev/Makefile                       |   2 +-\n 6 files changed, 927 insertions(+), 29 deletions(-)",
    "diff": "diff --git a/config/rte_config.h b/config/rte_config.h\nindex a1d0175..ec88f14 100644\n--- a/config/rte_config.h\n+++ b/config/rte_config.h\n@@ -64,6 +64,7 @@\n #define RTE_EVENT_MAX_DEVS 16\n #define RTE_EVENT_MAX_QUEUES_PER_DEV 64\n #define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32\n+#define RTE_EVENT_ETH_INTR_RING_SIZE 1024\n #define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32\n \n /* rawdev defines */\ndiff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h\nindex 307b2b5..97f25e9 100644\n--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h\n+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h\n@@ -64,8 +64,7 @@\n  * the service function ID of the adapter in this case.\n  *\n  * Note:\n- * 1) Interrupt driven receive queues are currently unimplemented.\n- * 2) Devices created after an instance of rte_event_eth_rx_adapter_create\n+ * 1) Devices created after an instance of rte_event_eth_rx_adapter_create\n  *  should be added to a new instance of the rx adapter.\n  */\n \n@@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats {\n \t * block cycles can be used to compute the percentage of\n \t * cycles the service is blocked by the event device.\n \t */\n+\tuint64_t rx_intr_packets;\n+\t/**< Received packet count for interrupt mode Rx queues */\n };\n \n /**\ndiff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c\nindex 8fe037f..62886c4 100644\n--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c\n+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c\n@@ -2,6 +2,8 @@\n  * Copyright(c) 2017 Intel Corporation.\n  * All rights reserved.\n  */\n+#include <unistd.h>\n+#include <sys/epoll.h>\n #include <rte_cycles.h>\n #include <rte_common.h>\n #include <rte_dev.h>\n@@ -11,6 +13,7 @@\n #include <rte_malloc.h>\n #include <rte_service_component.h>\n #include <rte_thash.h>\n+#include <rte_interrupts.h>\n \n #include \"rte_eventdev.h\"\n #include 
\"rte_eventdev_pmd.h\"\n@@ -24,6 +27,22 @@\n #define ETH_RX_ADAPTER_MEM_NAME_LEN\t32\n \n #define RSS_KEY_SIZE\t40\n+/* value written to intr thread pipe to signal thread exit */\n+#define ETH_BRIDGE_INTR_THREAD_EXIT\t1\n+/* Sentinel value to detect initialized file handle */\n+#define INIT_FD\t\t-1\n+\n+/*\n+ * Used to store port and queue ID of interrupting Rx queue\n+ */\n+union queue_data {\n+\tRTE_STD_C11\n+\tvoid *ptr;\n+\tstruct {\n+\t\tuint16_t port;\n+\t\tuint16_t queue;\n+\t};\n+};\n \n /*\n  * There is an instance of this struct per polled Rx queue added to the\n@@ -75,6 +94,32 @@ struct rte_event_eth_rx_adapter {\n \tuint16_t enq_block_count;\n \t/* Block start ts */\n \tuint64_t rx_enq_block_start_ts;\n+\t/* epoll fd used to wait for Rx interrupts */\n+\tint epd;\n+\t/* Num of interrupt driven interrupt queues */\n+\tuint32_t num_rx_intr;\n+\t/* Used to send <dev id, queue id> of interrupting Rx queues from\n+\t * the interrupt thread to the Rx thread\n+\t */\n+\tstruct rte_ring *intr_ring;\n+\t/* Rx Queue data (dev id, queue id) for the last non-empty\n+\t * queue polled\n+\t */\n+\tunion queue_data qd;\n+\t/* queue_data is valid */\n+\tint qd_valid;\n+\t/* Interrupt ring lock, synchronizes Rx thread\n+\t * and interrupt thread\n+\t */\n+\trte_spinlock_t intr_ring_lock;\n+\t/* event array passed to rte_poll_wait */\n+\tstruct rte_epoll_event *epoll_events;\n+\t/* Count of interrupt vectors in use */\n+\tuint32_t num_intr_vec;\n+\t/* Thread blocked on Rx interrupts */\n+\tpthread_t rx_intr_thread;\n+\t/* Stop thread flag */\n+\tuint8_t stop_thread;\n \t/* Configuration callback for rte_service configuration */\n \trte_event_eth_rx_adapter_conf_cb conf_cb;\n \t/* Configuration callback argument */\n@@ -93,6 +138,8 @@ struct rte_event_eth_rx_adapter {\n \tuint32_t service_id;\n \t/* Adapter started flag */\n \tuint8_t rxa_started;\n+\t/* Adapter ID */\n+\tuint8_t id;\n } __rte_cache_aligned;\n \n /* Per eth device */\n@@ -111,19 +158,40 @@ struct 
eth_device_info {\n \tuint8_t dev_rx_started;\n \t/* Number of queues added for this device */\n \tuint16_t nb_dev_queues;\n-\t/* If nb_rx_poll > 0, the start callback will\n+\t/* Number of poll based queues\n+\t * If nb_rx_poll > 0, the start callback will\n \t * be invoked if not already invoked\n \t */\n \tuint16_t nb_rx_poll;\n+\t/* Number of interrupt based queues\n+\t * If nb_rx_intr > 0, the start callback will\n+\t * be invoked if not already invoked.\n+\t */\n+\tuint16_t nb_rx_intr;\n+\t/* Number of queues that use the shared interrupt */\n+\tuint16_t nb_shared_intr;\n \t/* sum(wrr(q)) for all queues within the device\n \t * useful when deleting all device queues\n \t */\n \tuint32_t wrr_len;\n+\t/* Intr based queue index to start polling from, this is used\n+\t * if the number of shared interrupts is non-zero\n+\t */\n+\tuint16_t next_q_idx;\n+\t/* Intr based queue indices */\n+\tuint16_t *intr_queue;\n+\t/* device generates per Rx queue interrupt for queue index\n+\t * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1\n+\t */\n+\tint multi_intr_cap;\n+\t/* shared interrupt enabled */\n+\tint shared_intr_enabled;\n };\n \n /* Per Rx queue */\n struct eth_rx_queue_info {\n \tint queue_enabled;\t/* True if added */\n+\tint intr_enabled;\n \tuint16_t wt;\t\t/* Polling weight */\n \tuint8_t event_queue_id;\t/* Event queue to enqueue packets to */\n \tuint8_t sched_type;\t/* Sched type for events */\n@@ -150,7 +218,7 @@ struct eth_rx_queue_info {\n static inline int\n rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)\n {\n-\treturn rx_adapter->num_rx_polled;\n+\treturn rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;\n }\n \n /* Greatest common divisor */\n@@ -195,6 +263,28 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n }\n \n static inline int\n+rxa_shared_intr(struct eth_device_info *dev_info,\n+\tint rx_queue_id)\n+{\n+\tint multi_intr_cap =\n+\t\t\trte_intr_cap_multiple(dev_info->dev->intr_handle);\n+\treturn 
!multi_intr_cap ||\n+\t\trx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;\n+}\n+\n+static inline int\n+rxa_intr_queue(struct eth_device_info *dev_info,\n+\tint rx_queue_id)\n+{\n+\tstruct eth_rx_queue_info *queue_info;\n+\n+\tqueue_info = &dev_info->rx_queue[rx_queue_id];\n+\treturn dev_info->rx_queue &&\n+\t\t!dev_info->internal_event_port &&\n+\t\tqueue_info->queue_enabled && queue_info->wt == 0;\n+}\n+\n+static inline int\n rxa_polled_queue(struct eth_device_info *dev_info,\n \tint rx_queue_id)\n {\n@@ -206,6 +296,95 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\tqueue_info->queue_enabled && queue_info->wt != 0;\n }\n \n+/* Calculate change in number of vectors after Rx queue ID is add/deleted */\n+static int\n+rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)\n+{\n+\tuint16_t i;\n+\tint n, s;\n+\tuint16_t nbq;\n+\n+\tnbq = dev_info->dev->data->nb_rx_queues;\n+\tn = 0; /* non shared count */\n+\ts = 0; /* shared count */\n+\n+\tif (rx_queue_id == -1) {\n+\t\tfor (i = 0; i < nbq; i++) {\n+\t\t\tif (!rxa_shared_intr(dev_info, i))\n+\t\t\t\tn += add ? !rxa_intr_queue(dev_info, i) :\n+\t\t\t\t\trxa_intr_queue(dev_info, i);\n+\t\t\telse\n+\t\t\t\ts += add ? !rxa_intr_queue(dev_info, i) :\n+\t\t\t\t\trxa_intr_queue(dev_info, i);\n+\t\t}\n+\n+\t\tif (s > 0) {\n+\t\t\tif ((add && dev_info->nb_shared_intr == 0) ||\n+\t\t\t\t(!add && dev_info->nb_shared_intr))\n+\t\t\t\tn += 1;\n+\t\t}\n+\t} else {\n+\t\tif (!rxa_shared_intr(dev_info, rx_queue_id))\n+\t\t\tn = add ? !rxa_intr_queue(dev_info, rx_queue_id) :\n+\t\t\t\trxa_intr_queue(dev_info, rx_queue_id);\n+\t\telse\n+\t\t\tn = add ? !dev_info->nb_shared_intr :\n+\t\t\t\tdev_info->nb_shared_intr == 1;\n+\t}\n+\n+\treturn add ? 
n : -n;\n+}\n+\n+/* Calculate nb_rx_intr after deleting interrupt mode rx queues\n+ */\n+static void\n+rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\t\tstruct eth_device_info *dev_info,\n+\t\t\tint rx_queue_id,\n+\t\t\tuint32_t *nb_rx_intr)\n+{\n+\tuint32_t intr_diff;\n+\n+\tif (rx_queue_id == -1)\n+\t\tintr_diff = dev_info->nb_rx_intr;\n+\telse\n+\t\tintr_diff = rxa_intr_queue(dev_info, rx_queue_id);\n+\n+\t*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;\n+}\n+\n+/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added\n+ * interrupt queues could currently be poll mode Rx queues\n+ */\n+static void\n+rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\t\tstruct eth_device_info *dev_info,\n+\t\t\tint rx_queue_id,\n+\t\t\tuint32_t *nb_rx_poll,\n+\t\t\tuint32_t *nb_rx_intr,\n+\t\t\tuint32_t *nb_wrr)\n+{\n+\tuint32_t intr_diff;\n+\tuint32_t poll_diff;\n+\tuint32_t wrr_len_diff;\n+\n+\tif (rx_queue_id == -1) {\n+\t\tintr_diff = dev_info->dev->data->nb_rx_queues -\n+\t\t\t\t\t\tdev_info->nb_rx_intr;\n+\t\tpoll_diff = dev_info->nb_rx_poll;\n+\t\twrr_len_diff = dev_info->wrr_len;\n+\t} else {\n+\t\tintr_diff = !rxa_intr_queue(dev_info, rx_queue_id);\n+\t\tpoll_diff = rxa_polled_queue(dev_info, rx_queue_id);\n+\t\twrr_len_diff = poll_diff ? 
dev_info->rx_queue[rx_queue_id].wt :\n+\t\t\t\t\t0;\n+\t}\n+\n+\t*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;\n+\t*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;\n+\t*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;\n+}\n+\n /* Calculate size of the eth_rx_poll and wrr_sched arrays\n  * after deleting poll mode rx queues\n  */\n@@ -240,17 +419,21 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\t\tint rx_queue_id,\n \t\t\tuint16_t wt,\n \t\t\tuint32_t *nb_rx_poll,\n+\t\t\tuint32_t *nb_rx_intr,\n \t\t\tuint32_t *nb_wrr)\n {\n+\tuint32_t intr_diff;\n \tuint32_t poll_diff;\n \tuint32_t wrr_len_diff;\n \n \tif (rx_queue_id == -1) {\n+\t\tintr_diff = dev_info->nb_rx_intr;\n \t\tpoll_diff = dev_info->dev->data->nb_rx_queues -\n \t\t\t\t\t\tdev_info->nb_rx_poll;\n \t\twrr_len_diff = wt*dev_info->dev->data->nb_rx_queues\n \t\t\t\t- dev_info->wrr_len;\n \t} else {\n+\t\tintr_diff = rxa_intr_queue(dev_info, rx_queue_id);\n \t\tpoll_diff = !rxa_polled_queue(dev_info, rx_queue_id);\n \t\twrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?\n \t\t\t\twt - dev_info->rx_queue[rx_queue_id].wt :\n@@ -258,6 +441,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t}\n \n \t*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;\n+\t*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;\n \t*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;\n }\n \n@@ -268,10 +452,15 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\tint rx_queue_id,\n \t\tuint16_t wt,\n \t\tuint32_t *nb_rx_poll,\n+\t\tuint32_t *nb_rx_intr,\n \t\tuint32_t *nb_wrr)\n {\n-\trxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,\n-\t\t\t\twt, nb_rx_poll, nb_wrr);\n+\tif (wt != 0)\n+\t\trxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,\n+\t\t\t\t\twt, nb_rx_poll, nb_rx_intr, nb_wrr);\n+\telse\n+\t\trxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,\n+\t\t\t\t\tnb_rx_poll, nb_rx_intr, nb_wrr);\n }\n \n /* Calculate nb_rx_* after deleting rx_queue_id */\n@@ -280,10 
+469,13 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\tstruct eth_device_info *dev_info,\n \t\tint rx_queue_id,\n \t\tuint32_t *nb_rx_poll,\n+\t\tuint32_t *nb_rx_intr,\n \t\tuint32_t *nb_wrr)\n {\n \trxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,\n \t\t\t\tnb_wrr);\n+\trxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,\n+\t\t\t\tnb_rx_intr);\n }\n \n /*\n@@ -622,7 +814,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tuint16_t port_id,\n \tuint16_t queue_id,\n \tuint32_t rx_count,\n-\tuint32_t max_rx)\n+\tuint32_t max_rx,\n+\tint *rxq_empty)\n {\n \tstruct rte_mbuf *mbufs[BATCH_SIZE];\n \tstruct rte_eth_event_enqueue_buffer *buf =\n@@ -632,6 +825,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tuint16_t n;\n \tuint32_t nb_rx = 0;\n \n+\tif (rxq_empty)\n+\t\t*rxq_empty = 0;\n \t/* Don't do a batch dequeue from the rx queue if there isn't\n \t * enough space in the enqueue buffer.\n \t */\n@@ -641,8 +836,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \n \t\tstats->rx_poll_count++;\n \t\tn = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);\n-\t\tif (unlikely(!n))\n+\t\tif (unlikely(!n)) {\n+\t\t\tif (rxq_empty)\n+\t\t\t\t*rxq_empty = 1;\n \t\t\tbreak;\n+\t\t}\n \t\trxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);\n \t\tnb_rx += n;\n \t\tif (rx_count + nb_rx > max_rx)\n@@ -655,6 +853,228 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \treturn nb_rx;\n }\n \n+static inline void\n+rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\tvoid *data)\n+{\n+\tuint16_t port_id;\n+\tuint16_t queue;\n+\tint err;\n+\tunion queue_data qd;\n+\tstruct eth_device_info *dev_info;\n+\tstruct eth_rx_queue_info *queue_info;\n+\tint *intr_enabled;\n+\n+\tqd.ptr = data;\n+\tport_id = qd.port;\n+\tqueue = qd.queue;\n+\n+\tdev_info = &rx_adapter->eth_devices[port_id];\n+\tqueue_info = 
&dev_info->rx_queue[queue];\n+\trte_spinlock_lock(&rx_adapter->intr_ring_lock);\n+\tif (rxa_shared_intr(dev_info, queue))\n+\t\tintr_enabled = &dev_info->shared_intr_enabled;\n+\telse\n+\t\tintr_enabled = &queue_info->intr_enabled;\n+\n+\tif (*intr_enabled) {\n+\t\t*intr_enabled = 0;\n+\t\terr = rte_ring_enqueue(rx_adapter->intr_ring, data);\n+\t\t/* Entry should always be available.\n+\t\t * The ring size equals the maximum number of interrupt\n+\t\t * vectors supported (an interrupt vector is shared in\n+\t\t * case of shared interrupts)\n+\t\t */\n+\t\tif (err)\n+\t\t\tRTE_EDEV_LOG_ERR(\"Failed to enqueue interrupt\"\n+\t\t\t\t\" to ring: %s\", strerror(err));\n+\t\telse\n+\t\t\trte_eth_dev_rx_intr_disable(port_id, queue);\n+\t}\n+\trte_spinlock_unlock(&rx_adapter->intr_ring_lock);\n+}\n+\n+static int\n+rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\t\tuint32_t num_intr_vec)\n+{\n+\tif (rx_adapter->num_intr_vec + num_intr_vec >\n+\t\t\t\tRTE_EVENT_ETH_INTR_RING_SIZE) {\n+\t\tRTE_EDEV_LOG_ERR(\"Exceeded intr ring slots current\"\n+\t\t\" %d needed %d limit %d\", rx_adapter->num_intr_vec,\n+\t\tnum_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);\n+\t\treturn -ENOSPC;\n+\t}\n+\n+\treturn 0;\n+}\n+\n+/* Delete entries for (dev, queue) from the interrupt ring */\n+static void\n+rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\t\tstruct eth_device_info *dev_info,\n+\t\t\tuint16_t rx_queue_id)\n+{\n+\tint i, n;\n+\tunion queue_data qd;\n+\n+\trte_spinlock_lock(&rx_adapter->intr_ring_lock);\n+\n+\tn = rte_ring_count(rx_adapter->intr_ring);\n+\tfor (i = 0; i < n; i++) {\n+\t\trte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);\n+\t\tif (!rxa_shared_intr(dev_info, rx_queue_id)) {\n+\t\t\tif (qd.port == dev_info->dev->data->port_id &&\n+\t\t\t\tqd.queue == rx_queue_id)\n+\t\t\t\tcontinue;\n+\t\t} else {\n+\t\t\tif (qd.port == 
dev_info->dev->data->port_id)\n+\t\t\t\tcontinue;\n+\t\t}\n+\t\trte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);\n+\t}\n+\n+\trte_spinlock_unlock(&rx_adapter->intr_ring_lock);\n+}\n+\n+/* pthread callback handling interrupt mode receive queues\n+ * After receiving an Rx interrupt, it enqueues the port id and queue id of the\n+ * interrupting queue to the adapter's ring buffer for interrupt events.\n+ * These events are picked up by rxa_intr_ring_dequeue() which is invoked from\n+ * the adapter service function.\n+ */\n+static void *\n+rxa_intr_thread(void *arg)\n+{\n+\tstruct rte_event_eth_rx_adapter *rx_adapter = arg;\n+\tstruct rte_epoll_event *epoll_events = rx_adapter->epoll_events;\n+\tint n, i;\n+\n+\twhile (1) {\n+\t\tn = rte_epoll_wait(rx_adapter->epd, epoll_events,\n+\t\t\t\tRTE_EVENT_ETH_INTR_RING_SIZE + 1, -1);\n+\t\tif (unlikely(n < 0))\n+\t\t\tRTE_EDEV_LOG_ERR(\"rte_epoll_wait returned error %d\",\n+\t\t\t\t\tn);\n+\t\tfor (i = 0; i < n; i++) {\n+\t\t\trxa_intr_ring_enqueue(rx_adapter,\n+\t\t\t\t\tepoll_events[i].epdata.data);\n+\t\t}\n+\t}\n+\n+\treturn NULL;\n+}\n+\n+/* Dequeue <port, q> from interrupt ring and enqueue received\n+ * mbufs to eventdev\n+ */\n+static inline uint32_t\n+rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)\n+{\n+\tuint32_t n;\n+\tuint32_t nb_rx = 0;\n+\tint rxq_empty;\n+\tstruct rte_eth_event_enqueue_buffer *buf;\n+\trte_spinlock_t *ring_lock;\n+\tuint8_t max_done = 0;\n+\n+\tif (rx_adapter->num_rx_intr == 0)\n+\t\treturn 0;\n+\n+\tif (rte_ring_count(rx_adapter->intr_ring) == 0\n+\t\t&& !rx_adapter->qd_valid)\n+\t\treturn 0;\n+\n+\tbuf = &rx_adapter->event_enqueue_buffer;\n+\tring_lock = &rx_adapter->intr_ring_lock;\n+\n+\tif (buf->count >= BATCH_SIZE)\n+\t\trxa_flush_event_buffer(rx_adapter);\n+\n+\twhile (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {\n+\t\tstruct eth_device_info *dev_info;\n+\t\tuint16_t port;\n+\t\tuint16_t queue;\n+\t\tunion queue_data qd  = rx_adapter->qd;\n+\t\tint 
err;\n+\n+\t\tif (!rx_adapter->qd_valid) {\n+\t\t\tstruct eth_rx_queue_info *queue_info;\n+\n+\t\t\trte_spinlock_lock(ring_lock);\n+\t\t\terr = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);\n+\t\t\tif (err) {\n+\t\t\t\trte_spinlock_unlock(ring_lock);\n+\t\t\t\tbreak;\n+\t\t\t}\n+\n+\t\t\tport = qd.port;\n+\t\t\tqueue = qd.queue;\n+\t\t\trx_adapter->qd = qd;\n+\t\t\trx_adapter->qd_valid = 1;\n+\t\t\tdev_info = &rx_adapter->eth_devices[port];\n+\t\t\tif (rxa_shared_intr(dev_info, queue))\n+\t\t\t\tdev_info->shared_intr_enabled = 1;\n+\t\t\telse {\n+\t\t\t\tqueue_info = &dev_info->rx_queue[queue];\n+\t\t\t\tqueue_info->intr_enabled = 1;\n+\t\t\t}\n+\t\t\trte_eth_dev_rx_intr_enable(port, queue);\n+\t\t\trte_spinlock_unlock(ring_lock);\n+\t\t} else {\n+\t\t\tport = qd.port;\n+\t\t\tqueue = qd.queue;\n+\n+\t\t\tdev_info = &rx_adapter->eth_devices[port];\n+\t\t}\n+\n+\t\tif (rxa_shared_intr(dev_info, queue)) {\n+\t\t\tuint16_t i;\n+\t\t\tuint16_t nb_queues;\n+\n+\t\t\tnb_queues = dev_info->dev->data->nb_rx_queues;\n+\t\t\tn = 0;\n+\t\t\tfor (i = dev_info->next_q_idx; i < nb_queues; i++) {\n+\t\t\t\tuint8_t enq_buffer_full;\n+\n+\t\t\t\tif (!rxa_intr_queue(dev_info, i))\n+\t\t\t\t\tcontinue;\n+\t\t\t\tn = rxa_eth_rx(rx_adapter, port, i, nb_rx,\n+\t\t\t\t\trx_adapter->max_nb_rx,\n+\t\t\t\t\t&rxq_empty);\n+\t\t\t\tnb_rx += n;\n+\n+\t\t\t\tenq_buffer_full = !rxq_empty && n == 0;\n+\t\t\t\tmax_done = nb_rx > rx_adapter->max_nb_rx;\n+\n+\t\t\t\tif (enq_buffer_full || max_done) {\n+\t\t\t\t\tdev_info->next_q_idx = i;\n+\t\t\t\t\tgoto done;\n+\t\t\t\t}\n+\t\t\t}\n+\n+\t\t\trx_adapter->qd_valid = 0;\n+\n+\t\t\t/* Reinitialize for next interrupt */\n+\t\t\tdev_info->next_q_idx = dev_info->multi_intr_cap ?\n+\t\t\t\t\t\tRTE_MAX_RXTX_INTR_VEC_ID - 1 :\n+\t\t\t\t\t\t0;\n+\t\t} else {\n+\t\t\tn = rxa_eth_rx(rx_adapter, port, queue, nb_rx,\n+\t\t\t\trx_adapter->max_nb_rx,\n+\t\t\t\t&rxq_empty);\n+\t\t\trx_adapter->qd_valid = !rxq_empty;\n+\t\t\tnb_rx += n;\n+\t\t\tif (nb_rx > 
rx_adapter->max_nb_rx)\n+\t\t\t\tbreak;\n+\t\t}\n+\t}\n+\n+done:\n+\trx_adapter->stats.rx_intr_packets += nb_rx;\n+\treturn nb_rx;\n+}\n+\n /*\n  * Polls receive queues added to the event adapter and enqueues received\n  * packets to the event device.\n@@ -668,7 +1088,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n  * the hypervisor's switching layer where adjustments can be made to deal with\n  * it.\n  */\n-static inline void\n+static inline uint32_t\n rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)\n {\n \tuint32_t num_queue;\n@@ -676,7 +1096,6 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tstruct rte_eth_event_enqueue_buffer *buf;\n \tuint32_t wrr_pos;\n \tuint32_t max_nb_rx;\n-\tstruct rte_event_eth_rx_adapter_stats *stats;\n \n \twrr_pos = rx_adapter->wrr_pos;\n \tmax_nb_rx = rx_adapter->max_nb_rx;\n@@ -696,10 +1115,11 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\t\trxa_flush_event_buffer(rx_adapter);\n \t\tif (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {\n \t\t\trx_adapter->wrr_pos = wrr_pos;\n-\t\t\tbreak;\n+\t\t\treturn nb_rx;\n \t\t}\n \n-\t\tnb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);\n+\t\tnb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,\n+\t\t\t\tNULL);\n \t\tif (nb_rx > max_nb_rx) {\n \t\t\trx_adapter->wrr_pos =\n \t\t\t\t    (wrr_pos + 1) % rx_adapter->wrr_len;\n@@ -709,14 +1129,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\tif (++wrr_pos == rx_adapter->wrr_len)\n \t\t\twrr_pos = 0;\n \t}\n-\n-\tstats->rx_packets += nb_rx;\n+\treturn nb_rx;\n }\n \n static int\n rxa_service_func(void *args)\n {\n \tstruct rte_event_eth_rx_adapter *rx_adapter = args;\n+\tstruct rte_event_eth_rx_adapter_stats *stats;\n \n \tif (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)\n \t\treturn 0;\n@@ -724,7 +1144,10 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t\treturn 0;\n \t\trte_spinlock_unlock(&rx_adapter->rx_lock);\n \t}\n-\trxa_poll(rx_adapter);\n+\n+\tstats = 
&rx_adapter->stats;\n+\tstats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);\n+\tstats->rx_packets += rxa_poll(rx_adapter);\n \trte_spinlock_unlock(&rx_adapter->rx_lock);\n \treturn 0;\n }\n@@ -809,6 +1232,339 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n }\n \n static int\n+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)\n+{\n+\tif (rx_adapter->epd != INIT_FD)\n+\t\treturn 0;\n+\n+\trx_adapter->epd = epoll_create1(EPOLL_CLOEXEC);\n+\tif (rx_adapter->epd < 0) {\n+\t\trx_adapter->epd = INIT_FD;\n+\t\tRTE_EDEV_LOG_ERR(\"epoll_create1() failed, err %d\", errno);\n+\t\treturn -errno;\n+\t}\n+\n+\treturn 0;\n+}\n+\n+static int\n+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)\n+{\n+\tint err;\n+\tchar thread_name[RTE_MAX_THREAD_NAME_LEN];\n+\n+\tif (rx_adapter->intr_ring)\n+\t\treturn 0;\n+\n+\trx_adapter->intr_ring = rte_ring_create(\"intr_ring\",\n+\t\t\t\t\tRTE_EVENT_ETH_INTR_RING_SIZE,\n+\t\t\t\t\trte_socket_id(), 0);\n+\tif (!rx_adapter->intr_ring)\n+\t\treturn -ENOMEM;\n+\n+\trx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,\n+\t\t\t\t\t(RTE_EVENT_ETH_INTR_RING_SIZE + 1) *\n+\t\t\t\t\tsizeof(struct rte_epoll_event),\n+\t\t\t\t\tRTE_CACHE_LINE_SIZE,\n+\t\t\t\t\trx_adapter->socket_id);\n+\tif (!rx_adapter->epoll_events) {\n+\t\terr = -ENOMEM;\n+\t\tgoto error;\n+\t}\n+\n+\trte_spinlock_init(&rx_adapter->intr_ring_lock);\n+\n+\tsnprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,\n+\t\t\t\"rx-intr-thread-%d\", rx_adapter->id);\n+\n+\terr = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,\n+\t\t\t\tNULL, rxa_intr_thread, rx_adapter);\n+\tif (!err) {\n+\t\trte_thread_setname(rx_adapter->rx_intr_thread, thread_name);\n+\t\treturn 0;\n+\t}\n+\n+\tRTE_EDEV_LOG_ERR(\"Failed to create interrupt thread err = %d\\n\", err);\n+error:\n+\trte_ring_free(rx_adapter->intr_ring);\n+\trx_adapter->intr_ring = NULL;\n+\trx_adapter->epoll_events = NULL;\n+\treturn err;\n+}\n+\n+static 
int\n+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)\n+{\n+\tint err;\n+\n+\trx_adapter->stop_thread = 1;\n+\terr = pthread_cancel(rx_adapter->rx_intr_thread);\n+\tif (err)\n+\t\tRTE_EDEV_LOG_ERR(\"Can't cancel interrupt thread err = %d\\n\",\n+\t\t\t\terr);\n+\n+\terr = pthread_join(rx_adapter->rx_intr_thread, NULL);\n+\tif (err)\n+\t\tRTE_EDEV_LOG_ERR(\"Can't join interrupt thread err = %d\\n\", err);\n+\n+\trx_adapter->stop_thread = 0;\n+\trte_free(rx_adapter->epoll_events);\n+\trte_ring_free(rx_adapter->intr_ring);\n+\trx_adapter->intr_ring = NULL;\n+\trx_adapter->epoll_events = NULL;\n+\treturn 0;\n+}\n+\n+static int\n+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)\n+{\n+\tint ret;\n+\n+\tif (rx_adapter->num_rx_intr == 0)\n+\t\treturn 0;\n+\n+\tret = rxa_destroy_intr_thread(rx_adapter);\n+\tif (ret)\n+\t\treturn ret;\n+\n+\tclose(rx_adapter->epd);\n+\trx_adapter->epd = INIT_FD;\n+\n+\treturn ret;\n+}\n+\n+static int\n+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,\n+\tstruct eth_device_info *dev_info,\n+\tuint16_t rx_queue_id)\n+{\n+\tint err;\n+\tuint16_t eth_dev_id = dev_info->dev->data->port_id;\n+\tint sintr = rxa_shared_intr(dev_info, rx_queue_id);\n+\n+\terr = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);\n+\tif (err) {\n+\t\tRTE_EDEV_LOG_ERR(\"Could not disable interrupt for Rx queue %u\",\n+\t\t\trx_queue_id);\n+\t\treturn err;\n+\t}\n+\n+\terr = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,\n+\t\t\t\t\trx_adapter->epd,\n+\t\t\t\t\tRTE_INTR_EVENT_DEL,\n+\t\t\t\t\t0);\n+\tif (err)\n+\t\tRTE_EDEV_LOG_ERR(\"Interrupt event deletion failed %d\", err);\n+\n+\tif (sintr)\n+\t\tdev_info->rx_queue[rx_queue_id].intr_enabled = 0;\n+\telse\n+\t\tdev_info->shared_intr_enabled = 0;\n+\treturn err;\n+}\n+\n+static int\n+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,\n+\t\tstruct eth_device_info *dev_info,\n+\t\tint rx_queue_id)\n+{\n+\tint err;\n+\tint i;\n+\tint s;\n+\n+\tif 
(dev_info->nb_rx_intr == 0)\n+\t\treturn 0;\n+\n+\terr = 0;\n+\tif (rx_queue_id == -1) {\n+\t\ts = dev_info->nb_shared_intr;\n+\t\tfor (i = 0; i < dev_info->nb_rx_intr; i++) {\n+\t\t\tint sintr;\n+\t\t\tuint16_t q;\n+\n+\t\t\tq = dev_info->intr_queue[i];\n+\t\t\tsintr = rxa_shared_intr(dev_info, q);\n+\t\t\ts -= sintr;\n+\n+\t\t\tif (!sintr || s == 0) {\n+\n+\t\t\t\terr = rxa_disable_intr(rx_adapter, dev_info,\n+\t\t\t\t\t\tq);\n+\t\t\t\tif (err)\n+\t\t\t\t\treturn err;\n+\t\t\t\trxa_intr_ring_del_entries(rx_adapter, dev_info,\n+\t\t\t\t\t\t\tq);\n+\t\t\t}\n+\t\t}\n+\t} else {\n+\t\tif (!rxa_intr_queue(dev_info, rx_queue_id))\n+\t\t\treturn 0;\n+\t\tif (!rxa_shared_intr(dev_info, rx_queue_id) ||\n+\t\t\t\tdev_info->nb_shared_intr == 1) {\n+\t\t\terr = rxa_disable_intr(rx_adapter, dev_info,\n+\t\t\t\t\trx_queue_id);\n+\t\t\tif (err)\n+\t\t\t\treturn err;\n+\t\t\trxa_intr_ring_del_entries(rx_adapter, dev_info,\n+\t\t\t\t\t\trx_queue_id);\n+\t\t}\n+\n+\t\tfor (i = 0; i < dev_info->nb_rx_intr; i++) {\n+\t\t\tif (dev_info->intr_queue[i] == rx_queue_id) {\n+\t\t\t\tfor (; i < dev_info->nb_rx_intr - 1; i++)\n+\t\t\t\t\tdev_info->intr_queue[i] =\n+\t\t\t\t\t\tdev_info->intr_queue[i + 1];\n+\t\t\t\tbreak;\n+\t\t\t}\n+\t\t}\n+\t}\n+\n+\treturn err;\n+}\n+\n+static int\n+rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,\n+\tstruct eth_device_info *dev_info,\n+\tuint16_t rx_queue_id)\n+{\n+\tint err, err1;\n+\tuint16_t eth_dev_id = dev_info->dev->data->port_id;\n+\tunion queue_data qd;\n+\tint init_fd;\n+\tuint16_t *intr_queue;\n+\tint sintr = rxa_shared_intr(dev_info, rx_queue_id);\n+\n+\tif (rxa_intr_queue(dev_info, rx_queue_id))\n+\t\treturn 0;\n+\n+\tintr_queue = dev_info->intr_queue;\n+\tif (dev_info->intr_queue == NULL) {\n+\t\tsize_t len =\n+\t\t\tdev_info->dev->data->nb_rx_queues * sizeof(uint16_t);\n+\t\tdev_info->intr_queue =\n+\t\t\trte_zmalloc_socket(\n+\t\t\t\trx_adapter->mem_name,\n+\t\t\t\tlen,\n+\t\t\t\t0,\n+\t\t\t\trx_adapter->socket_id);\n+\t\tif 
(dev_info->intr_queue == NULL)\n+\t\t\treturn -ENOMEM;\n+\t}\n+\n+\tinit_fd = rx_adapter->epd;\n+\terr = rxa_init_epd(rx_adapter);\n+\tif (err)\n+\t\tgoto err_free_queue;\n+\n+\tqd.port = eth_dev_id;\n+\tqd.queue = rx_queue_id;\n+\n+\terr = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,\n+\t\t\t\t\trx_adapter->epd,\n+\t\t\t\t\tRTE_INTR_EVENT_ADD,\n+\t\t\t\t\tqd.ptr);\n+\tif (err) {\n+\t\tRTE_EDEV_LOG_ERR(\"Failed to add interrupt event for\"\n+\t\t\t\" Rx Queue %u err %d\", rx_queue_id, err);\n+\t\tgoto err_del_fd;\n+\t}\n+\n+\terr = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);\n+\tif (err) {\n+\t\tRTE_EDEV_LOG_ERR(\"Could not enable interrupt for\"\n+\t\t\t\t\" Rx Queue %u err %d\", rx_queue_id, err);\n+\n+\t\tgoto err_del_event;\n+\t}\n+\n+\terr = rxa_create_intr_thread(rx_adapter);\n+\tif (!err)  {\n+\t\tif (sintr)\n+\t\t\tdev_info->shared_intr_enabled = 1;\n+\t\telse\n+\t\t\tdev_info->rx_queue[rx_queue_id].intr_enabled = 1;\n+\t\treturn 0;\n+\t}\n+\n+\n+\terr = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);\n+\tif (err)\n+\t\tRTE_EDEV_LOG_ERR(\"Could not disable interrupt for\"\n+\t\t\t\t\" Rx Queue %u err %d\", rx_queue_id, err);\n+err_del_event:\n+\terr1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,\n+\t\t\t\t\trx_adapter->epd,\n+\t\t\t\t\tRTE_INTR_EVENT_DEL,\n+\t\t\t\t\t0);\n+\tif (err1) {\n+\t\tRTE_EDEV_LOG_ERR(\"Could not delete event for\"\n+\t\t\t\t\" Rx Queue %u err %d\", rx_queue_id, err1);\n+\t}\n+err_del_fd:\n+\tif (init_fd == INIT_FD) {\n+\t\tclose(rx_adapter->epd);\n+\t\trx_adapter->epd = -1;\n+\t}\n+err_free_queue:\n+\tif (intr_queue == NULL)\n+\t\trte_free(dev_info->intr_queue);\n+\n+\treturn err;\n+}\n+\n+static int\n+rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,\n+\tstruct eth_device_info *dev_info,\n+\tint rx_queue_id)\n+\n+{\n+\tint i, j, err;\n+\tint si = -1;\n+\tint shared_done = (dev_info->nb_shared_intr > 0);\n+\n+\tif (rx_queue_id != -1) {\n+\t\tif (rxa_shared_intr(dev_info, rx_queue_id) && 
shared_done)\n+\t\t\treturn 0;\n+\t\treturn rxa_config_intr(rx_adapter, dev_info, rx_queue_id);\n+\t}\n+\n+\terr = 0;\n+\tfor (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {\n+\n+\t\tif (rxa_shared_intr(dev_info, i) && shared_done)\n+\t\t\tcontinue;\n+\n+\t\terr = rxa_config_intr(rx_adapter, dev_info, i);\n+\n+\t\tshared_done = err == 0 && rxa_shared_intr(dev_info, i);\n+\t\tif (shared_done) {\n+\t\t\tsi = i;\n+\t\t\tdev_info->shared_intr_enabled = 1;\n+\t\t}\n+\t\tif (err)\n+\t\t\tbreak;\n+\t}\n+\n+\tif (err == 0)\n+\t\treturn 0;\n+\n+\tshared_done = (dev_info->nb_shared_intr > 0);\n+\tfor (j = 0; j < i; j++) {\n+\t\tif (rxa_intr_queue(dev_info, j))\n+\t\t\tcontinue;\n+\t\tif (rxa_shared_intr(dev_info, j) && si != j)\n+\t\t\tcontinue;\n+\t\terr = rxa_disable_intr(rx_adapter, dev_info, j);\n+\t\tif (err)\n+\t\t\tbreak;\n+\n+\t}\n+\n+\treturn err;\n+}\n+\n+\n+static int\n rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)\n {\n \tint ret;\n@@ -843,6 +1599,7 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \trx_adapter->event_port_id = rx_adapter_conf.event_port_id;\n \trx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;\n \trx_adapter->service_inited = 1;\n+\trx_adapter->epd = INIT_FD;\n \treturn 0;\n \n err_done:\n@@ -886,6 +1643,9 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tint32_t rx_queue_id)\n {\n \tint pollq;\n+\tint intrq;\n+\tint sintrq;\n+\n \n \tif (rx_adapter->nb_queues == 0)\n \t\treturn;\n@@ -901,9 +1661,14 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t}\n \n \tpollq = rxa_polled_queue(dev_info, rx_queue_id);\n+\tintrq = rxa_intr_queue(dev_info, rx_queue_id);\n+\tsintrq = rxa_shared_intr(dev_info, rx_queue_id);\n \trxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);\n \trx_adapter->num_rx_polled -= pollq;\n \tdev_info->nb_rx_poll -= pollq;\n+\trx_adapter->num_rx_intr -= intrq;\n+\tdev_info->nb_rx_intr -= intrq;\n+\tdev_info->nb_shared_intr -= intrq && sintrq;\n }\n \n static void\n@@ 
-915,6 +1680,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tstruct eth_rx_queue_info *queue_info;\n \tconst struct rte_event *ev = &conf->ev;\n \tint pollq;\n+\tint intrq;\n+\tint sintrq;\n \n \tif (rx_queue_id == -1) {\n \t\tuint16_t nb_rx_queues;\n@@ -927,6 +1694,8 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \t}\n \n \tpollq = rxa_polled_queue(dev_info, rx_queue_id);\n+\tintrq = rxa_intr_queue(dev_info, rx_queue_id);\n+\tsintrq = rxa_shared_intr(dev_info, rx_queue_id);\n \n \tqueue_info = &dev_info->rx_queue[rx_queue_id];\n \tqueue_info->event_queue_id = ev->queue_id;\n@@ -944,6 +1713,24 @@ static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)\n \tif (rxa_polled_queue(dev_info, rx_queue_id)) {\n \t\trx_adapter->num_rx_polled += !pollq;\n \t\tdev_info->nb_rx_poll += !pollq;\n+\t\trx_adapter->num_rx_intr -= intrq;\n+\t\tdev_info->nb_rx_intr -= intrq;\n+\t\tdev_info->nb_shared_intr -= intrq && sintrq;\n+\t}\n+\n+\tif (rxa_intr_queue(dev_info, rx_queue_id)) {\n+\t\trx_adapter->num_rx_polled -= pollq;\n+\t\tdev_info->nb_rx_poll -= pollq;\n+\t\trx_adapter->num_rx_intr += !intrq;\n+\t\tdev_info->nb_rx_intr += !intrq;\n+\t\tdev_info->nb_shared_intr += !intrq && sintrq;\n+\t\tif (dev_info->nb_shared_intr == 1) {\n+\t\t\tif (dev_info->multi_intr_cap)\n+\t\t\t\tdev_info->next_q_idx =\n+\t\t\t\t\tRTE_MAX_RXTX_INTR_VEC_ID - 1;\n+\t\t\telse\n+\t\t\t\tdev_info->next_q_idx = 0;\n+\t\t}\n \t}\n }\n \n@@ -960,24 +1747,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \tuint32_t *rx_wrr;\n \tuint16_t nb_rx_queues;\n \tuint32_t nb_rx_poll, nb_wrr;\n+\tuint32_t nb_rx_intr;\n+\tint num_intr_vec;\n+\tuint16_t wt;\n \n \tif (queue_conf->servicing_weight == 0) {\n-\n \t\tstruct rte_eth_dev_data *data = dev_info->dev->data;\n-\t\tif (data->dev_conf.intr_conf.rxq) {\n-\t\t\tRTE_EDEV_LOG_ERR(\"Interrupt driven queues\"\n-\t\t\t\t\t\" not supported\");\n-\t\t\treturn -ENOTSUP;\n-\t\t}\n-\t\ttemp_conf = *queue_conf;\n \n-\t\t/* If Rx 
interrupts are disabled set wt = 1 */\n-\t\ttemp_conf.servicing_weight = 1;\n+\t\ttemp_conf = *queue_conf;\n+\t\tif (!data->dev_conf.intr_conf.rxq) {\n+\t\t\t/* If Rx interrupts are disabled set wt = 1 */\n+\t\t\ttemp_conf.servicing_weight = 1;\n+\t\t}\n \t\tqueue_conf = &temp_conf;\n \t}\n \n \tnb_rx_queues = dev_info->dev->data->nb_rx_queues;\n \trx_queue = dev_info->rx_queue;\n+\twt = queue_conf->servicing_weight;\n \n \tif (dev_info->rx_queue == NULL) {\n \t\tdev_info->rx_queue =\n@@ -993,13 +1780,64 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \n \trxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,\n \t\t\tqueue_conf->servicing_weight,\n-\t\t\t&nb_rx_poll, &nb_wrr);\n+\t\t\t&nb_rx_poll, &nb_rx_intr, &nb_wrr);\n+\n+\tdev_info->multi_intr_cap =\n+\t\t\trte_intr_cap_multiple(dev_info->dev->intr_handle);\n \n \tret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,\n \t\t\t\t&rx_poll, &rx_wrr);\n \tif (ret)\n \t\tgoto err_free_rxqueue;\n \n+\tif (wt == 0) {\n+\t\tnum_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);\n+\n+\t\tret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);\n+\t\tif (ret)\n+\t\t\tgoto err_free_rxqueue;\n+\n+\t\tret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);\n+\t\tif (ret)\n+\t\t\tgoto err_free_rxqueue;\n+\t} else {\n+\n+\t\tnum_intr_vec = 0;\n+\t\tif (rx_adapter->num_rx_intr > nb_rx_intr) {\n+\t\t\tnum_intr_vec = rxa_nb_intr_vect(dev_info,\n+\t\t\t\t\t\trx_queue_id, 0);\n+\t\t\t/* interrupt based queues are being converted to\n+\t\t\t * poll mode queues, delete the interrupt configuration\n+\t\t\t * for those.\n+\t\t\t */\n+\t\t\tret = rxa_del_intr_queue(rx_adapter,\n+\t\t\t\t\t\tdev_info, rx_queue_id);\n+\t\t\tif (ret)\n+\t\t\t\tgoto err_free_rxqueue;\n+\t\t}\n+\t}\n+\n+\tif (nb_rx_intr == 0) {\n+\t\tret = rxa_free_intr_resources(rx_adapter);\n+\t\tif (ret)\n+\t\t\tgoto err_free_rxqueue;\n+\t}\n+\n+\tif (wt == 0) {\n+\t\tuint16_t i;\n+\n+\t\tif (rx_queue_id  == -1) {\n+\t\t\tfor (i 
= 0; i < dev_info->dev->data->nb_rx_queues; i++)\n+\t\t\t\tdev_info->intr_queue[i] = i;\n+\t\t} else {\n+\t\t\tif (!rxa_intr_queue(dev_info, rx_queue_id))\n+\t\t\t\tdev_info->intr_queue[nb_rx_intr - 1] =\n+\t\t\t\t\trx_queue_id;\n+\t\t}\n+\t}\n+\n+\n+\n \trxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);\n \trxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);\n \n@@ -1009,6 +1847,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \trx_adapter->eth_rx_poll = rx_poll;\n \trx_adapter->wrr_sched = rx_wrr;\n \trx_adapter->wrr_len = nb_wrr;\n+\trx_adapter->num_intr_vec += num_intr_vec;\n \treturn 0;\n \n err_free_rxqueue:\n@@ -1119,6 +1958,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \trx_adapter->socket_id = socket_id;\n \trx_adapter->conf_cb = conf_cb;\n \trx_adapter->conf_arg = conf_arg;\n+\trx_adapter->id = id;\n \tstrcpy(rx_adapter->mem_name, mem_name);\n \trx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,\n \t\t\t\t\t/* FIXME: incompatible with hotplug */\n@@ -1302,8 +2142,10 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \tuint32_t cap;\n \tuint32_t nb_rx_poll = 0;\n \tuint32_t nb_wrr = 0;\n+\tuint32_t nb_rx_intr;\n \tstruct eth_rx_poll_entry *rx_poll = NULL;\n \tuint32_t *rx_wrr = NULL;\n+\tint num_intr_vec;\n \n \tRTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);\n \tRTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);\n@@ -1346,29 +2188,59 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \t\t}\n \t} else {\n \t\trxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,\n-\t\t\t&nb_rx_poll, &nb_wrr);\n+\t\t\t&nb_rx_poll, &nb_rx_intr, &nb_wrr);\n+\n \t\tret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,\n \t\t\t&rx_poll, &rx_wrr);\n \t\tif (ret)\n \t\t\treturn ret;\n \n \t\trte_spinlock_lock(&rx_adapter->rx_lock);\n+\n+\t\tnum_intr_vec = 0;\n+\t\tif (rx_adapter->num_rx_intr > nb_rx_intr) {\n+\n+\t\t\tnum_intr_vec = 
rxa_nb_intr_vect(dev_info,\n+\t\t\t\t\t\trx_queue_id, 0);\n+\t\t\tret = rxa_del_intr_queue(rx_adapter, dev_info,\n+\t\t\t\t\trx_queue_id);\n+\t\t\tif (ret)\n+\t\t\t\tgoto unlock_ret;\n+\t\t}\n+\n+\t\tif (nb_rx_intr == 0) {\n+\t\t\tret = rxa_free_intr_resources(rx_adapter);\n+\t\t\tif (ret)\n+\t\t\t\tgoto unlock_ret;\n+\t\t}\n+\n \t\trxa_sw_del(rx_adapter, dev_info, rx_queue_id);\n \t\trxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);\n \n \t\trte_free(rx_adapter->eth_rx_poll);\n \t\trte_free(rx_adapter->wrr_sched);\n \n+\t\tif (nb_rx_intr == 0) {\n+\t\t\trte_free(dev_info->intr_queue);\n+\t\t\tdev_info->intr_queue = NULL;\n+\t\t}\n+\n \t\trx_adapter->eth_rx_poll = rx_poll;\n-\t\trx_adapter->num_rx_polled = nb_rx_poll;\n \t\trx_adapter->wrr_sched = rx_wrr;\n \t\trx_adapter->wrr_len = nb_wrr;\n+\t\trx_adapter->num_intr_vec += num_intr_vec;\n \n \t\tif (dev_info->nb_dev_queues == 0) {\n \t\t\trte_free(dev_info->rx_queue);\n \t\t\tdev_info->rx_queue = NULL;\n \t\t}\n+unlock_ret:\n \t\trte_spinlock_unlock(&rx_adapter->rx_lock);\n+\t\tif (ret) {\n+\t\t\trte_free(rx_poll);\n+\t\t\trte_free(rx_wrr);\n+\t\t\treturn ret;\n+\t\t}\n \n \t\trte_service_component_runstate_set(rx_adapter->service_id,\n \t\t\t\trxa_sw_adapter_queue_count(rx_adapter));\n@@ -1377,7 +2249,6 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,\n \treturn ret;\n }\n \n-\n int\n rte_event_eth_rx_adapter_start(uint8_t id)\n {\ndiff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst\nindex 319e4f0..2f055ec 100644\n--- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst\n+++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst\n@@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks\n if the callback is supported, and the counts maintained by the service function,\n if one exists. 
The service function also maintains a count of cycles for which\n it was not able to enqueue to the event device.\n+\n+Interrupt Based Rx Queues\n+~~~~~~~~~~~~~~~~~~~~~~~~~~\n+\n+The service core function is typically set up to poll ethernet Rx queues for\n+packets. Certain queues may have low packet rates, and it would be more\n+efficient to enable the Rx queue interrupt and read packets after receiving\n+the interrupt.\n+\n+The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf\n+is applicable when the adapter uses a service core function. The application\n+has to enable Rx queue interrupts when configuring the ethernet device\n+using the ``rte_eth_dev_configure()`` function and then use a servicing_weight\n+of zero when adding the Rx queue to the adapter.\n+\n+The adapter creates a thread blocked on the interrupt; on an interrupt, this\n+thread enqueues the port id and the queue id to a ring buffer. The adapter\n+service function dequeues the port id and queue id from the ring buffer,\n+invokes ``rte_eth_rx_burst()`` to receive packets on the queue and\n+converts the received packets to events in the same manner as packets\n+received on a polled Rx queue. 
The interrupt thread is affinitized to the same\n+CPUs as the lcores of the Rx adapter service function; if the Rx adapter\n+service function has not been mapped to any lcores, the interrupt thread\n+is mapped to the master lcore.\ndiff --git a/config/common_base b/config/common_base\nindex fcf3a1f..3cb5edd 100644\n--- a/config/common_base\n+++ b/config/common_base\n@@ -597,6 +597,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n\n CONFIG_RTE_EVENT_MAX_DEVS=16\n CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64\n CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32\n+CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024\n CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32\n \n #\ndiff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile\nindex b3e2546..24af956 100644\n--- a/lib/librte_eventdev/Makefile\n+++ b/lib/librte_eventdev/Makefile\n@@ -8,7 +8,7 @@ include $(RTE_SDK)/mk/rte.vars.mk\n LIB = librte_eventdev.a\n \n # library version\n-LIBABIVER := 4\n+LIBABIVER := 5\n \n # build flags\n CFLAGS += -DALLOW_EXPERIMENTAL_API\n",
    "prefixes": [
        "v2",
        "4/5"
    ]
}