[v5,4/8] eventdev: add Rx adapter event vector support
Checks
Commit Message
From: Pavan Nikhilesh <pbhagavatula@marvell.com>
Add event vector support for event eth Rx adapter, the implementation
creates vector flows based on port and queue identifier of the received
mbufs.
Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
lib/librte_eventdev/eventdev_pmd.h | 7 +-
.../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--
lib/librte_eventdev/rte_eventdev.c | 6 +-
3 files changed, 250 insertions(+), 20 deletions(-)
Comments
> -----Original Message-----
> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
> Sent: Wednesday, March 24, 2021 10:35 AM
> To: jerinj@marvell.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Carrillo, Erik G <erik.g.carrillo@intel.com>; Gujjar,
> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom <mattias.ronnblom@ericsson.com>; Ma, Liang J
> <liang.j.ma@intel.com>
> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event vector support
>
> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
>
> Add event vector support for event eth Rx adapter, the implementation
> creates vector flows based on port and queue identifier of the received
> mbufs.
>
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> ---
> lib/librte_eventdev/eventdev_pmd.h | 7 +-
> .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--
> lib/librte_eventdev/rte_eventdev.c | 6 +-
> 3 files changed, 250 insertions(+), 20 deletions(-)
>
> diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
> index 9297f1433..0f724ac85 100644
> --- a/lib/librte_eventdev/eventdev_pmd.h
> +++ b/lib/librte_eventdev/eventdev_pmd.h
> @@ -69,9 +69,10 @@ extern "C" {
> } \
> } while (0)
>
> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> - ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
> - (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> + ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) | \
> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>
> #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
> RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> index ac8ba5bf0..c71990078 100644
> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> @@ -26,6 +26,10 @@
> #define BATCH_SIZE 32
> #define BLOCK_CNT_THRESHOLD 10
> #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
> +#define MAX_VECTOR_SIZE 1024
> +#define MIN_VECTOR_SIZE 4
> +#define MAX_VECTOR_NS 1E9
> +#define MIN_VECTOR_NS 1E5
>
> #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
> #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
> uint16_t eth_rx_qid;
> };
>
> +struct eth_rx_vector_data {
> + TAILQ_ENTRY(eth_rx_vector_data) next;
> + uint16_t port;
> + uint16_t queue;
> + uint16_t max_vector_count;
> + uint64_t event;
> + uint64_t ts;
> + uint64_t vector_timeout_ticks;
> + struct rte_mempool *vector_pool;
> + struct rte_event_vector *vector_ev;
> +} __rte_cache_aligned;
> +
> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
> +
> /* Instance per adapter */
> struct rte_eth_event_enqueue_buffer {
> /* Count of events in this buffer */
> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
> uint32_t wrr_pos;
> /* Event burst buffer */
> struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
> + /* Vector enable flag */
> + uint8_t ena_vector;
> + /* Timestamp of previous vector expiry list traversal */
> + uint64_t prev_expiry_ts;
> + /* Minimum ticks to wait before traversing expiry list */
> + uint64_t vector_tmo_ticks;
> + /* vector list */
> + struct eth_rx_vector_data_list vector_list;
> /* Per adapter stats */
> struct rte_event_eth_rx_adapter_stats stats;
> /* Block count, counts up to BLOCK_CNT_THRESHOLD */
> @@ -198,9 +224,11 @@ struct eth_device_info {
> struct eth_rx_queue_info {
> int queue_enabled; /* True if added */
> int intr_enabled;
> + uint8_t ena_vector;
> uint16_t wt; /* Polling weight */
> uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
> uint64_t event;
> + struct eth_rx_vector_data vector_data;
> };
>
> static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
> &rx_adapter->event_enqueue_buffer;
> struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
>
> + if (!buf->count)
> + return 0;
> +
> uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
> rx_adapter->event_port_id,
> buf->events,
> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
> return n;
> }
>
> +static inline uint16_t
> +rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
> + struct eth_rx_queue_info *queue_info,
> + struct rte_eth_event_enqueue_buffer *buf,
> + struct rte_mbuf **mbufs, uint16_t num)
> +{
> + struct rte_event *ev = &buf->events[buf->count];
> + struct eth_rx_vector_data *vec;
> + uint16_t filled, space, sz;
> +
> + filled = 0;
> + vec = &queue_info->vector_data;
> + while (num) {
> + if (vec->vector_ev == NULL) {
> + if (rte_mempool_get(vec->vector_pool,
> + (void **)&vec->vector_ev) < 0) {
> + rte_pktmbuf_free_bulk(mbufs, num);
> + return 0;
> + }
> + vec->vector_ev->nb_elem = 0;
> + vec->vector_ev->port = vec->port;
> + vec->vector_ev->queue = vec->queue;
> + vec->vector_ev->attr_valid = true;
> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
> + } else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
Is there a case where nb_elem > max_vector_count, since we accumulate sz into it?
> + /* Event ready. */
> + ev->event = vec->event;
> + ev->vec = vec->vector_ev;
> + ev++;
> + filled++;
> + vec->vector_ev = NULL;
> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> + if (rte_mempool_get(vec->vector_pool,
> + (void **)&vec->vector_ev) < 0) {
> + rte_pktmbuf_free_bulk(mbufs, num);
> + return 0;
> + }
> + vec->vector_ev->nb_elem = 0;
> + vec->vector_ev->port = vec->port;
> + vec->vector_ev->queue = vec->queue;
> + vec->vector_ev->attr_valid = true;
> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
> + }
> +
> + space = vec->max_vector_count - vec->vector_ev->nb_elem;
> + sz = num > space ? space : num;
> + memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
> + sizeof(void *) * sz);
> + vec->vector_ev->nb_elem += sz;
> + num -= sz;
> + mbufs += sz;
> + vec->ts = rte_rdtsc();
> + }
> +
> + if (vec->vector_ev->nb_elem == vec->max_vector_count) {
Same here.
> + ev->event = vec->event;
> + ev->vec = vec->vector_ev;
> + ev++;
> + filled++;
> + vec->vector_ev = NULL;
> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> + }
> +
> + return filled;
> +}
I am seeing more than one repeated code chunk in this function. Perhaps you can try to avoid the repetition. We can drop the suggestion if it affects performance.
> +
> static inline void
> rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
> uint16_t eth_dev_id,
> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
> rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
The RSS-related code is executed for the vector case as well. Can this be moved inside the ena_vector if condition?
>
> - for (i = 0; i < num; i++) {
> - m = mbufs[i];
> -
> - rss = do_rss ?
> - rxa_do_softrss(m, rx_adapter->rss_key_be) :
> - m->hash.rss;
> - ev->event = event;
> - ev->flow_id = (rss & ~flow_id_mask) |
> - (ev->flow_id & flow_id_mask);
> - ev->mbuf = m;
> - ev++;
> + if (!eth_rx_queue_info->ena_vector) {
> + for (i = 0; i < num; i++) {
> + m = mbufs[i];
> +
> + rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
> + : m->hash.rss;
> + ev->event = event;
> + ev->flow_id = (rss & ~flow_id_mask) |
> + (ev->flow_id & flow_id_mask);
> + ev->mbuf = m;
> + ev++;
> + }
> + } else {
> + num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> + buf, mbufs, num);
> }
>
> - if (dev_info->cb_fn) {
> + if (num && dev_info->cb_fn) {
>
> dropped = 0;
> nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> - ETH_EVENT_BUFFER_SIZE, buf->count, ev,
> - num, dev_info->cb_arg, &dropped);
> + ETH_EVENT_BUFFER_SIZE, buf->count,
> + &buf->events[buf->count], num,
> + dev_info->cb_arg, &dropped);
Before this patch, we pass ev which is &buf->events[buf->count] + num as fifth param when calling cb_fn. Now, we are passing &buf->events[buf->count] for non-vector case. Do you see this as an issue?
Also, for vector case would it make sense to do pass &buf->events[buf->count] + num ?
> if (unlikely(nb_cb > num))
> RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
> nb_cb, num);
> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
> return nb_rx;
> }
>
> +static void
> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
> +{
> + struct rte_event_eth_rx_adapter *rx_adapter = arg;
> + struct rte_eth_event_enqueue_buffer *buf =
> + &rx_adapter->event_enqueue_buffer;
> + struct rte_event *ev;
> +
> + if (buf->count)
> + rxa_flush_event_buffer(rx_adapter);
> +
> + if (vec->vector_ev->nb_elem == 0)
> + return;
> + ev = &buf->events[buf->count];
> +
> + /* Event ready. */
> + ev->event = vec->event;
> + ev->vec = vec->vector_ev;
> + buf->count++;
> +
> + vec->vector_ev = NULL;
> + vec->ts = 0;
> +}
> +
> static int
> rxa_service_func(void *args)
> {
> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
> return 0;
> }
>
> + if (rx_adapter->ena_vector) {
> + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
> + rx_adapter->vector_tmo_ticks) {
> + struct eth_rx_vector_data *vec;
> +
> + TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
> + uint64_t elapsed_time = rte_rdtsc() - vec->ts;
> +
> + if (elapsed_time >= vec->vector_timeout_ticks) {
> + rxa_vector_expire(vec, rx_adapter);
> + TAILQ_REMOVE(&rx_adapter->vector_list,
> + vec, next);
> + }
> + }
> + rx_adapter->prev_expiry_ts = rte_rdtsc();
> + }
> + }
> +
> stats = &rx_adapter->stats;
> stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
> stats->rx_packets += rxa_poll(rx_adapter);
> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
> }
> }
>
> +static void
> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
> + uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
> + uint16_t port_id)
> +{
> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
> + struct eth_rx_vector_data *vector_data;
> + uint32_t flow_id;
> +
> + vector_data = &queue_info->vector_data;
> + vector_data->max_vector_count = vector_count;
> + vector_data->port = port_id;
> + vector_data->queue = qid;
> + vector_data->vector_pool = mp;
> + vector_data->vector_timeout_ticks =
> + NSEC2TICK(vector_ns, rte_get_timer_hz());
> + vector_data->ts = 0;
> + flow_id = queue_info->event & 0xFFFFF;
> + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
Maybe I am missing something here. Looking at the code it looks like qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11, flow_id would end up being 0x11. Is this the expectation? Also, it may be useful to document flow_id format.
Comparing this format with existing RSS hash based method, are we saying that all mbufs received in a rx burst are part of same flow when vectorization is used?
> + vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
> +}
> +
> static void
> rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
> struct eth_device_info *dev_info,
> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
> }
> }
>
> +static void
> +rxa_sw_event_vector_configure(
> + struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
> + int rx_queue_id,
> + const struct rte_event_eth_rx_adapter_event_vector_config *config)
> +{
> + struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
> + struct eth_rx_queue_info *queue_info;
> + struct rte_event *qi_ev;
> +
> + if (rx_queue_id == -1) {
> + uint16_t nb_rx_queues;
> + uint16_t i;
> +
> + nb_rx_queues = dev_info->dev->data->nb_rx_queues;
> + for (i = 0; i < nb_rx_queues; i++)
> + rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
> + config);
> + return;
> + }
> +
> + queue_info = &dev_info->rx_queue[rx_queue_id];
> + qi_ev = (struct rte_event *)&queue_info->event;
> + queue_info->ena_vector = 1;
> + qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
> + rxa_set_vector_data(queue_info, config->vector_sz,
> + config->vector_timeout_ns, config->vector_mp,
> + rx_queue_id, dev_info->dev->data->port_id);
> + rx_adapter->ena_vector = 1;
> + rx_adapter->vector_tmo_ticks =
> + rx_adapter->vector_tmo_ticks ?
> + RTE_MIN(config->vector_timeout_ns << 1,
> + rx_adapter->vector_tmo_ticks) :
> + config->vector_timeout_ns << 1;
> + rx_adapter->prev_expiry_ts = 0;
> + TAILQ_INIT(&rx_adapter->vector_list);
> +}
> +
> static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
> uint16_t eth_dev_id,
> int rx_queue_id,
> @@ -2081,6 +2285,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
> return -EINVAL;
> }
>
> + if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
> + (queue_conf->rx_queue_flags &
> + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
> + RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
> + " eth port: %" PRIu16 " adapter id: %" PRIu8,
> + eth_dev_id, id);
> + return -EINVAL;
> + }
> +
> if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
> (rx_queue_id != -1)) {
> RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
> @@ -2143,6 +2356,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
> return 0;
> }
>
> +static int
> +rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
> +{
> + limits->max_sz = MAX_VECTOR_SIZE;
> + limits->min_sz = MIN_VECTOR_SIZE;
> + limits->max_timeout_ns = MAX_VECTOR_NS;
> + limits->min_timeout_ns = MIN_VECTOR_NS;
> +
> + return 0;
> +}
> +
> int
> rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
> int32_t rx_queue_id)
> @@ -2333,7 +2557,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
> ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
> dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
> } else {
> - ret = -ENOTSUP;
> + rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
> + rx_queue_id, config);
> }
>
> return ret;
> @@ -2371,7 +2596,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
> ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
> dev, &rte_eth_devices[eth_port_id], limits);
> } else {
> - ret = -ENOTSUP;
> + ret = rxa_sw_vector_limits(limits);
> }
>
> return ret;
> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
> index f95edc075..254a31b1f 100644
> --- a/lib/librte_eventdev/rte_eventdev.c
> +++ b/lib/librte_eventdev/rte_eventdev.c
> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
>
> if (caps == NULL)
> return -EINVAL;
> - *caps = 0;
> +
> + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
> + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
> + else
> + *caps = 0;
Any reason why we had to set a default caps value? I am thinking that if a SW event device is used, it would set the caps anyway.
>
> return dev->dev_ops->eth_rx_adapter_caps_get ?
> (*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
> --
> 2.17.1
>-----Original Message-----
>From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
>Sent: Thursday, March 25, 2021 4:07 PM
>To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>; Jerin
>Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
><abhinandan.gujjar@intel.com>; McDaniel, Timothy
><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
><mattias.ronnblom@ericsson.com>; Ma, Liang J
><liang.j.ma@intel.com>
>Cc: dev@dpdk.org
>Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event vector support
>
>External Email
>
>----------------------------------------------------------------------
>> -----Original Message-----
>> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
>> Sent: Wednesday, March 24, 2021 10:35 AM
>> To: jerinj@marvell.com; Jayatheerthan, Jay
><jay.jayatheerthan@intel.com>; Carrillo, Erik G
><erik.g.carrillo@intel.com>; Gujjar,
>> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> <liang.j.ma@intel.com>
>> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
>> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event
>vector support
>>
>> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
>>
>> Add event vector support for event eth Rx adapter, the
>implementation
>> creates vector flows based on port and queue identifier of the
>received
>> mbufs.
>>
>> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> ---
>> lib/librte_eventdev/eventdev_pmd.h | 7 +-
>> .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--
>> lib/librte_eventdev/rte_eventdev.c | 6 +-
>> 3 files changed, 250 insertions(+), 20 deletions(-)
>>
>> diff --git a/lib/librte_eventdev/eventdev_pmd.h
>b/lib/librte_eventdev/eventdev_pmd.h
>> index 9297f1433..0f724ac85 100644
>> --- a/lib/librte_eventdev/eventdev_pmd.h
>> +++ b/lib/librte_eventdev/eventdev_pmd.h
>> @@ -69,9 +69,10 @@ extern "C" {
>> } \
>> } while (0)
>>
>> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> -
> ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>\
>> -
> (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
>> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
>\
>> + ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>\
>> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
>\
>> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>>
>> #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>>
> RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
>> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> index ac8ba5bf0..c71990078 100644
>> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> @@ -26,6 +26,10 @@
>> #define BATCH_SIZE 32
>> #define BLOCK_CNT_THRESHOLD 10
>> #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
>> +#define MAX_VECTOR_SIZE 1024
>> +#define MIN_VECTOR_SIZE 4
>> +#define MAX_VECTOR_NS 1E9
>> +#define MIN_VECTOR_NS 1E5
>>
>> #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
>> #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
>> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>> uint16_t eth_rx_qid;
>> };
>>
>> +struct eth_rx_vector_data {
>> + TAILQ_ENTRY(eth_rx_vector_data) next;
>> + uint16_t port;
>> + uint16_t queue;
>> + uint16_t max_vector_count;
>> + uint64_t event;
>> + uint64_t ts;
>> + uint64_t vector_timeout_ticks;
>> + struct rte_mempool *vector_pool;
>> + struct rte_event_vector *vector_ev;
>> +} __rte_cache_aligned;
>> +
>> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
>> +
>> /* Instance per adapter */
>> struct rte_eth_event_enqueue_buffer {
>> /* Count of events in this buffer */
>> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>> uint32_t wrr_pos;
>> /* Event burst buffer */
>> struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
>> + /* Vector enable flag */
>> + uint8_t ena_vector;
>> + /* Timestamp of previous vector expiry list traversal */
>> + uint64_t prev_expiry_ts;
>> + /* Minimum ticks to wait before traversing expiry list */
>> + uint64_t vector_tmo_ticks;
>> + /* vector list */
>> + struct eth_rx_vector_data_list vector_list;
>> /* Per adapter stats */
>> struct rte_event_eth_rx_adapter_stats stats;
>> /* Block count, counts up to BLOCK_CNT_THRESHOLD */
>> @@ -198,9 +224,11 @@ struct eth_device_info {
>> struct eth_rx_queue_info {
>> int queue_enabled; /* True if added */
>> int intr_enabled;
>> + uint8_t ena_vector;
>> uint16_t wt; /* Polling weight */
>> uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else
>0 */
>> uint64_t event;
>> + struct eth_rx_vector_data vector_data;
>> };
>>
>> static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
>> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
>rte_event_eth_rx_adapter *rx_adapter)
>> &rx_adapter->event_enqueue_buffer;
>> struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
>>stats;
>>
>> + if (!buf->count)
>> + return 0;
>> +
>> uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
>>eventdev_id,
>> rx_adapter->event_port_id,
>> buf->events,
>> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
>rte_event_eth_rx_adapter *rx_adapter)
>> return n;
>> }
>>
>> +static inline uint16_t
>> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
>*rx_adapter,
>> + struct eth_rx_queue_info *queue_info,
>> + struct rte_eth_event_enqueue_buffer *buf,
>> + struct rte_mbuf **mbufs, uint16_t num)
>> +{
>> + struct rte_event *ev = &buf->events[buf->count];
>> + struct eth_rx_vector_data *vec;
>> + uint16_t filled, space, sz;
>> +
>> + filled = 0;
>> + vec = &queue_info->vector_data;
>> + while (num) {
>> + if (vec->vector_ev == NULL) {
>> + if (rte_mempool_get(vec->vector_pool,
>> + (void **)&vec->vector_ev) <
>0) {
>> + rte_pktmbuf_free_bulk(mbufs, num);
>> + return 0;
>> + }
>> + vec->vector_ev->nb_elem = 0;
>> + vec->vector_ev->port = vec->port;
>> + vec->vector_ev->queue = vec->queue;
>> + vec->vector_ev->attr_valid = true;
>> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>vec, next);
>> + } else if (vec->vector_ev->nb_elem == vec-
>>max_vector_count) {
>
>Is there a case where nb_elem > max_vector_count as we accumulate
>sz to it ?
I don't think so, that would overflow the vector event.
>
>> + /* Event ready. */
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + ev++;
>> + filled++;
>> + vec->vector_ev = NULL;
>> + TAILQ_REMOVE(&rx_adapter->vector_list, vec,
>next);
>> + if (rte_mempool_get(vec->vector_pool,
>> + (void **)&vec->vector_ev) <
>0) {
>> + rte_pktmbuf_free_bulk(mbufs, num);
>> + return 0;
>> + }
>> + vec->vector_ev->nb_elem = 0;
>> + vec->vector_ev->port = vec->port;
>> + vec->vector_ev->queue = vec->queue;
>> + vec->vector_ev->attr_valid = true;
>> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>vec, next);
>> + }
>> +
>> + space = vec->max_vector_count - vec->vector_ev-
>>nb_elem;
>> + sz = num > space ? space : num;
>> + memcpy(vec->vector_ev->mbufs + vec->vector_ev-
>>nb_elem, mbufs,
>> + sizeof(void *) * sz);
>> + vec->vector_ev->nb_elem += sz;
>> + num -= sz;
>> + mbufs += sz;
>> + vec->ts = rte_rdtsc();
>> + }
>> +
>> + if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>
>Same here.
>
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + ev++;
>> + filled++;
>> + vec->vector_ev = NULL;
>> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> + }
>> +
>> + return filled;
>> +}
>
>I am seeing more than one repeating code chunks in this function.
>Perhaps, you can give it a try to not repeat. We can drop if its
>performance affecting.
I will try to move them to inline functions and test.
>
>> +
>> static inline void
>> rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>> uint16_t eth_dev_id,
>> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
>rte_event_eth_rx_adapter *rx_adapter,
>> rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>> do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>
>The RSS related code is executed for vector case as well. Can this be
>moved inside ena_vector if condition ?
RSS is used to generate the event flow id; in the vector case the flow id
will be a combination of the port and queue id.
The idea is that flows having the same RSS LSB will end up in the same
queue.
>
>>
>> - for (i = 0; i < num; i++) {
>> - m = mbufs[i];
>> -
>> - rss = do_rss ?
>> - rxa_do_softrss(m, rx_adapter->rss_key_be) :
>> - m->hash.rss;
>> - ev->event = event;
>> - ev->flow_id = (rss & ~flow_id_mask) |
>> - (ev->flow_id & flow_id_mask);
>> - ev->mbuf = m;
>> - ev++;
>> + if (!eth_rx_queue_info->ena_vector) {
>> + for (i = 0; i < num; i++) {
>> + m = mbufs[i];
>> +
>> + rss = do_rss ? rxa_do_softrss(m, rx_adapter-
>>rss_key_be)
>> + : m->hash.rss;
>> + ev->event = event;
>> + ev->flow_id = (rss & ~flow_id_mask) |
>> + (ev->flow_id & flow_id_mask);
>> + ev->mbuf = m;
>> + ev++;
>> + }
>> + } else {
>> + num = rxa_create_event_vector(rx_adapter,
>eth_rx_queue_info,
>> + buf, mbufs, num);
>> }
>>
>> - if (dev_info->cb_fn) {
>> + if (num && dev_info->cb_fn) {
>>
>> dropped = 0;
>> nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
>> - ETH_EVENT_BUFFER_SIZE, buf-
>>count, ev,
>> - num, dev_info->cb_arg,
>&dropped);
>> + ETH_EVENT_BUFFER_SIZE, buf-
>>count,
>> + &buf->events[buf->count],
>num,
>> + dev_info->cb_arg, &dropped);
>
>Before this patch, we pass ev which is &buf->events[buf->count] + num
>as fifth param when calling cb_fn. Now, we are passing &buf-
>>events[buf->count] for non-vector case. Do you see this as an issue?
>
The callback function takes in the array of newly formed events, i.e. we need
to pass the start of the array and the count.
The previous code had a bug where it passed the end of the event list.
>Also, for vector case would it make sense to do pass &buf->events[buf-
>>count] + num ?
>
>> if (unlikely(nb_cb > num))
>> RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
>events",
>> nb_cb, num);
>> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter
>*rx_adapter)
>> return nb_rx;
>> }
>>
>> +static void
>> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
>> +{
>> + struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> + struct rte_eth_event_enqueue_buffer *buf =
>> + &rx_adapter->event_enqueue_buffer;
>> + struct rte_event *ev;
>> +
>> + if (buf->count)
>> + rxa_flush_event_buffer(rx_adapter);
>> +
>> + if (vec->vector_ev->nb_elem == 0)
>> + return;
>> + ev = &buf->events[buf->count];
>> +
>> + /* Event ready. */
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + buf->count++;
>> +
>> + vec->vector_ev = NULL;
>> + vec->ts = 0;
>> +}
>> +
>> static int
>> rxa_service_func(void *args)
>> {
>> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>> return 0;
>> }
>>
>> + if (rx_adapter->ena_vector) {
>> + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
>> + rx_adapter->vector_tmo_ticks) {
>> + struct eth_rx_vector_data *vec;
>> +
>> + TAILQ_FOREACH(vec, &rx_adapter->vector_list,
>next) {
>> + uint64_t elapsed_time = rte_rdtsc() -
>vec->ts;
>> +
>> + if (elapsed_time >= vec-
>>vector_timeout_ticks) {
>> + rxa_vector_expire(vec,
>rx_adapter);
>> + TAILQ_REMOVE(&rx_adapter-
>>vector_list,
>> + vec, next);
>> + }
>> + }
>> + rx_adapter->prev_expiry_ts = rte_rdtsc();
>> + }
>> + }
>> +
>> stats = &rx_adapter->stats;
>> stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>> stats->rx_packets += rxa_poll(rx_adapter);
>> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
>rte_event_eth_rx_adapter *rx_adapter,
>> }
>> }
>>
>> +static void
>> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
>uint16_t vector_count,
>> + uint64_t vector_ns, struct rte_mempool *mp, int32_t
>qid,
>> + uint16_t port_id)
>> +{
>> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
>> + struct eth_rx_vector_data *vector_data;
>> + uint32_t flow_id;
>> +
>> + vector_data = &queue_info->vector_data;
>> + vector_data->max_vector_count = vector_count;
>> + vector_data->port = port_id;
>> + vector_data->queue = qid;
>> + vector_data->vector_pool = mp;
>> + vector_data->vector_timeout_ticks =
>> + NSEC2TICK(vector_ns, rte_get_timer_hz());
>> + vector_data->ts = 0;
>> + flow_id = queue_info->event & 0xFFFFF;
>> + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
>flow_id;
>
>Maybe I am missing something here. Looking at the code it looks like
>qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11,
>flow_id would end up being 0x11. Is this the expectation? Also, it may
>be useful to document flow_id format.
The flow_id is 20 bits; I guess we could use a 12-bit queue_id and an 8-bit
port_id to form the flow id.
>Comparing this format with existing RSS hash based method, are we
>saying that all mbufs received in a rx burst are part of same flow when
>vectorization is used?
Yes, the harder way to do this would be to use a hash table, treating each
mbuf as having a unique flow.
>
>> + vector_data->event = (queue_info->event & ~0xFFFFF) |
>flow_id;
>> +}
>> +
>> static void
>> rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>> struct eth_device_info *dev_info,
>> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
>rte_event_eth_rx_adapter *rx_adapter,
>> }
>> }
>>
>> +static void
>> +rxa_sw_event_vector_configure(
>> + struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
>eth_dev_id,
>> + int rx_queue_id,
>> + const struct rte_event_eth_rx_adapter_event_vector_config
>*config)
>> +{
>> + struct eth_device_info *dev_info = &rx_adapter-
>>eth_devices[eth_dev_id];
>> + struct eth_rx_queue_info *queue_info;
>> + struct rte_event *qi_ev;
>> +
>> + if (rx_queue_id == -1) {
>> + uint16_t nb_rx_queues;
>> + uint16_t i;
>> +
>> + nb_rx_queues = dev_info->dev->data->nb_rx_queues;
>> + for (i = 0; i < nb_rx_queues; i++)
>> + rxa_sw_event_vector_configure(rx_adapter,
>eth_dev_id, i,
>> + config);
>> + return;
>> + }
>> +
>> + queue_info = &dev_info->rx_queue[rx_queue_id];
>> + qi_ev = (struct rte_event *)&queue_info->event;
>> + queue_info->ena_vector = 1;
>> + qi_ev->event_type =
>RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
>> + rxa_set_vector_data(queue_info, config->vector_sz,
>> + config->vector_timeout_ns, config-
>>vector_mp,
>> + rx_queue_id, dev_info->dev->data->port_id);
>> + rx_adapter->ena_vector = 1;
>> + rx_adapter->vector_tmo_ticks =
>> + rx_adapter->vector_tmo_ticks ?
>> + RTE_MIN(config->vector_timeout_ns << 1,
>> + rx_adapter->vector_tmo_ticks) :
>> + config->vector_timeout_ns << 1;
>> + rx_adapter->prev_expiry_ts = 0;
>> + TAILQ_INIT(&rx_adapter->vector_list);
>> +}
>> +
>> static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>> uint16_t eth_dev_id,
>> int rx_queue_id,
>> @@ -2081,6 +2285,15 @@
>rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> return -EINVAL;
>> }
>>
>> + if ((cap &
>RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
>> + (queue_conf->rx_queue_flags &
>> + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
>> + RTE_EDEV_LOG_ERR("Event vectorization is not
>supported,"
>> + " eth port: %" PRIu16 " adapter id: %"
>PRIu8,
>> + eth_dev_id, id);
>> + return -EINVAL;
>> + }
>> +
>> if ((cap &
>RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>> (rx_queue_id != -1)) {
>> RTE_EDEV_LOG_ERR("Rx queues can only be connected
>to single "
>> @@ -2143,6 +2356,17 @@
>rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> return 0;
>> }
>>
>> +static int
>> +rxa_sw_vector_limits(struct
>rte_event_eth_rx_adapter_vector_limits *limits)
>> +{
>> + limits->max_sz = MAX_VECTOR_SIZE;
>> + limits->min_sz = MIN_VECTOR_SIZE;
>> + limits->max_timeout_ns = MAX_VECTOR_NS;
>> + limits->min_timeout_ns = MIN_VECTOR_NS;
>> +
>> + return 0;
>> +}
>> +
>> int
>> rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
>eth_dev_id,
>> int32_t rx_queue_id)
>> @@ -2333,7 +2557,8 @@
>rte_event_eth_rx_adapter_queue_event_vector_config(
>> ret = dev->dev_ops-
>>eth_rx_adapter_event_vector_config(
>> dev, &rte_eth_devices[eth_dev_id],
>rx_queue_id, config);
>> } else {
>> - ret = -ENOTSUP;
>> + rxa_sw_event_vector_configure(rx_adapter,
>eth_dev_id,
>> + rx_queue_id, config);
>> }
>>
>> return ret;
>> @@ -2371,7 +2596,7 @@
>rte_event_eth_rx_adapter_vector_limits_get(
>> ret = dev->dev_ops-
>>eth_rx_adapter_vector_limits_get(
>> dev, &rte_eth_devices[eth_port_id], limits);
>> } else {
>> - ret = -ENOTSUP;
>> + ret = rxa_sw_vector_limits(limits);
>> }
>>
>> return ret;
>> diff --git a/lib/librte_eventdev/rte_eventdev.c
>b/lib/librte_eventdev/rte_eventdev.c
>> index f95edc075..254a31b1f 100644
>> --- a/lib/librte_eventdev/rte_eventdev.c
>> +++ b/lib/librte_eventdev/rte_eventdev.c
>> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t
>dev_id, uint16_t eth_port_id,
>>
>> if (caps == NULL)
>> return -EINVAL;
>> - *caps = 0;
>> +
>> + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
>> + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
>> + else
>> + *caps = 0;
>
>Any reason why we had to set default caps value? I am thinking if sw
>event device is used, it would set it anyways.
>
There are multiple sw event devices which don't implement the caps_get
function; this change solves that.
>>
>> return dev->dev_ops->eth_rx_adapter_caps_get ?
>> (*dev->dev_ops-
>>eth_rx_adapter_caps_get)(dev,
>> --
>> 2.17.1
> -----Original Message-----
> From: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>
> Sent: Thursday, March 25, 2021 6:44 PM
> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Jerin Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
> <erik.g.carrillo@intel.com>; Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
> <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> <mattias.ronnblom@ericsson.com>; Ma, Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event vector support
>
>
>
> >-----Original Message-----
> >From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
> >Sent: Thursday, March 25, 2021 4:07 PM
> >To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>; Jerin
> >Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
> ><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
> ><abhinandan.gujjar@intel.com>; McDaniel, Timothy
> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> >Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
> ><liang.j.ma@intel.com>
> >Cc: dev@dpdk.org
> >Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
> >event vector support
> >
> >External Email
> >
> >----------------------------------------------------------------------
> >> -----Original Message-----
> >> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
> >> Sent: Wednesday, March 24, 2021 10:35 AM
> >> To: jerinj@marvell.com; Jayatheerthan, Jay
> ><jay.jayatheerthan@intel.com>; Carrillo, Erik G
> ><erik.g.carrillo@intel.com>; Gujjar,
> >> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> >> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
> >> <liang.j.ma@intel.com>
> >> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
> >> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event
> >vector support
> >>
> >> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
> >>
> >> Add event vector support for event eth Rx adapter, the
> >implementation
> >> creates vector flows based on port and queue identifier of the
> >received
> >> mbufs.
> >>
> >> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> >> ---
> >> lib/librte_eventdev/eventdev_pmd.h | 7 +-
> >> .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--
> >> lib/librte_eventdev/rte_eventdev.c | 6 +-
> >> 3 files changed, 250 insertions(+), 20 deletions(-)
> >>
> >> diff --git a/lib/librte_eventdev/eventdev_pmd.h
> >b/lib/librte_eventdev/eventdev_pmd.h
> >> index 9297f1433..0f724ac85 100644
> >> --- a/lib/librte_eventdev/eventdev_pmd.h
> >> +++ b/lib/librte_eventdev/eventdev_pmd.h
> >> @@ -69,9 +69,10 @@ extern "C" {
> >> } \
> >> } while (0)
> >>
> >> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> >> -
> > ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
> >\
> >> -
> > (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
> >> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
> >\
> >> + ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
> >\
> >> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
> >\
> >> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
> >>
> >> #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
> >>
> > RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
> >> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> index ac8ba5bf0..c71990078 100644
> >> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> @@ -26,6 +26,10 @@
> >> #define BATCH_SIZE 32
> >> #define BLOCK_CNT_THRESHOLD 10
> >> #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
> >> +#define MAX_VECTOR_SIZE 1024
> >> +#define MIN_VECTOR_SIZE 4
> >> +#define MAX_VECTOR_NS 1E9
> >> +#define MIN_VECTOR_NS 1E5
> >>
> >> #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
> >> #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
> >> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
> >> uint16_t eth_rx_qid;
> >> };
> >>
> >> +struct eth_rx_vector_data {
> >> + TAILQ_ENTRY(eth_rx_vector_data) next;
> >> + uint16_t port;
> >> + uint16_t queue;
> >> + uint16_t max_vector_count;
> >> + uint64_t event;
> >> + uint64_t ts;
> >> + uint64_t vector_timeout_ticks;
> >> + struct rte_mempool *vector_pool;
> >> + struct rte_event_vector *vector_ev;
> >> +} __rte_cache_aligned;
> >> +
> >> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
> >> +
> >> /* Instance per adapter */
> >> struct rte_eth_event_enqueue_buffer {
> >> /* Count of events in this buffer */
> >> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
> >> uint32_t wrr_pos;
> >> /* Event burst buffer */
> >> struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
> >> + /* Vector enable flag */
> >> + uint8_t ena_vector;
> >> + /* Timestamp of previous vector expiry list traversal */
> >> + uint64_t prev_expiry_ts;
> >> + /* Minimum ticks to wait before traversing expiry list */
> >> + uint64_t vector_tmo_ticks;
> >> + /* vector list */
> >> + struct eth_rx_vector_data_list vector_list;
> >> /* Per adapter stats */
> >> struct rte_event_eth_rx_adapter_stats stats;
> >> /* Block count, counts up to BLOCK_CNT_THRESHOLD */
> >> @@ -198,9 +224,11 @@ struct eth_device_info {
> >> struct eth_rx_queue_info {
> >> int queue_enabled; /* True if added */
> >> int intr_enabled;
> >> + uint8_t ena_vector;
> >> uint16_t wt; /* Polling weight */
> >> uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else
> >0 */
> >> uint64_t event;
> >> + struct eth_rx_vector_data vector_data;
> >> };
> >>
> >> static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
> >> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
> >rte_event_eth_rx_adapter *rx_adapter)
> >> &rx_adapter->event_enqueue_buffer;
> >> struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
> >>stats;
> >>
> >> + if (!buf->count)
> >> + return 0;
> >> +
> >> uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
> >>eventdev_id,
> >> rx_adapter->event_port_id,
> >> buf->events,
> >> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
> >rte_event_eth_rx_adapter *rx_adapter)
> >> return n;
> >> }
> >>
> >> +static inline uint16_t
> >> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
> >*rx_adapter,
> >> + struct eth_rx_queue_info *queue_info,
> >> + struct rte_eth_event_enqueue_buffer *buf,
> >> + struct rte_mbuf **mbufs, uint16_t num)
> >> +{
> >> + struct rte_event *ev = &buf->events[buf->count];
> >> + struct eth_rx_vector_data *vec;
> >> + uint16_t filled, space, sz;
> >> +
> >> + filled = 0;
> >> + vec = &queue_info->vector_data;
> >> + while (num) {
> >> + if (vec->vector_ev == NULL) {
> >> + if (rte_mempool_get(vec->vector_pool,
> >> + (void **)&vec->vector_ev) <
> >0) {
> >> + rte_pktmbuf_free_bulk(mbufs, num);
> >> + return 0;
> >> + }
> >> + vec->vector_ev->nb_elem = 0;
> >> + vec->vector_ev->port = vec->port;
> >> + vec->vector_ev->queue = vec->queue;
> >> + vec->vector_ev->attr_valid = true;
> >> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
> >vec, next);
> >> + } else if (vec->vector_ev->nb_elem == vec-
> >>max_vector_count) {
> >
> >Is there a case where nb_elem > max_vector_count as we accumulate
> >sz to it ?
>
> I don't think so, that would overflow the vector event.
>
> >
> >> + /* Event ready. */
> >> + ev->event = vec->event;
> >> + ev->vec = vec->vector_ev;
> >> + ev++;
> >> + filled++;
> >> + vec->vector_ev = NULL;
> >> + TAILQ_REMOVE(&rx_adapter->vector_list, vec,
> >next);
> >> + if (rte_mempool_get(vec->vector_pool,
> >> + (void **)&vec->vector_ev) <
> >0) {
> >> + rte_pktmbuf_free_bulk(mbufs, num);
> >> + return 0;
> >> + }
> >> + vec->vector_ev->nb_elem = 0;
> >> + vec->vector_ev->port = vec->port;
> >> + vec->vector_ev->queue = vec->queue;
> >> + vec->vector_ev->attr_valid = true;
> >> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
> >vec, next);
> >> + }
> >> +
> >> + space = vec->max_vector_count - vec->vector_ev-
> >>nb_elem;
> >> + sz = num > space ? space : num;
> >> + memcpy(vec->vector_ev->mbufs + vec->vector_ev-
> >>nb_elem, mbufs,
> >> + sizeof(void *) * sz);
> >> + vec->vector_ev->nb_elem += sz;
> >> + num -= sz;
> >> + mbufs += sz;
> >> + vec->ts = rte_rdtsc();
> >> + }
> >> +
> >> + if (vec->vector_ev->nb_elem == vec->max_vector_count) {
> >
> >Same here.
> >
> >> + ev->event = vec->event;
> >> + ev->vec = vec->vector_ev;
> >> + ev++;
> >> + filled++;
> >> + vec->vector_ev = NULL;
> >> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> >> + }
> >> +
> >> + return filled;
> >> +}
> >
> >I am seeing more than one repeating code chunks in this function.
> >Perhaps, you can give it a try to not repeat. We can drop if its
> >performance affecting.
>
> I will try to move them to inline functions and test.
>
> >
> >> +
> >> static inline void
> >> rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
> >> uint16_t eth_dev_id,
> >> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >> rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> >> do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
> >
> >The RSS related code is executed for vector case as well. Can this be
> >moved inside ena_vector if condition ?
>
> RSS is used to generate the event flow id; in the vector case the flow id
> will be a combination of port and queue id.
> The idea is that flows having the same RSS LSB will end up in the same
> queue.
>
I meant to say, rss_mask and do_rss are used only when ena_vector is false. Could be moved inside the appropriate condition ?
> >
> >>
> >> - for (i = 0; i < num; i++) {
> >> - m = mbufs[i];
> >> -
> >> - rss = do_rss ?
> >> - rxa_do_softrss(m, rx_adapter->rss_key_be) :
> >> - m->hash.rss;
> >> - ev->event = event;
> >> - ev->flow_id = (rss & ~flow_id_mask) |
> >> - (ev->flow_id & flow_id_mask);
> >> - ev->mbuf = m;
> >> - ev++;
> >> + if (!eth_rx_queue_info->ena_vector) {
> >> + for (i = 0; i < num; i++) {
> >> + m = mbufs[i];
> >> +
> >> + rss = do_rss ? rxa_do_softrss(m, rx_adapter-
> >>rss_key_be)
> >> + : m->hash.rss;
> >> + ev->event = event;
> >> + ev->flow_id = (rss & ~flow_id_mask) |
> >> + (ev->flow_id & flow_id_mask);
> >> + ev->mbuf = m;
> >> + ev++;
> >> + }
> >> + } else {
> >> + num = rxa_create_event_vector(rx_adapter,
> >eth_rx_queue_info,
> >> + buf, mbufs, num);
> >> }
> >>
> >> - if (dev_info->cb_fn) {
> >> + if (num && dev_info->cb_fn) {
> >>
> >> dropped = 0;
> >> nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> >> - ETH_EVENT_BUFFER_SIZE, buf-
> >>count, ev,
> >> - num, dev_info->cb_arg,
> >&dropped);
> >> + ETH_EVENT_BUFFER_SIZE, buf-
> >>count,
> >> + &buf->events[buf->count],
> >num,
> >> + dev_info->cb_arg, &dropped);
> >
> >Before this patch, we pass ev which is &buf->events[buf->count] + num
> >as fifth param when calling cb_fn. Now, we are passing &buf-
> >>events[buf->count] for non-vector case. Do you see this as an issue?
> >
>
> The callback function takes in the array of newly formed events, i.e. we need
> to pass the start of the array and the count.
>
> the previous code had a bug where it passes the end of the event list.
ok, that makes sense.
>
> >Also, for vector case would it make sense to do pass &buf->events[buf-
> >>count] + num ?
> >
> >> if (unlikely(nb_cb > num))
> >> RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
> >events",
> >> nb_cb, num);
> >> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter
> >*rx_adapter)
> >> return nb_rx;
> >> }
> >>
> >> +static void
> >> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
> >> +{
> >> + struct rte_event_eth_rx_adapter *rx_adapter = arg;
> >> + struct rte_eth_event_enqueue_buffer *buf =
> >> + &rx_adapter->event_enqueue_buffer;
> >> + struct rte_event *ev;
> >> +
> >> + if (buf->count)
> >> + rxa_flush_event_buffer(rx_adapter);
> >> +
> >> + if (vec->vector_ev->nb_elem == 0)
> >> + return;
> >> + ev = &buf->events[buf->count];
> >> +
> >> + /* Event ready. */
> >> + ev->event = vec->event;
> >> + ev->vec = vec->vector_ev;
> >> + buf->count++;
> >> +
> >> + vec->vector_ev = NULL;
> >> + vec->ts = 0;
> >> +}
> >> +
> >> static int
> >> rxa_service_func(void *args)
> >> {
> >> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
> >> return 0;
> >> }
> >>
> >> + if (rx_adapter->ena_vector) {
> >> + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
> >> + rx_adapter->vector_tmo_ticks) {
> >> + struct eth_rx_vector_data *vec;
> >> +
> >> + TAILQ_FOREACH(vec, &rx_adapter->vector_list,
> >next) {
> >> + uint64_t elapsed_time = rte_rdtsc() -
> >vec->ts;
> >> +
> >> + if (elapsed_time >= vec-
> >>vector_timeout_ticks) {
> >> + rxa_vector_expire(vec,
> >rx_adapter);
> >> + TAILQ_REMOVE(&rx_adapter-
> >>vector_list,
> >> + vec, next);
> >> + }
> >> + }
> >> + rx_adapter->prev_expiry_ts = rte_rdtsc();
> >> + }
> >> + }
> >> +
> >> stats = &rx_adapter->stats;
> >> stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
> >> stats->rx_packets += rxa_poll(rx_adapter);
> >> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >> }
> >> }
> >>
> >> +static void
> >> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
> >uint16_t vector_count,
> >> + uint64_t vector_ns, struct rte_mempool *mp, int32_t
> >qid,
> >> + uint16_t port_id)
> >> +{
> >> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
> >> + struct eth_rx_vector_data *vector_data;
> >> + uint32_t flow_id;
> >> +
> >> + vector_data = &queue_info->vector_data;
> >> + vector_data->max_vector_count = vector_count;
> >> + vector_data->port = port_id;
> >> + vector_data->queue = qid;
> >> + vector_data->vector_pool = mp;
> >> + vector_data->vector_timeout_ticks =
> >> + NSEC2TICK(vector_ns, rte_get_timer_hz());
> >> + vector_data->ts = 0;
> >> + flow_id = queue_info->event & 0xFFFFF;
> >> + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
> >flow_id;
> >
> >Maybe I am missing something here. Looking at the code it looks like
> >qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11,
> >flow_id would end up being 0x11. Is this the expectation? Also, it may
> >be useful to document flow_id format.
>
> The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port
> as a flow.
This sounds reasonable to me. It would be useful to have the flow_id format and how it is used for vectorization in Rx/Tx adapter documentation.
>
> >Comparing this format with existing RSS hash based method, are we
> >saying that all mbufs received in a rx burst are part of same flow when
> >vectorization is used?
>
> Yes, the hard way to do this is to use a hash table, treating each
> mbuf as having a unique flow.
>
> >
> >> + vector_data->event = (queue_info->event & ~0xFFFFF) |
> >flow_id;
> >> +}
> >> +
> >> static void
> >> rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
> >> struct eth_device_info *dev_info,
> >> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >> }
> >> }
> >>
> >> +static void
> >> +rxa_sw_event_vector_configure(
> >> + struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
> >eth_dev_id,
> >> + int rx_queue_id,
> >> + const struct rte_event_eth_rx_adapter_event_vector_config
> >*config)
> >> +{
> >> + struct eth_device_info *dev_info = &rx_adapter-
> >>eth_devices[eth_dev_id];
> >> + struct eth_rx_queue_info *queue_info;
> >> + struct rte_event *qi_ev;
> >> +
> >> + if (rx_queue_id == -1) {
> >> + uint16_t nb_rx_queues;
> >> + uint16_t i;
> >> +
> >> + nb_rx_queues = dev_info->dev->data->nb_rx_queues;
> >> + for (i = 0; i < nb_rx_queues; i++)
> >> + rxa_sw_event_vector_configure(rx_adapter,
> >eth_dev_id, i,
> >> + config);
> >> + return;
> >> + }
> >> +
> >> + queue_info = &dev_info->rx_queue[rx_queue_id];
> >> + qi_ev = (struct rte_event *)&queue_info->event;
> >> + queue_info->ena_vector = 1;
> >> + qi_ev->event_type =
> >RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
> >> + rxa_set_vector_data(queue_info, config->vector_sz,
> >> + config->vector_timeout_ns, config-
> >>vector_mp,
> >> + rx_queue_id, dev_info->dev->data->port_id);
> >> + rx_adapter->ena_vector = 1;
> >> + rx_adapter->vector_tmo_ticks =
> >> + rx_adapter->vector_tmo_ticks ?
> >> + RTE_MIN(config->vector_timeout_ns << 1,
> >> + rx_adapter->vector_tmo_ticks) :
> >> + config->vector_timeout_ns << 1;
> >> + rx_adapter->prev_expiry_ts = 0;
> >> + TAILQ_INIT(&rx_adapter->vector_list);
> >> +}
> >> +
> >> static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
> >> uint16_t eth_dev_id,
> >> int rx_queue_id,
> >> @@ -2081,6 +2285,15 @@
> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
> >> return -EINVAL;
> >> }
> >>
> >> + if ((cap &
> >RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
> >> + (queue_conf->rx_queue_flags &
> >> + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
> >> + RTE_EDEV_LOG_ERR("Event vectorization is not
> >supported,"
> >> + " eth port: %" PRIu16 " adapter id: %"
> >PRIu8,
> >> + eth_dev_id, id);
> >> + return -EINVAL;
> >> + }
> >> +
> >> if ((cap &
> >RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
> >> (rx_queue_id != -1)) {
> >> RTE_EDEV_LOG_ERR("Rx queues can only be connected
> >to single "
> >> @@ -2143,6 +2356,17 @@
> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
> >> return 0;
> >> }
> >>
> >> +static int
> >> +rxa_sw_vector_limits(struct
> >rte_event_eth_rx_adapter_vector_limits *limits)
> >> +{
> >> + limits->max_sz = MAX_VECTOR_SIZE;
> >> + limits->min_sz = MIN_VECTOR_SIZE;
> >> + limits->max_timeout_ns = MAX_VECTOR_NS;
> >> + limits->min_timeout_ns = MIN_VECTOR_NS;
> >> +
> >> + return 0;
> >> +}
> >> +
> >> int
> >> rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
> >eth_dev_id,
> >> int32_t rx_queue_id)
> >> @@ -2333,7 +2557,8 @@
> >rte_event_eth_rx_adapter_queue_event_vector_config(
> >> ret = dev->dev_ops-
> >>eth_rx_adapter_event_vector_config(
> >> dev, &rte_eth_devices[eth_dev_id],
> >rx_queue_id, config);
> >> } else {
> >> - ret = -ENOTSUP;
> >> + rxa_sw_event_vector_configure(rx_adapter,
> >eth_dev_id,
> >> + rx_queue_id, config);
> >> }
> >>
> >> return ret;
> >> @@ -2371,7 +2596,7 @@
> >rte_event_eth_rx_adapter_vector_limits_get(
> >> ret = dev->dev_ops-
> >>eth_rx_adapter_vector_limits_get(
> >> dev, &rte_eth_devices[eth_port_id], limits);
> >> } else {
> >> - ret = -ENOTSUP;
> >> + ret = rxa_sw_vector_limits(limits);
> >> }
> >>
> >> return ret;
> >> diff --git a/lib/librte_eventdev/rte_eventdev.c
> >b/lib/librte_eventdev/rte_eventdev.c
> >> index f95edc075..254a31b1f 100644
> >> --- a/lib/librte_eventdev/rte_eventdev.c
> >> +++ b/lib/librte_eventdev/rte_eventdev.c
> >> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t
> >dev_id, uint16_t eth_port_id,
> >>
> >> if (caps == NULL)
> >> return -EINVAL;
> >> - *caps = 0;
> >> +
> >> + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
> >> + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
> >> + else
> >> + *caps = 0;
> >
> >Any reason why we had to set default caps value? I am thinking if sw
> >event device is used, it would set it anyways.
> >
>
> There are multiple sw event devices which don't implement the caps_get
> function; this change solves that.
>
> >>
> >> return dev->dev_ops->eth_rx_adapter_caps_get ?
> >> (*dev->dev_ops-
> >>eth_rx_adapter_caps_get)(dev,
> >> --
> >> 2.17.1
>> From: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>
>> Sent: Thursday, March 25, 2021 6:44 PM
>> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Jerin Jacob
>Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
>> <erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
><abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> <mattias.ronnblom@ericsson.com>; Ma, Liang J
><liang.j.ma@intel.com>
>> Cc: dev@dpdk.org
>> Subject: RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event vector support
>>
>>
>>
>> >-----Original Message-----
>> >From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
>> >Sent: Thursday, March 25, 2021 4:07 PM
>> >To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>;
>Jerin
>> >Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
>> ><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
>> ><abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> >Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> ><liang.j.ma@intel.com>
>> >Cc: dev@dpdk.org
>> >Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx
>adapter
>> >event vector support
>> >
>> >External Email
>> >
>> >----------------------------------------------------------------------
>> >> -----Original Message-----
>> >> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
>> >> Sent: Wednesday, March 24, 2021 10:35 AM
>> >> To: jerinj@marvell.com; Jayatheerthan, Jay
>> ><jay.jayatheerthan@intel.com>; Carrillo, Erik G
>> ><erik.g.carrillo@intel.com>; Gujjar,
>> >> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> >> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> >> <liang.j.ma@intel.com>
>> >> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event
>> >vector support
>> >>
>> >> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >>
>> >> Add event vector support for event eth Rx adapter, the
>> >implementation
>> >> creates vector flows based on port and queue identifier of the
>> >received
>> >> mbufs.
>> >>
>> >> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >> ---
>> >> lib/librte_eventdev/eventdev_pmd.h | 7 +-
>> >> .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++-
>-
>> >> lib/librte_eventdev/rte_eventdev.c | 6 +-
>> >> 3 files changed, 250 insertions(+), 20 deletions(-)
>> >>
>> >> diff --git a/lib/librte_eventdev/eventdev_pmd.h
>> >b/lib/librte_eventdev/eventdev_pmd.h
>> >> index 9297f1433..0f724ac85 100644
>> >> --- a/lib/librte_eventdev/eventdev_pmd.h
>> >> +++ b/lib/librte_eventdev/eventdev_pmd.h
>> >> @@ -69,9 +69,10 @@ extern "C" {
>> >> } \
>> >> } while (0)
>> >>
>> >> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> >> -
>> > ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>> >\
>> >> -
>> > (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
>> >> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
>> >\
>> >> + ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>> >\
>> >> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
>> >\
>> >> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>> >>
>> >> #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>> >>
>> > RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
>> >> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> index ac8ba5bf0..c71990078 100644
>> >> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> @@ -26,6 +26,10 @@
>> >> #define BATCH_SIZE 32
>> >> #define BLOCK_CNT_THRESHOLD 10
>> >> #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
>> >> +#define MAX_VECTOR_SIZE 1024
>> >> +#define MIN_VECTOR_SIZE 4
>> >> +#define MAX_VECTOR_NS 1E9
>> >> +#define MIN_VECTOR_NS 1E5
>> >>
>> >> #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
>> >> #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
>> >> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>> >> uint16_t eth_rx_qid;
>> >> };
>> >>
>> >> +struct eth_rx_vector_data {
>> >> + TAILQ_ENTRY(eth_rx_vector_data) next;
>> >> + uint16_t port;
>> >> + uint16_t queue;
>> >> + uint16_t max_vector_count;
>> >> + uint64_t event;
>> >> + uint64_t ts;
>> >> + uint64_t vector_timeout_ticks;
>> >> + struct rte_mempool *vector_pool;
>> >> + struct rte_event_vector *vector_ev;
>> >> +} __rte_cache_aligned;
>> >> +
>> >> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
>> >> +
>> >> /* Instance per adapter */
>> >> struct rte_eth_event_enqueue_buffer {
>> >> /* Count of events in this buffer */
>> >> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>> >> uint32_t wrr_pos;
>> >> /* Event burst buffer */
>> >> struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
>> >> + /* Vector enable flag */
>> >> + uint8_t ena_vector;
>> >> + /* Timestamp of previous vector expiry list traversal */
>> >> + uint64_t prev_expiry_ts;
>> >> + /* Minimum ticks to wait before traversing expiry list */
>> >> + uint64_t vector_tmo_ticks;
>> >> + /* vector list */
>> >> + struct eth_rx_vector_data_list vector_list;
>> >> /* Per adapter stats */
>> >> struct rte_event_eth_rx_adapter_stats stats;
>> >> /* Block count, counts up to BLOCK_CNT_THRESHOLD */
>> >> @@ -198,9 +224,11 @@ struct eth_device_info {
>> >> struct eth_rx_queue_info {
>> >> int queue_enabled; /* True if added */
>> >> int intr_enabled;
>> >> + uint8_t ena_vector;
>> >> uint16_t wt; /* Polling weight */
>> >> uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else
>> >0 */
>> >> uint64_t event;
>> >> + struct eth_rx_vector_data vector_data;
>> >> };
>> >>
>> >> static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
>> >> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
>> >rte_event_eth_rx_adapter *rx_adapter)
>> >> &rx_adapter->event_enqueue_buffer;
>> >> struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
>> >>stats;
>> >>
>> >> + if (!buf->count)
>> >> + return 0;
>> >> +
>> >> uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
>> >>eventdev_id,
>> >> rx_adapter->event_port_id,
>> >> buf->events,
>> >> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
>> >rte_event_eth_rx_adapter *rx_adapter)
>> >> return n;
>> >> }
>> >>
>> >> +static inline uint16_t
>> >> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
>> >*rx_adapter,
>> >> + struct eth_rx_queue_info *queue_info,
>> >> + struct rte_eth_event_enqueue_buffer *buf,
>> >> + struct rte_mbuf **mbufs, uint16_t num)
>> >> +{
>> >> + struct rte_event *ev = &buf->events[buf->count];
>> >> + struct eth_rx_vector_data *vec;
>> >> + uint16_t filled, space, sz;
>> >> +
>> >> + filled = 0;
>> >> + vec = &queue_info->vector_data;
>> >> + while (num) {
>> >> + if (vec->vector_ev == NULL) {
>> >> + if (rte_mempool_get(vec->vector_pool,
>> >> + (void **)&vec->vector_ev) <
>> >0) {
>> >> + rte_pktmbuf_free_bulk(mbufs, num);
>> >> + return 0;
>> >> + }
>> >> + vec->vector_ev->nb_elem = 0;
>> >> + vec->vector_ev->port = vec->port;
>> >> + vec->vector_ev->queue = vec->queue;
>> >> + vec->vector_ev->attr_valid = true;
>> >> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>> >vec, next);
>> >> + } else if (vec->vector_ev->nb_elem == vec-
>> >>max_vector_count) {
>> >
>> >Is there a case where nb_elem > max_vector_count as we
>accumulate
>> >sz to it ?
>>
>> I don't think so, that would overflow the vector event.
>>
>> >
>> >> + /* Event ready. */
>> >> + ev->event = vec->event;
>> >> + ev->vec = vec->vector_ev;
>> >> + ev++;
>> >> + filled++;
>> >> + vec->vector_ev = NULL;
>> >> + TAILQ_REMOVE(&rx_adapter->vector_list, vec,
>> >next);
>> >> + if (rte_mempool_get(vec->vector_pool,
>> >> + (void **)&vec->vector_ev) <
>> >0) {
>> >> + rte_pktmbuf_free_bulk(mbufs, num);
>> >> + return 0;
>> >> + }
>> >> + vec->vector_ev->nb_elem = 0;
>> >> + vec->vector_ev->port = vec->port;
>> >> + vec->vector_ev->queue = vec->queue;
>> >> + vec->vector_ev->attr_valid = true;
>> >> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>> >vec, next);
>> >> + }
>> >> +
>> >> + space = vec->max_vector_count - vec->vector_ev-
>> >>nb_elem;
>> >> + sz = num > space ? space : num;
>> >> + memcpy(vec->vector_ev->mbufs + vec->vector_ev-
>> >>nb_elem, mbufs,
>> >> + sizeof(void *) * sz);
>> >> + vec->vector_ev->nb_elem += sz;
>> >> + num -= sz;
>> >> + mbufs += sz;
>> >> + vec->ts = rte_rdtsc();
>> >> + }
>> >> +
>> >> + if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>> >
>> >Same here.
>> >
>> >> + ev->event = vec->event;
>> >> + ev->vec = vec->vector_ev;
>> >> + ev++;
>> >> + filled++;
>> >> + vec->vector_ev = NULL;
>> >> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> >> + }
>> >> +
>> >> + return filled;
>> >> +}
>> >
>> >I am seeing more than one repeating code chunks in this function.
>> >Perhaps, you can give it a try to not repeat. We can drop if its
>> >performance affecting.
>>
>> I will try to move them to inline functions and test.
>>
>> >
>> >> +
>> >> static inline void
>> >> rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>> >> uint16_t eth_dev_id,
>> >> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >> rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>> >> do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>> >
>> >The RSS related code is executed for vector case as well. Can this be
>> >moved inside ena_vector if condition ?
>>
>> RSS is used to generate the event flowid, in vector case the flow id
>> Will be a combination of port and queue id.
>> The idea is that flows having the same RSS LSB will end up in the same
>> queue.
>>
>
>I meant to say, rss_mask and do_rss are used only when ena_vector is
>false. Could be moved inside the appropriate condition ?
>
Ah! I see. I will move them inside the conditional.
>> >
>> >>
>> >> - for (i = 0; i < num; i++) {
>> >> - m = mbufs[i];
>> >> -
>> >> - rss = do_rss ?
>> >> - rxa_do_softrss(m, rx_adapter->rss_key_be) :
>> >> - m->hash.rss;
>> >> - ev->event = event;
>> >> - ev->flow_id = (rss & ~flow_id_mask) |
>> >> - (ev->flow_id & flow_id_mask);
>> >> - ev->mbuf = m;
>> >> - ev++;
>> >> + if (!eth_rx_queue_info->ena_vector) {
>> >> + for (i = 0; i < num; i++) {
>> >> + m = mbufs[i];
>> >> +
>> >> + rss = do_rss ? rxa_do_softrss(m, rx_adapter-
>> >>rss_key_be)
>> >> + : m->hash.rss;
>> >> + ev->event = event;
>> >> + ev->flow_id = (rss & ~flow_id_mask) |
>> >> + (ev->flow_id & flow_id_mask);
>> >> + ev->mbuf = m;
>> >> + ev++;
>> >> + }
>> >> + } else {
>> >> + num = rxa_create_event_vector(rx_adapter,
>> >eth_rx_queue_info,
>> >> + buf, mbufs, num);
>> >> }
>> >>
>> >> - if (dev_info->cb_fn) {
>> >> + if (num && dev_info->cb_fn) {
>> >>
>> >> dropped = 0;
>> >> nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
>> >> - ETH_EVENT_BUFFER_SIZE, buf-
>> >>count, ev,
>> >> - num, dev_info->cb_arg,
>> >&dropped);
>> >> + ETH_EVENT_BUFFER_SIZE, buf-
>> >>count,
>> >> + &buf->events[buf->count],
>> >num,
>> >> + dev_info->cb_arg, &dropped);
>> >
>> >Before this patch, we pass ev which is &buf->events[buf->count] +
>num
>> >as fifth param when calling cb_fn. Now, we are passing &buf-
>> >>events[buf->count] for non-vector case. Do you see this as an
>issue?
>> >
>>
>> The callback function takes in the array of newly formed events, i.e. we
>need
>> to pass the start of the array and the count.
>>
>> the previous code had a bug where it passed the end of the event list.
>
>ok, that makes sense.
>
>>
>> >Also, for vector case would it make sense to do pass &buf-
>>events[buf-
>> >>count] + num ?
>> >
>> >> if (unlikely(nb_cb > num))
>> >> RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
>> >events",
>> >> nb_cb, num);
>> >> @@ -1124,6 +1226,30 @@ rxa_poll(struct
>rte_event_eth_rx_adapter
>> >*rx_adapter)
>> >> return nb_rx;
>> >> }
>> >>
>> >> +static void
>> >> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
>> >> +{
>> >> + struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> >> + struct rte_eth_event_enqueue_buffer *buf =
>> >> + &rx_adapter->event_enqueue_buffer;
>> >> + struct rte_event *ev;
>> >> +
>> >> + if (buf->count)
>> >> + rxa_flush_event_buffer(rx_adapter);
>> >> +
>> >> + if (vec->vector_ev->nb_elem == 0)
>> >> + return;
>> >> + ev = &buf->events[buf->count];
>> >> +
>> >> + /* Event ready. */
>> >> + ev->event = vec->event;
>> >> + ev->vec = vec->vector_ev;
>> >> + buf->count++;
>> >> +
>> >> + vec->vector_ev = NULL;
>> >> + vec->ts = 0;
>> >> +}
>> >> +
>> >> static int
>> >> rxa_service_func(void *args)
>> >> {
>> >> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>> >> return 0;
>> >> }
>> >>
>> >> + if (rx_adapter->ena_vector) {
>> >> + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
>> >> + rx_adapter->vector_tmo_ticks) {
>> >> + struct eth_rx_vector_data *vec;
>> >> +
>> >> + TAILQ_FOREACH(vec, &rx_adapter->vector_list,
>> >next) {
>> >> + uint64_t elapsed_time = rte_rdtsc() -
>> >vec->ts;
>> >> +
>> >> + if (elapsed_time >= vec-
>> >>vector_timeout_ticks) {
>> >> + rxa_vector_expire(vec,
>> >rx_adapter);
>> >> + TAILQ_REMOVE(&rx_adapter-
>> >>vector_list,
>> >> + vec, next);
>> >> + }
>> >> + }
>> >> + rx_adapter->prev_expiry_ts = rte_rdtsc();
>> >> + }
>> >> + }
>> >> +
>> >> stats = &rx_adapter->stats;
>> >> stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>> >> stats->rx_packets += rxa_poll(rx_adapter);
>> >> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >> }
>> >> }
>> >>
>> >> +static void
>> >> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
>> >uint16_t vector_count,
>> >> + uint64_t vector_ns, struct rte_mempool *mp, int32_t
>> >qid,
>> >> + uint16_t port_id)
>> >> +{
>> >> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
>> >> + struct eth_rx_vector_data *vector_data;
>> >> + uint32_t flow_id;
>> >> +
>> >> + vector_data = &queue_info->vector_data;
>> >> + vector_data->max_vector_count = vector_count;
>> >> + vector_data->port = port_id;
>> >> + vector_data->queue = qid;
>> >> + vector_data->vector_pool = mp;
>> >> + vector_data->vector_timeout_ticks =
>> >> + NSEC2TICK(vector_ns, rte_get_timer_hz());
>> >> + vector_data->ts = 0;
>> >> + flow_id = queue_info->event & 0xFFFFF;
>> >> + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
>> >flow_id;
>> >
>> >Maybe I am missing something here. Looking at the code it looks like
>> >qid and port_id may overlap. For e.g., if qid = 0x10 and port_id =
>0x11,
>> >flow_id would end up being 0x11. Is this the expectation? Also, it
>may
>> >be useful to document flow_id format.
>>
>> The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port
>> as a flow.
>
>This sounds reasonable to me. It would be useful to have the flow_id
>format and how it is used for vectorization in Rx/Tx adapter
>documentation.
This is only applicable to the SW Rx adapter implementation; a HW implementation might have
its own way of implementing the flow aggregation.
There is no documentation specific to the SW Rx adapter; I will add it to the commit log.
>
>>
>> >Comparing this format with existing RSS hash based method, are we
>> >saying that all mbufs received in a rx burst are part of same flow
>when
>> >vectorization is used?
>>
>> Yes, the hard way to do this is to use a hash table and treat each
>> mbuf as having a unique flow.
>>
>> >
>> >> + vector_data->event = (queue_info->event & ~0xFFFFF) |
>> >flow_id;
>> >> +}
>> >> +
>> >> static void
>> >> rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>> >> struct eth_device_info *dev_info,
>> >> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >> }
>> >> }
>> >>
>> >> +static void
>> >> +rxa_sw_event_vector_configure(
>> >> + struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
>> >eth_dev_id,
>> >> + int rx_queue_id,
>> >> + const struct rte_event_eth_rx_adapter_event_vector_config
>> >*config)
>> >> +{
>> >> + struct eth_device_info *dev_info = &rx_adapter-
>> >>eth_devices[eth_dev_id];
>> >> + struct eth_rx_queue_info *queue_info;
>> >> + struct rte_event *qi_ev;
>> >> +
>> >> + if (rx_queue_id == -1) {
>> >> + uint16_t nb_rx_queues;
>> >> + uint16_t i;
>> >> +
>> >> + nb_rx_queues = dev_info->dev->data->nb_rx_queues;
>> >> + for (i = 0; i < nb_rx_queues; i++)
>> >> + rxa_sw_event_vector_configure(rx_adapter,
>> >eth_dev_id, i,
>> >> + config);
>> >> + return;
>> >> + }
>> >> +
>> >> + queue_info = &dev_info->rx_queue[rx_queue_id];
>> >> + qi_ev = (struct rte_event *)&queue_info->event;
>> >> + queue_info->ena_vector = 1;
>> >> + qi_ev->event_type =
>> >RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
>> >> + rxa_set_vector_data(queue_info, config->vector_sz,
>> >> + config->vector_timeout_ns, config-
>> >>vector_mp,
>> >> + rx_queue_id, dev_info->dev->data->port_id);
>> >> + rx_adapter->ena_vector = 1;
>> >> + rx_adapter->vector_tmo_ticks =
>> >> + rx_adapter->vector_tmo_ticks ?
>> >> + RTE_MIN(config->vector_timeout_ns << 1,
>> >> + rx_adapter->vector_tmo_ticks) :
>> >> + config->vector_timeout_ns << 1;
>> >> + rx_adapter->prev_expiry_ts = 0;
>> >> + TAILQ_INIT(&rx_adapter->vector_list);
>> >> +}
>> >> +
>> >> static int rxa_sw_add(struct rte_event_eth_rx_adapter
>*rx_adapter,
>> >> uint16_t eth_dev_id,
>> >> int rx_queue_id,
>> >> @@ -2081,6 +2285,15 @@
>> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> >> return -EINVAL;
>> >> }
>> >>
>> >> + if ((cap &
>> >RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
>> >> + (queue_conf->rx_queue_flags &
>> >> + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
>> >> + RTE_EDEV_LOG_ERR("Event vectorization is not
>> >supported,"
>> >> + " eth port: %" PRIu16 " adapter id: %"
>> >PRIu8,
>> >> + eth_dev_id, id);
>> >> + return -EINVAL;
>> >> + }
>> >> +
>> >> if ((cap &
>> >RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>> >> (rx_queue_id != -1)) {
>> >> RTE_EDEV_LOG_ERR("Rx queues can only be connected
>> >to single "
>> >> @@ -2143,6 +2356,17 @@
>> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> >> return 0;
>> >> }
>> >>
>> >> +static int
>> >> +rxa_sw_vector_limits(struct
>> >rte_event_eth_rx_adapter_vector_limits *limits)
>> >> +{
>> >> + limits->max_sz = MAX_VECTOR_SIZE;
>> >> + limits->min_sz = MIN_VECTOR_SIZE;
>> >> + limits->max_timeout_ns = MAX_VECTOR_NS;
>> >> + limits->min_timeout_ns = MIN_VECTOR_NS;
>> >> +
>> >> + return 0;
>> >> +}
>> >> +
>> >> int
>> >> rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
>> >eth_dev_id,
>> >> int32_t rx_queue_id)
>> >> @@ -2333,7 +2557,8 @@
>> >rte_event_eth_rx_adapter_queue_event_vector_config(
>> >> ret = dev->dev_ops-
>> >>eth_rx_adapter_event_vector_config(
>> >> dev, &rte_eth_devices[eth_dev_id],
>> >rx_queue_id, config);
>> >> } else {
>> >> - ret = -ENOTSUP;
>> >> + rxa_sw_event_vector_configure(rx_adapter,
>> >eth_dev_id,
>> >> + rx_queue_id, config);
>> >> }
>> >>
>> >> return ret;
>> >> @@ -2371,7 +2596,7 @@
>> >rte_event_eth_rx_adapter_vector_limits_get(
>> >> ret = dev->dev_ops-
>> >>eth_rx_adapter_vector_limits_get(
>> >> dev, &rte_eth_devices[eth_port_id], limits);
>> >> } else {
>> >> - ret = -ENOTSUP;
>> >> + ret = rxa_sw_vector_limits(limits);
>> >> }
>> >>
>> >> return ret;
>> >> diff --git a/lib/librte_eventdev/rte_eventdev.c
>> >b/lib/librte_eventdev/rte_eventdev.c
>> >> index f95edc075..254a31b1f 100644
>> >> --- a/lib/librte_eventdev/rte_eventdev.c
>> >> +++ b/lib/librte_eventdev/rte_eventdev.c
>> >> @@ -122,7 +122,11 @@
>rte_event_eth_rx_adapter_caps_get(uint8_t
>> >dev_id, uint16_t eth_port_id,
>> >>
>> >> if (caps == NULL)
>> >> return -EINVAL;
>> >> - *caps = 0;
>> >> +
>> >> + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
>> >> + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
>> >> + else
>> >> + *caps = 0;
>> >
>> >Any reason why we had to set default caps value? I am thinking if sw
>> >event device is used, it would set it anyways.
>> >
>>
>> There are multiple sw event devices which don't implement the caps_get
>> function; this change solves that.
>>
>> >>
>> >> return dev->dev_ops->eth_rx_adapter_caps_get ?
>> >> (*dev->dev_ops-
>> >>eth_rx_adapter_caps_get)(dev,
>> >> --
>> >> 2.17.1
@@ -69,9 +69,10 @@ extern "C" {
} \
} while (0)
-#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
- ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
- (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
+ ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
+ (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) | \
+ (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
#define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
@@ -26,6 +26,10 @@
#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
+#define MAX_VECTOR_SIZE 1024
+#define MIN_VECTOR_SIZE 4
+#define MAX_VECTOR_NS 1E9
+#define MIN_VECTOR_NS 1E5
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32
@@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
uint16_t eth_rx_qid;
};
+struct eth_rx_vector_data {
+ TAILQ_ENTRY(eth_rx_vector_data) next;
+ uint16_t port;
+ uint16_t queue;
+ uint16_t max_vector_count;
+ uint64_t event;
+ uint64_t ts;
+ uint64_t vector_timeout_ticks;
+ struct rte_mempool *vector_pool;
+ struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
/* Count of events in this buffer */
@@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
uint32_t wrr_pos;
/* Event burst buffer */
struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+ /* Vector enable flag */
+ uint8_t ena_vector;
+ /* Timestamp of previous vector expiry list traversal */
+ uint64_t prev_expiry_ts;
+ /* Minimum ticks to wait before traversing expiry list */
+ uint64_t vector_tmo_ticks;
+ /* vector list */
+ struct eth_rx_vector_data_list vector_list;
/* Per adapter stats */
struct rte_event_eth_rx_adapter_stats stats;
/* Block count, counts up to BLOCK_CNT_THRESHOLD */
@@ -198,9 +224,11 @@ struct eth_device_info {
struct eth_rx_queue_info {
int queue_enabled; /* True if added */
int intr_enabled;
+ uint8_t ena_vector;
uint16_t wt; /* Polling weight */
uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
uint64_t event;
+ struct eth_rx_vector_data vector_data;
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
@@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
&rx_adapter->event_enqueue_buffer;
struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+ if (!buf->count)
+ return 0;
+
uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
rx_adapter->event_port_id,
buf->events,
@@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
return n;
}
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_rx_queue_info *queue_info,
+ struct rte_eth_event_enqueue_buffer *buf,
+ struct rte_mbuf **mbufs, uint16_t num)
+{
+ struct rte_event *ev = &buf->events[buf->count];
+ struct eth_rx_vector_data *vec;
+ uint16_t filled, space, sz;
+
+ filled = 0;
+ vec = &queue_info->vector_data;
+ while (num) {
+ if (vec->vector_ev == NULL) {
+ if (rte_mempool_get(vec->vector_pool,
+ (void **)&vec->vector_ev) < 0) {
+ rte_pktmbuf_free_bulk(mbufs, num);
+ return 0;
+ }
+ vec->vector_ev->nb_elem = 0;
+ vec->vector_ev->port = vec->port;
+ vec->vector_ev->queue = vec->queue;
+ vec->vector_ev->attr_valid = true;
+ TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+ } else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+ /* Event ready. */
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ ev++;
+ filled++;
+ vec->vector_ev = NULL;
+ TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+ if (rte_mempool_get(vec->vector_pool,
+ (void **)&vec->vector_ev) < 0) {
+ rte_pktmbuf_free_bulk(mbufs, num);
+ return 0;
+ }
+ vec->vector_ev->nb_elem = 0;
+ vec->vector_ev->port = vec->port;
+ vec->vector_ev->queue = vec->queue;
+ vec->vector_ev->attr_valid = true;
+ TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+ }
+
+ space = vec->max_vector_count - vec->vector_ev->nb_elem;
+ sz = num > space ? space : num;
+ memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+ sizeof(void *) * sz);
+ vec->vector_ev->nb_elem += sz;
+ num -= sz;
+ mbufs += sz;
+ vec->ts = rte_rdtsc();
+ }
+
+ if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ ev++;
+ filled++;
+ vec->vector_ev = NULL;
+ TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+ }
+
+ return filled;
+}
+
static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t eth_dev_id,
@@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
- for (i = 0; i < num; i++) {
- m = mbufs[i];
-
- rss = do_rss ?
- rxa_do_softrss(m, rx_adapter->rss_key_be) :
- m->hash.rss;
- ev->event = event;
- ev->flow_id = (rss & ~flow_id_mask) |
- (ev->flow_id & flow_id_mask);
- ev->mbuf = m;
- ev++;
+ if (!eth_rx_queue_info->ena_vector) {
+ for (i = 0; i < num; i++) {
+ m = mbufs[i];
+
+ rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+ : m->hash.rss;
+ ev->event = event;
+ ev->flow_id = (rss & ~flow_id_mask) |
+ (ev->flow_id & flow_id_mask);
+ ev->mbuf = m;
+ ev++;
+ }
+ } else {
+ num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+ buf, mbufs, num);
}
- if (dev_info->cb_fn) {
+ if (num && dev_info->cb_fn) {
dropped = 0;
nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
- ETH_EVENT_BUFFER_SIZE, buf->count, ev,
- num, dev_info->cb_arg, &dropped);
+ ETH_EVENT_BUFFER_SIZE, buf->count,
+ &buf->events[buf->count], num,
+ dev_info->cb_arg, &dropped);
if (unlikely(nb_cb > num))
RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
nb_cb, num);
@@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
return nb_rx;
}
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+ struct rte_event_eth_rx_adapter *rx_adapter = arg;
+ struct rte_eth_event_enqueue_buffer *buf =
+ &rx_adapter->event_enqueue_buffer;
+ struct rte_event *ev;
+
+ if (buf->count)
+ rxa_flush_event_buffer(rx_adapter);
+
+ if (vec->vector_ev->nb_elem == 0)
+ return;
+ ev = &buf->events[buf->count];
+
+ /* Event ready. */
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ buf->count++;
+
+ vec->vector_ev = NULL;
+ vec->ts = 0;
+}
+
static int
rxa_service_func(void *args)
{
@@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
return 0;
}
+ if (rx_adapter->ena_vector) {
+ if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+ rx_adapter->vector_tmo_ticks) {
+ struct eth_rx_vector_data *vec;
+
+ TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+ uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+ if (elapsed_time >= vec->vector_timeout_ticks) {
+ rxa_vector_expire(vec, rx_adapter);
+ TAILQ_REMOVE(&rx_adapter->vector_list,
+ vec, next);
+ }
+ }
+ rx_adapter->prev_expiry_ts = rte_rdtsc();
+ }
+ }
+
stats = &rx_adapter->stats;
stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
stats->rx_packets += rxa_poll(rx_adapter);
@@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
}
}
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
+ uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+ uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+ struct eth_rx_vector_data *vector_data;
+ uint32_t flow_id;
+
+ vector_data = &queue_info->vector_data;
+ vector_data->max_vector_count = vector_count;
+ vector_data->port = port_id;
+ vector_data->queue = qid;
+ vector_data->vector_pool = mp;
+ vector_data->vector_timeout_ticks =
+ NSEC2TICK(vector_ns, rte_get_timer_hz());
+ vector_data->ts = 0;
+ flow_id = queue_info->event & 0xFFFFF;
+ flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
+ vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
struct eth_device_info *dev_info,
@@ -1741,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
}
}
+static void
+rxa_sw_event_vector_configure(
+ struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
+ int rx_queue_id,
+ const struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+ struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+ struct eth_rx_queue_info *queue_info;
+ struct rte_event *qi_ev;
+
+ if (rx_queue_id == -1) {
+ uint16_t nb_rx_queues;
+ uint16_t i;
+
+ nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+ for (i = 0; i < nb_rx_queues; i++)
+ rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
+ config);
+ return;
+ }
+
+ queue_info = &dev_info->rx_queue[rx_queue_id];
+ qi_ev = (struct rte_event *)&queue_info->event;
+ queue_info->ena_vector = 1;
+ qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+ rxa_set_vector_data(queue_info, config->vector_sz,
+ config->vector_timeout_ns, config->vector_mp,
+ rx_queue_id, dev_info->dev->data->port_id);
+ rx_adapter->ena_vector = 1;
+ rx_adapter->vector_tmo_ticks =
+ rx_adapter->vector_tmo_ticks ?
+ RTE_MIN(config->vector_timeout_ns << 1,
+ rx_adapter->vector_tmo_ticks) :
+ config->vector_timeout_ns << 1;
+ rx_adapter->prev_expiry_ts = 0;
+ TAILQ_INIT(&rx_adapter->vector_list);
+}
+
static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t eth_dev_id,
int rx_queue_id,
@@ -2081,6 +2285,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
return -EINVAL;
}
+ if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+ (queue_conf->rx_queue_flags &
+ RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+ RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+ " eth port: %" PRIu16 " adapter id: %" PRIu8,
+ eth_dev_id, id);
+ return -EINVAL;
+ }
+
if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
(rx_queue_id != -1)) {
RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2143,6 +2356,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
return 0;
}
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+ limits->max_sz = MAX_VECTOR_SIZE;
+ limits->min_sz = MIN_VECTOR_SIZE;
+ limits->max_timeout_ns = MAX_VECTOR_NS;
+ limits->min_timeout_ns = MIN_VECTOR_NS;
+
+ return 0;
+}
+
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
int32_t rx_queue_id)
@@ -2333,7 +2557,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
} else {
- ret = -ENOTSUP;
+ rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
+ rx_queue_id, config);
}
return ret;
@@ -2371,7 +2596,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
dev, &rte_eth_devices[eth_port_id], limits);
} else {
- ret = -ENOTSUP;
+ ret = rxa_sw_vector_limits(limits);
}
return ret;
@@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
if (caps == NULL)
return -EINVAL;
- *caps = 0;
+
+ if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
+ *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
+ else
+ *caps = 0;
return dev->dev_ops->eth_rx_adapter_caps_get ?
(*dev->dev_ops->eth_rx_adapter_caps_get)(dev,