diff mbox series

[v5,4/8] eventdev: add Rx adapter event vector support

Message ID 20210324050525.4489-5-pbhagavatula@marvell.com (mailing list archive)
State Superseded, archived
Delegated to: Jerin Jacob
Headers show
Series Introduce event vectorization | expand

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Pavan Nikhilesh Bhagavatula March 24, 2021, 5:05 a.m. UTC
From: Pavan Nikhilesh <pbhagavatula@marvell.com>

Add event vector support for event eth Rx adapter, the implementation
creates vector flows based on port and queue identifier of the received
mbufs.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
---
 lib/librte_eventdev/eventdev_pmd.h            |   7 +-
 .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++--
 lib/librte_eventdev/rte_eventdev.c            |   6 +-
 3 files changed, 250 insertions(+), 20 deletions(-)

Comments

Jayatheerthan, Jay March 25, 2021, 10:37 a.m. UTC | #1
> -----Original Message-----
> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
> Sent: Wednesday, March 24, 2021 10:35 AM
> To: jerinj@marvell.com; Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Carrillo, Erik G <erik.g.carrillo@intel.com>; Gujjar,
> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom <mattias.ronnblom@ericsson.com>; Ma, Liang J
> <liang.j.ma@intel.com>
> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event vector support
> 
> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
> 
> Add event vector support for event eth Rx adapter, the implementation
> creates vector flows based on port and queue identifier of the received
> mbufs.
> 
> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> ---
>  lib/librte_eventdev/eventdev_pmd.h            |   7 +-
>  .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++--
>  lib/librte_eventdev/rte_eventdev.c            |   6 +-
>  3 files changed, 250 insertions(+), 20 deletions(-)
> 
> diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
> index 9297f1433..0f724ac85 100644
> --- a/lib/librte_eventdev/eventdev_pmd.h
> +++ b/lib/librte_eventdev/eventdev_pmd.h
> @@ -69,9 +69,10 @@ extern "C" {
>  	} \
>  } while (0)
> 
> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> -		((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
> -			(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        \
> +	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
> 
>  #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>  		RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> index ac8ba5bf0..c71990078 100644
> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> @@ -26,6 +26,10 @@
>  #define BATCH_SIZE		32
>  #define BLOCK_CNT_THRESHOLD	10
>  #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
> +#define MAX_VECTOR_SIZE		1024
> +#define MIN_VECTOR_SIZE		4
> +#define MAX_VECTOR_NS		1E9
> +#define MIN_VECTOR_NS		1E5
> 
>  #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
>  #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>  	uint16_t eth_rx_qid;
>  };
> 
> +struct eth_rx_vector_data {
> +	TAILQ_ENTRY(eth_rx_vector_data) next;
> +	uint16_t port;
> +	uint16_t queue;
> +	uint16_t max_vector_count;
> +	uint64_t event;
> +	uint64_t ts;
> +	uint64_t vector_timeout_ticks;
> +	struct rte_mempool *vector_pool;
> +	struct rte_event_vector *vector_ev;
> +} __rte_cache_aligned;
> +
> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
> +
>  /* Instance per adapter */
>  struct rte_eth_event_enqueue_buffer {
>  	/* Count of events in this buffer */
> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>  	uint32_t wrr_pos;
>  	/* Event burst buffer */
>  	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
> +	/* Vector enable flag */
> +	uint8_t ena_vector;
> +	/* Timestamp of previous vector expiry list traversal */
> +	uint64_t prev_expiry_ts;
> +	/* Minimum ticks to wait before traversing expiry list */
> +	uint64_t vector_tmo_ticks;
> +	/* vector list */
> +	struct eth_rx_vector_data_list vector_list;
>  	/* Per adapter stats */
>  	struct rte_event_eth_rx_adapter_stats stats;
>  	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
> @@ -198,9 +224,11 @@ struct eth_device_info {
>  struct eth_rx_queue_info {
>  	int queue_enabled;	/* True if added */
>  	int intr_enabled;
> +	uint8_t ena_vector;
>  	uint16_t wt;		/* Polling weight */
>  	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
>  	uint64_t event;
> +	struct eth_rx_vector_data vector_data;
>  };
> 
>  static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>  	    &rx_adapter->event_enqueue_buffer;
>  	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
> 
> +	if (!buf->count)
> +		return 0;
> +
>  	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
>  					rx_adapter->event_port_id,
>  					buf->events,
> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>  	return n;
>  }
> 
> +static inline uint16_t
> +rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
> +			struct eth_rx_queue_info *queue_info,
> +			struct rte_eth_event_enqueue_buffer *buf,
> +			struct rte_mbuf **mbufs, uint16_t num)
> +{
> +	struct rte_event *ev = &buf->events[buf->count];
> +	struct eth_rx_vector_data *vec;
> +	uint16_t filled, space, sz;
> +
> +	filled = 0;
> +	vec = &queue_info->vector_data;
> +	while (num) {
> +		if (vec->vector_ev == NULL) {
> +			if (rte_mempool_get(vec->vector_pool,
> +					    (void **)&vec->vector_ev) < 0) {
> +				rte_pktmbuf_free_bulk(mbufs, num);
> +				return 0;
> +			}
> +			vec->vector_ev->nb_elem = 0;
> +			vec->vector_ev->port = vec->port;
> +			vec->vector_ev->queue = vec->queue;
> +			vec->vector_ev->attr_valid = true;
> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
> +		} else if (vec->vector_ev->nb_elem == vec->max_vector_count) {

Is there a case where nb_elem > max_vector_count as we accumulate sz to it ?

> +			/* Event ready. */
> +			ev->event = vec->event;
> +			ev->vec = vec->vector_ev;
> +			ev++;
> +			filled++;
> +			vec->vector_ev = NULL;
> +			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> +			if (rte_mempool_get(vec->vector_pool,
> +					    (void **)&vec->vector_ev) < 0) {
> +				rte_pktmbuf_free_bulk(mbufs, num);
> +				return 0;
> +			}
> +			vec->vector_ev->nb_elem = 0;
> +			vec->vector_ev->port = vec->port;
> +			vec->vector_ev->queue = vec->queue;
> +			vec->vector_ev->attr_valid = true;
> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
> +		}
> +
> +		space = vec->max_vector_count - vec->vector_ev->nb_elem;
> +		sz = num > space ? space : num;
> +		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
> +		       sizeof(void *) * sz);
> +		vec->vector_ev->nb_elem += sz;
> +		num -= sz;
> +		mbufs += sz;
> +		vec->ts = rte_rdtsc();
> +	}
> +
> +	if (vec->vector_ev->nb_elem == vec->max_vector_count) {

Same here.

> +		ev->event = vec->event;
> +		ev->vec = vec->vector_ev;
> +		ev++;
> +		filled++;
> +		vec->vector_ev = NULL;
> +		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> +	}
> +
> +	return filled;
> +}

I am seeing more than one repeating code chunks in this function. Perhaps, you can give it a try to not repeat. We can drop if its performance affecting.

> +
>  static inline void
>  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  		uint16_t eth_dev_id,
> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>  	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>  	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

The RSS related code is executed for vector case as well. Can this be moved inside ena_vector if condition ?

> 
> -	for (i = 0; i < num; i++) {
> -		m = mbufs[i];
> -
> -		rss = do_rss ?
> -			rxa_do_softrss(m, rx_adapter->rss_key_be) :
> -			m->hash.rss;
> -		ev->event = event;
> -		ev->flow_id = (rss & ~flow_id_mask) |
> -				(ev->flow_id & flow_id_mask);
> -		ev->mbuf = m;
> -		ev++;
> +	if (!eth_rx_queue_info->ena_vector) {
> +		for (i = 0; i < num; i++) {
> +			m = mbufs[i];
> +
> +			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
> +				     : m->hash.rss;
> +			ev->event = event;
> +			ev->flow_id = (rss & ~flow_id_mask) |
> +				      (ev->flow_id & flow_id_mask);
> +			ev->mbuf = m;
> +			ev++;
> +		}
> +	} else {
> +		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> +					      buf, mbufs, num);
>  	}
> 
> -	if (dev_info->cb_fn) {
> +	if (num && dev_info->cb_fn) {
> 
>  		dropped = 0;
>  		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> -					ETH_EVENT_BUFFER_SIZE, buf->count, ev,
> -					num, dev_info->cb_arg, &dropped);
> +					ETH_EVENT_BUFFER_SIZE, buf->count,
> +					&buf->events[buf->count], num,
> +					dev_info->cb_arg, &dropped);

Before this patch, we pass ev which is &buf->events[buf->count] + num as fifth param when calling cb_fn. Now, we are passing &buf->events[buf->count] for non-vector case. Do you see this as an issue?

Also, for vector case would it make sense to do pass &buf->events[buf->count] + num ?

>  		if (unlikely(nb_cb > num))
>  			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
>  				nb_cb, num);
> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
>  	return nb_rx;
>  }
> 
> +static void
> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
> +{
> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
> +	struct rte_eth_event_enqueue_buffer *buf =
> +		&rx_adapter->event_enqueue_buffer;
> +	struct rte_event *ev;
> +
> +	if (buf->count)
> +		rxa_flush_event_buffer(rx_adapter);
> +
> +	if (vec->vector_ev->nb_elem == 0)
> +		return;
> +	ev = &buf->events[buf->count];
> +
> +	/* Event ready. */
> +	ev->event = vec->event;
> +	ev->vec = vec->vector_ev;
> +	buf->count++;
> +
> +	vec->vector_ev = NULL;
> +	vec->ts = 0;
> +}
> +
>  static int
>  rxa_service_func(void *args)
>  {
> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>  		return 0;
>  	}
> 
> +	if (rx_adapter->ena_vector) {
> +		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
> +		    rx_adapter->vector_tmo_ticks) {
> +			struct eth_rx_vector_data *vec;
> +
> +			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
> +				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
> +
> +				if (elapsed_time >= vec->vector_timeout_ticks) {
> +					rxa_vector_expire(vec, rx_adapter);
> +					TAILQ_REMOVE(&rx_adapter->vector_list,
> +						     vec, next);
> +				}
> +			}
> +			rx_adapter->prev_expiry_ts = rte_rdtsc();
> +		}
> +	}
> +
>  	stats = &rx_adapter->stats;
>  	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>  	stats->rx_packets += rxa_poll(rx_adapter);
> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>  	}
>  }
> 
> +static void
> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
> +		    uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
> +		    uint16_t port_id)
> +{
> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
> +	struct eth_rx_vector_data *vector_data;
> +	uint32_t flow_id;
> +
> +	vector_data = &queue_info->vector_data;
> +	vector_data->max_vector_count = vector_count;
> +	vector_data->port = port_id;
> +	vector_data->queue = qid;
> +	vector_data->vector_pool = mp;
> +	vector_data->vector_timeout_ticks =
> +		NSEC2TICK(vector_ns, rte_get_timer_hz());
> +	vector_data->ts = 0;
> +	flow_id = queue_info->event & 0xFFFFF;
> +	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;

Maybe I am missing something here. Looking at the code it looks like qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11, flow_id would end up being 0x11. Is this the expectation? Also, it may be useful to document flow_id format.
Comparing this format with existing RSS hash based method, are we saying that all mbufs received in a rx burst are part of same flow when vectorization is used?

> +	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
> +}
> +
>  static void
>  rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>  	struct eth_device_info *dev_info,
> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>  	}
>  }
> 
> +static void
> +rxa_sw_event_vector_configure(
> +	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
> +	int rx_queue_id,
> +	const struct rte_event_eth_rx_adapter_event_vector_config *config)
> +{
> +	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
> +	struct eth_rx_queue_info *queue_info;
> +	struct rte_event *qi_ev;
> +
> +	if (rx_queue_id == -1) {
> +		uint16_t nb_rx_queues;
> +		uint16_t i;
> +
> +		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
> +		for (i = 0; i < nb_rx_queues; i++)
> +			rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
> +						      config);
> +		return;
> +	}
> +
> +	queue_info = &dev_info->rx_queue[rx_queue_id];
> +	qi_ev = (struct rte_event *)&queue_info->event;
> +	queue_info->ena_vector = 1;
> +	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
> +	rxa_set_vector_data(queue_info, config->vector_sz,
> +			    config->vector_timeout_ns, config->vector_mp,
> +			    rx_queue_id, dev_info->dev->data->port_id);
> +	rx_adapter->ena_vector = 1;
> +	rx_adapter->vector_tmo_ticks =
> +		rx_adapter->vector_tmo_ticks ?
> +			      RTE_MIN(config->vector_timeout_ns << 1,
> +				      rx_adapter->vector_tmo_ticks) :
> +			      config->vector_timeout_ns << 1;
> +	rx_adapter->prev_expiry_ts = 0;
> +	TAILQ_INIT(&rx_adapter->vector_list);
> +}
> +
>  static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>  		uint16_t eth_dev_id,
>  		int rx_queue_id,
> @@ -2081,6 +2285,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>  		return -EINVAL;
>  	}
> 
> +	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
> +	    (queue_conf->rx_queue_flags &
> +	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
> +		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
> +				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
> +				 eth_dev_id, id);
> +		return -EINVAL;
> +	}
> +
>  	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>  		(rx_queue_id != -1)) {
>  		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
> @@ -2143,6 +2356,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>  	return 0;
>  }
> 
> +static int
> +rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
> +{
> +	limits->max_sz = MAX_VECTOR_SIZE;
> +	limits->min_sz = MIN_VECTOR_SIZE;
> +	limits->max_timeout_ns = MAX_VECTOR_NS;
> +	limits->min_timeout_ns = MIN_VECTOR_NS;
> +
> +	return 0;
> +}
> +
>  int
>  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
>  				int32_t rx_queue_id)
> @@ -2333,7 +2557,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
>  		ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
>  			dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
>  	} else {
> -		ret = -ENOTSUP;
> +		rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
> +					      rx_queue_id, config);
>  	}
> 
>  	return ret;
> @@ -2371,7 +2596,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
>  		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
>  			dev, &rte_eth_devices[eth_port_id], limits);
>  	} else {
> -		ret = -ENOTSUP;
> +		ret = rxa_sw_vector_limits(limits);
>  	}
> 
>  	return ret;
> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
> index f95edc075..254a31b1f 100644
> --- a/lib/librte_eventdev/rte_eventdev.c
> +++ b/lib/librte_eventdev/rte_eventdev.c
> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
> 
>  	if (caps == NULL)
>  		return -EINVAL;
> -	*caps = 0;
> +
> +	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
> +		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
> +	else
> +		*caps = 0;

Any reason why we had to set default caps value? I am thinking if sw event device is used, it would set it anyways.

> 
>  	return dev->dev_ops->eth_rx_adapter_caps_get ?
>  				(*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
> --
> 2.17.1
Pavan Nikhilesh Bhagavatula March 25, 2021, 1:14 p.m. UTC | #2
>-----Original Message-----
>From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
>Sent: Thursday, March 25, 2021 4:07 PM
>To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>; Jerin
>Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
><abhinandan.gujjar@intel.com>; McDaniel, Timothy
><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
><mattias.ronnblom@ericsson.com>; Ma, Liang J
><liang.j.ma@intel.com>
>Cc: dev@dpdk.org
>Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event vector support
>
>External Email
>
>----------------------------------------------------------------------
>> -----Original Message-----
>> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
>> Sent: Wednesday, March 24, 2021 10:35 AM
>> To: jerinj@marvell.com; Jayatheerthan, Jay
><jay.jayatheerthan@intel.com>; Carrillo, Erik G
><erik.g.carrillo@intel.com>; Gujjar,
>> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> <liang.j.ma@intel.com>
>> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
>> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event
>vector support
>>
>> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
>>
>> Add event vector support for event eth Rx adapter, the
>implementation
>> creates vector flows based on port and queue identifier of the
>received
>> mbufs.
>>
>> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> ---
>>  lib/librte_eventdev/eventdev_pmd.h            |   7 +-
>>  .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++--
>>  lib/librte_eventdev/rte_eventdev.c            |   6 +-
>>  3 files changed, 250 insertions(+), 20 deletions(-)
>>
>> diff --git a/lib/librte_eventdev/eventdev_pmd.h
>b/lib/librte_eventdev/eventdev_pmd.h
>> index 9297f1433..0f724ac85 100644
>> --- a/lib/librte_eventdev/eventdev_pmd.h
>> +++ b/lib/librte_eventdev/eventdev_pmd.h
>> @@ -69,9 +69,10 @@ extern "C" {
>>  	} \
>>  } while (0)
>>
>> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> -
>	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>\
>> -
>	(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
>> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
>\
>> +	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>\
>> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
>\
>> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>>
>>  #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>>
>	RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
>> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> index ac8ba5bf0..c71990078 100644
>> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> @@ -26,6 +26,10 @@
>>  #define BATCH_SIZE		32
>>  #define BLOCK_CNT_THRESHOLD	10
>>  #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
>> +#define MAX_VECTOR_SIZE		1024
>> +#define MIN_VECTOR_SIZE		4
>> +#define MAX_VECTOR_NS		1E9
>> +#define MIN_VECTOR_NS		1E5
>>
>>  #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
>>  #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
>> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>>  	uint16_t eth_rx_qid;
>>  };
>>
>> +struct eth_rx_vector_data {
>> +	TAILQ_ENTRY(eth_rx_vector_data) next;
>> +	uint16_t port;
>> +	uint16_t queue;
>> +	uint16_t max_vector_count;
>> +	uint64_t event;
>> +	uint64_t ts;
>> +	uint64_t vector_timeout_ticks;
>> +	struct rte_mempool *vector_pool;
>> +	struct rte_event_vector *vector_ev;
>> +} __rte_cache_aligned;
>> +
>> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
>> +
>>  /* Instance per adapter */
>>  struct rte_eth_event_enqueue_buffer {
>>  	/* Count of events in this buffer */
>> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>>  	uint32_t wrr_pos;
>>  	/* Event burst buffer */
>>  	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
>> +	/* Vector enable flag */
>> +	uint8_t ena_vector;
>> +	/* Timestamp of previous vector expiry list traversal */
>> +	uint64_t prev_expiry_ts;
>> +	/* Minimum ticks to wait before traversing expiry list */
>> +	uint64_t vector_tmo_ticks;
>> +	/* vector list */
>> +	struct eth_rx_vector_data_list vector_list;
>>  	/* Per adapter stats */
>>  	struct rte_event_eth_rx_adapter_stats stats;
>>  	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
>> @@ -198,9 +224,11 @@ struct eth_device_info {
>>  struct eth_rx_queue_info {
>>  	int queue_enabled;	/* True if added */
>>  	int intr_enabled;
>> +	uint8_t ena_vector;
>>  	uint16_t wt;		/* Polling weight */
>>  	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else
>0 */
>>  	uint64_t event;
>> +	struct eth_rx_vector_data vector_data;
>>  };
>>
>>  static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
>> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
>rte_event_eth_rx_adapter *rx_adapter)
>>  	    &rx_adapter->event_enqueue_buffer;
>>  	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
>>stats;
>>
>> +	if (!buf->count)
>> +		return 0;
>> +
>>  	uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
>>eventdev_id,
>>  					rx_adapter->event_port_id,
>>  					buf->events,
>> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
>rte_event_eth_rx_adapter *rx_adapter)
>>  	return n;
>>  }
>>
>> +static inline uint16_t
>> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
>*rx_adapter,
>> +			struct eth_rx_queue_info *queue_info,
>> +			struct rte_eth_event_enqueue_buffer *buf,
>> +			struct rte_mbuf **mbufs, uint16_t num)
>> +{
>> +	struct rte_event *ev = &buf->events[buf->count];
>> +	struct eth_rx_vector_data *vec;
>> +	uint16_t filled, space, sz;
>> +
>> +	filled = 0;
>> +	vec = &queue_info->vector_data;
>> +	while (num) {
>> +		if (vec->vector_ev == NULL) {
>> +			if (rte_mempool_get(vec->vector_pool,
>> +					    (void **)&vec->vector_ev) <
>0) {
>> +				rte_pktmbuf_free_bulk(mbufs, num);
>> +				return 0;
>> +			}
>> +			vec->vector_ev->nb_elem = 0;
>> +			vec->vector_ev->port = vec->port;
>> +			vec->vector_ev->queue = vec->queue;
>> +			vec->vector_ev->attr_valid = true;
>> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>vec, next);
>> +		} else if (vec->vector_ev->nb_elem == vec-
>>max_vector_count) {
>
>Is there a case where nb_elem > max_vector_count as we accumulate
>sz to it ?

I don't think so, that would overflow the vector event.

>
>> +			/* Event ready. */
>> +			ev->event = vec->event;
>> +			ev->vec = vec->vector_ev;
>> +			ev++;
>> +			filled++;
>> +			vec->vector_ev = NULL;
>> +			TAILQ_REMOVE(&rx_adapter->vector_list, vec,
>next);
>> +			if (rte_mempool_get(vec->vector_pool,
>> +					    (void **)&vec->vector_ev) <
>0) {
>> +				rte_pktmbuf_free_bulk(mbufs, num);
>> +				return 0;
>> +			}
>> +			vec->vector_ev->nb_elem = 0;
>> +			vec->vector_ev->port = vec->port;
>> +			vec->vector_ev->queue = vec->queue;
>> +			vec->vector_ev->attr_valid = true;
>> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>vec, next);
>> +		}
>> +
>> +		space = vec->max_vector_count - vec->vector_ev-
>>nb_elem;
>> +		sz = num > space ? space : num;
>> +		memcpy(vec->vector_ev->mbufs + vec->vector_ev-
>>nb_elem, mbufs,
>> +		       sizeof(void *) * sz);
>> +		vec->vector_ev->nb_elem += sz;
>> +		num -= sz;
>> +		mbufs += sz;
>> +		vec->ts = rte_rdtsc();
>> +	}
>> +
>> +	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>
>Same here.
>
>> +		ev->event = vec->event;
>> +		ev->vec = vec->vector_ev;
>> +		ev++;
>> +		filled++;
>> +		vec->vector_ev = NULL;
>> +		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> +	}
>> +
>> +	return filled;
>> +}
>
>I am seeing more than one repeating code chunks in this function.
>Perhaps, you can give it a try to not repeat. We can drop if its
>performance affecting.

I will try to move them to inline functions and test.

>
>> +
>>  static inline void
>>  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>>  		uint16_t eth_dev_id,
>> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
>rte_event_eth_rx_adapter *rx_adapter,
>>  	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>>  	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>
>The RSS related code is executed for vector case as well. Can this be
>moved inside ena_vector if condition ?

RSS is used to generate the event flowid, in vector case the flow id 
Will be a combination of port and queue id. 
The idea is that flows having the same RSS LSB will end up in the same 
queue.

>
>>
>> -	for (i = 0; i < num; i++) {
>> -		m = mbufs[i];
>> -
>> -		rss = do_rss ?
>> -			rxa_do_softrss(m, rx_adapter->rss_key_be) :
>> -			m->hash.rss;
>> -		ev->event = event;
>> -		ev->flow_id = (rss & ~flow_id_mask) |
>> -				(ev->flow_id & flow_id_mask);
>> -		ev->mbuf = m;
>> -		ev++;
>> +	if (!eth_rx_queue_info->ena_vector) {
>> +		for (i = 0; i < num; i++) {
>> +			m = mbufs[i];
>> +
>> +			rss = do_rss ? rxa_do_softrss(m, rx_adapter-
>>rss_key_be)
>> +				     : m->hash.rss;
>> +			ev->event = event;
>> +			ev->flow_id = (rss & ~flow_id_mask) |
>> +				      (ev->flow_id & flow_id_mask);
>> +			ev->mbuf = m;
>> +			ev++;
>> +		}
>> +	} else {
>> +		num = rxa_create_event_vector(rx_adapter,
>eth_rx_queue_info,
>> +					      buf, mbufs, num);
>>  	}
>>
>> -	if (dev_info->cb_fn) {
>> +	if (num && dev_info->cb_fn) {
>>
>>  		dropped = 0;
>>  		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
>> -					ETH_EVENT_BUFFER_SIZE, buf-
>>count, ev,
>> -					num, dev_info->cb_arg,
>&dropped);
>> +					ETH_EVENT_BUFFER_SIZE, buf-
>>count,
>> +					&buf->events[buf->count],
>num,
>> +					dev_info->cb_arg, &dropped);
>
>Before this patch, we pass ev which is &buf->events[buf->count] + num
>as fifth param when calling cb_fn. Now, we are passing &buf-
>>events[buf->count] for non-vector case. Do you see this as an issue?
>

The callback function takes in the array newly formed events i.e. we need
to pass the start of array and the count.

the previous code had a bug where it passes the end of the event list.

>Also, for vector case would it make sense to do pass &buf->events[buf-
>>count] + num ?
>
>>  		if (unlikely(nb_cb > num))
>>  			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
>events",
>>  				nb_cb, num);
>> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter
>*rx_adapter)
>>  	return nb_rx;
>>  }
>>
>> +static void
>> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
>> +{
>> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> +	struct rte_eth_event_enqueue_buffer *buf =
>> +		&rx_adapter->event_enqueue_buffer;
>> +	struct rte_event *ev;
>> +
>> +	if (buf->count)
>> +		rxa_flush_event_buffer(rx_adapter);
>> +
>> +	if (vec->vector_ev->nb_elem == 0)
>> +		return;
>> +	ev = &buf->events[buf->count];
>> +
>> +	/* Event ready. */
>> +	ev->event = vec->event;
>> +	ev->vec = vec->vector_ev;
>> +	buf->count++;
>> +
>> +	vec->vector_ev = NULL;
>> +	vec->ts = 0;
>> +}
>> +
>>  static int
>>  rxa_service_func(void *args)
>>  {
>> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>>  		return 0;
>>  	}
>>
>> +	if (rx_adapter->ena_vector) {
>> +		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
>> +		    rx_adapter->vector_tmo_ticks) {
>> +			struct eth_rx_vector_data *vec;
>> +
>> +			TAILQ_FOREACH(vec, &rx_adapter->vector_list,
>next) {
>> +				uint64_t elapsed_time = rte_rdtsc() -
>vec->ts;
>> +
>> +				if (elapsed_time >= vec-
>>vector_timeout_ticks) {
>> +					rxa_vector_expire(vec,
>rx_adapter);
>> +					TAILQ_REMOVE(&rx_adapter-
>>vector_list,
>> +						     vec, next);
>> +				}
>> +			}
>> +			rx_adapter->prev_expiry_ts = rte_rdtsc();
>> +		}
>> +	}
>> +
>>  	stats = &rx_adapter->stats;
>>  	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>>  	stats->rx_packets += rxa_poll(rx_adapter);
>> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
>rte_event_eth_rx_adapter *rx_adapter,
>>  	}
>>  }
>>
>> +static void
>> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
>uint16_t vector_count,
>> +		    uint64_t vector_ns, struct rte_mempool *mp, int32_t
>qid,
>> +		    uint16_t port_id)
>> +{
>> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
>> +	struct eth_rx_vector_data *vector_data;
>> +	uint32_t flow_id;
>> +
>> +	vector_data = &queue_info->vector_data;
>> +	vector_data->max_vector_count = vector_count;
>> +	vector_data->port = port_id;
>> +	vector_data->queue = qid;
>> +	vector_data->vector_pool = mp;
>> +	vector_data->vector_timeout_ticks =
>> +		NSEC2TICK(vector_ns, rte_get_timer_hz());
>> +	vector_data->ts = 0;
>> +	flow_id = queue_info->event & 0xFFFFF;
>> +	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
>flow_id;
>
>Maybe I am missing something here. Looking at the code it looks like
>qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11,
>flow_id would end up being 0x11. Is this the expectation? Also, it may
>be useful to document flow_id format.

The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port
as a flow.

>Comparing this format with existing RSS hash based method, are we
>saying that all mbufs received in a rx burst are part of same flow when
>vectorization is used?

Yes, the hard way to do this is to use a hash table and treating each 
mbuf having an unique flow.

>
>> +	vector_data->event = (queue_info->event & ~0xFFFFF) |
>flow_id;
>> +}
>> +
>>  static void
>>  rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>>  	struct eth_device_info *dev_info,
>> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
>rte_event_eth_rx_adapter *rx_adapter,
>>  	}
>>  }
>>
>> +static void
>> +rxa_sw_event_vector_configure(
>> +	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
>eth_dev_id,
>> +	int rx_queue_id,
>> +	const struct rte_event_eth_rx_adapter_event_vector_config
>*config)
>> +{
>> +	struct eth_device_info *dev_info = &rx_adapter-
>>eth_devices[eth_dev_id];
>> +	struct eth_rx_queue_info *queue_info;
>> +	struct rte_event *qi_ev;
>> +
>> +	if (rx_queue_id == -1) {
>> +		uint16_t nb_rx_queues;
>> +		uint16_t i;
>> +
>> +		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
>> +		for (i = 0; i < nb_rx_queues; i++)
>> +			rxa_sw_event_vector_configure(rx_adapter,
>eth_dev_id, i,
>> +						      config);
>> +		return;
>> +	}
>> +
>> +	queue_info = &dev_info->rx_queue[rx_queue_id];
>> +	qi_ev = (struct rte_event *)&queue_info->event;
>> +	queue_info->ena_vector = 1;
>> +	qi_ev->event_type =
>RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
>> +	rxa_set_vector_data(queue_info, config->vector_sz,
>> +			    config->vector_timeout_ns, config-
>>vector_mp,
>> +			    rx_queue_id, dev_info->dev->data->port_id);
>> +	rx_adapter->ena_vector = 1;
>> +	rx_adapter->vector_tmo_ticks =
>> +		rx_adapter->vector_tmo_ticks ?
>> +			      RTE_MIN(config->vector_timeout_ns << 1,
>> +				      rx_adapter->vector_tmo_ticks) :
>> +			      config->vector_timeout_ns << 1;
>> +	rx_adapter->prev_expiry_ts = 0;
>> +	TAILQ_INIT(&rx_adapter->vector_list);
>> +}
>> +
>>  static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>>  		uint16_t eth_dev_id,
>>  		int rx_queue_id,
>> @@ -2081,6 +2285,15 @@
>rte_event_eth_rx_adapter_queue_add(uint8_t id,
>>  		return -EINVAL;
>>  	}
>>
>> +	if ((cap &
>RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
>> +	    (queue_conf->rx_queue_flags &
>> +	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
>> +		RTE_EDEV_LOG_ERR("Event vectorization is not
>supported,"
>> +				 " eth port: %" PRIu16 " adapter id: %"
>PRIu8,
>> +				 eth_dev_id, id);
>> +		return -EINVAL;
>> +	}
>> +
>>  	if ((cap &
>RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>>  		(rx_queue_id != -1)) {
>>  		RTE_EDEV_LOG_ERR("Rx queues can only be connected
>to single "
>> @@ -2143,6 +2356,17 @@
>rte_event_eth_rx_adapter_queue_add(uint8_t id,
>>  	return 0;
>>  }
>>
>> +static int
>> +rxa_sw_vector_limits(struct
>rte_event_eth_rx_adapter_vector_limits *limits)
>> +{
>> +	limits->max_sz = MAX_VECTOR_SIZE;
>> +	limits->min_sz = MIN_VECTOR_SIZE;
>> +	limits->max_timeout_ns = MAX_VECTOR_NS;
>> +	limits->min_timeout_ns = MIN_VECTOR_NS;
>> +
>> +	return 0;
>> +}
>> +
>>  int
>>  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
>eth_dev_id,
>>  				int32_t rx_queue_id)
>> @@ -2333,7 +2557,8 @@
>rte_event_eth_rx_adapter_queue_event_vector_config(
>>  		ret = dev->dev_ops-
>>eth_rx_adapter_event_vector_config(
>>  			dev, &rte_eth_devices[eth_dev_id],
>rx_queue_id, config);
>>  	} else {
>> -		ret = -ENOTSUP;
>> +		rxa_sw_event_vector_configure(rx_adapter,
>eth_dev_id,
>> +					      rx_queue_id, config);
>>  	}
>>
>>  	return ret;
>> @@ -2371,7 +2596,7 @@
>rte_event_eth_rx_adapter_vector_limits_get(
>>  		ret = dev->dev_ops-
>>eth_rx_adapter_vector_limits_get(
>>  			dev, &rte_eth_devices[eth_port_id], limits);
>>  	} else {
>> -		ret = -ENOTSUP;
>> +		ret = rxa_sw_vector_limits(limits);
>>  	}
>>
>>  	return ret;
>> diff --git a/lib/librte_eventdev/rte_eventdev.c
>b/lib/librte_eventdev/rte_eventdev.c
>> index f95edc075..254a31b1f 100644
>> --- a/lib/librte_eventdev/rte_eventdev.c
>> +++ b/lib/librte_eventdev/rte_eventdev.c
>> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t
>dev_id, uint16_t eth_port_id,
>>
>>  	if (caps == NULL)
>>  		return -EINVAL;
>> -	*caps = 0;
>> +
>> +	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
>> +		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
>> +	else
>> +		*caps = 0;
>
>Any reason why we had to set default caps value? I am thinking if sw
>event device is used, it would set it anyways.
>

There are multiple sw event devices which don't implement caps_get 
function, this changes solves that.

>>
>>  	return dev->dev_ops->eth_rx_adapter_caps_get ?
>>  				(*dev->dev_ops-
>>eth_rx_adapter_caps_get)(dev,
>> --
>> 2.17.1
Jayatheerthan, Jay March 26, 2021, 6:26 a.m. UTC | #3
> -----Original Message-----
> From: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>
> Sent: Thursday, March 25, 2021 6:44 PM
> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Jerin Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
> <erik.g.carrillo@intel.com>; Gujjar, Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
> <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> <mattias.ronnblom@ericsson.com>; Ma, Liang J <liang.j.ma@intel.com>
> Cc: dev@dpdk.org
> Subject: RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event vector support
> 
> 
> 
> >-----Original Message-----
> >From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
> >Sent: Thursday, March 25, 2021 4:07 PM
> >To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>; Jerin
> >Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
> ><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
> ><abhinandan.gujjar@intel.com>; McDaniel, Timothy
> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> >Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
> ><liang.j.ma@intel.com>
> >Cc: dev@dpdk.org
> >Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
> >event vector support
> >
> >External Email
> >
> >----------------------------------------------------------------------
> >> -----Original Message-----
> >> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
> >> Sent: Wednesday, March 24, 2021 10:35 AM
> >> To: jerinj@marvell.com; Jayatheerthan, Jay
> ><jay.jayatheerthan@intel.com>; Carrillo, Erik G
> ><erik.g.carrillo@intel.com>; Gujjar,
> >> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
> >> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
> >> <liang.j.ma@intel.com>
> >> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
> >> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event
> >vector support
> >>
> >> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
> >>
> >> Add event vector support for event eth Rx adapter, the
> >implementation
> >> creates vector flows based on port and queue identifier of the
> >received
> >> mbufs.
> >>
> >> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
> >> ---
> >>  lib/librte_eventdev/eventdev_pmd.h            |   7 +-
> >>  .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++--
> >>  lib/librte_eventdev/rte_eventdev.c            |   6 +-
> >>  3 files changed, 250 insertions(+), 20 deletions(-)
> >>
> >> diff --git a/lib/librte_eventdev/eventdev_pmd.h
> >b/lib/librte_eventdev/eventdev_pmd.h
> >> index 9297f1433..0f724ac85 100644
> >> --- a/lib/librte_eventdev/eventdev_pmd.h
> >> +++ b/lib/librte_eventdev/eventdev_pmd.h
> >> @@ -69,9 +69,10 @@ extern "C" {
> >>  	} \
> >>  } while (0)
> >>
> >> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> >> -
> >	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
> >\
> >> -
> >	(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
> >> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
> >\
> >> +	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
> >\
> >> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
> >\
> >> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
> >>
> >>  #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
> >>
> >	RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
> >> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> index ac8ba5bf0..c71990078 100644
> >> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> >> @@ -26,6 +26,10 @@
> >>  #define BATCH_SIZE		32
> >>  #define BLOCK_CNT_THRESHOLD	10
> >>  #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
> >> +#define MAX_VECTOR_SIZE		1024
> >> +#define MIN_VECTOR_SIZE		4
> >> +#define MAX_VECTOR_NS		1E9
> >> +#define MIN_VECTOR_NS		1E5
> >>
> >>  #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
> >>  #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
> >> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
> >>  	uint16_t eth_rx_qid;
> >>  };
> >>
> >> +struct eth_rx_vector_data {
> >> +	TAILQ_ENTRY(eth_rx_vector_data) next;
> >> +	uint16_t port;
> >> +	uint16_t queue;
> >> +	uint16_t max_vector_count;
> >> +	uint64_t event;
> >> +	uint64_t ts;
> >> +	uint64_t vector_timeout_ticks;
> >> +	struct rte_mempool *vector_pool;
> >> +	struct rte_event_vector *vector_ev;
> >> +} __rte_cache_aligned;
> >> +
> >> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
> >> +
> >>  /* Instance per adapter */
> >>  struct rte_eth_event_enqueue_buffer {
> >>  	/* Count of events in this buffer */
> >> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
> >>  	uint32_t wrr_pos;
> >>  	/* Event burst buffer */
> >>  	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
> >> +	/* Vector enable flag */
> >> +	uint8_t ena_vector;
> >> +	/* Timestamp of previous vector expiry list traversal */
> >> +	uint64_t prev_expiry_ts;
> >> +	/* Minimum ticks to wait before traversing expiry list */
> >> +	uint64_t vector_tmo_ticks;
> >> +	/* vector list */
> >> +	struct eth_rx_vector_data_list vector_list;
> >>  	/* Per adapter stats */
> >>  	struct rte_event_eth_rx_adapter_stats stats;
> >>  	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
> >> @@ -198,9 +224,11 @@ struct eth_device_info {
> >>  struct eth_rx_queue_info {
> >>  	int queue_enabled;	/* True if added */
> >>  	int intr_enabled;
> >> +	uint8_t ena_vector;
> >>  	uint16_t wt;		/* Polling weight */
> >>  	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else
> >0 */
> >>  	uint64_t event;
> >> +	struct eth_rx_vector_data vector_data;
> >>  };
> >>
> >>  static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
> >> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
> >rte_event_eth_rx_adapter *rx_adapter)
> >>  	    &rx_adapter->event_enqueue_buffer;
> >>  	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
> >>stats;
> >>
> >> +	if (!buf->count)
> >> +		return 0;
> >> +
> >>  	uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
> >>eventdev_id,
> >>  					rx_adapter->event_port_id,
> >>  					buf->events,
> >> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
> >rte_event_eth_rx_adapter *rx_adapter)
> >>  	return n;
> >>  }
> >>
> >> +static inline uint16_t
> >> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
> >*rx_adapter,
> >> +			struct eth_rx_queue_info *queue_info,
> >> +			struct rte_eth_event_enqueue_buffer *buf,
> >> +			struct rte_mbuf **mbufs, uint16_t num)
> >> +{
> >> +	struct rte_event *ev = &buf->events[buf->count];
> >> +	struct eth_rx_vector_data *vec;
> >> +	uint16_t filled, space, sz;
> >> +
> >> +	filled = 0;
> >> +	vec = &queue_info->vector_data;
> >> +	while (num) {
> >> +		if (vec->vector_ev == NULL) {
> >> +			if (rte_mempool_get(vec->vector_pool,
> >> +					    (void **)&vec->vector_ev) <
> >0) {
> >> +				rte_pktmbuf_free_bulk(mbufs, num);
> >> +				return 0;
> >> +			}
> >> +			vec->vector_ev->nb_elem = 0;
> >> +			vec->vector_ev->port = vec->port;
> >> +			vec->vector_ev->queue = vec->queue;
> >> +			vec->vector_ev->attr_valid = true;
> >> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
> >vec, next);
> >> +		} else if (vec->vector_ev->nb_elem == vec-
> >>max_vector_count) {
> >
> >Is there a case where nb_elem > max_vector_count as we accumulate
> >sz to it ?
> 
> I don't think so, that would overflow the vector event.
> 
> >
> >> +			/* Event ready. */
> >> +			ev->event = vec->event;
> >> +			ev->vec = vec->vector_ev;
> >> +			ev++;
> >> +			filled++;
> >> +			vec->vector_ev = NULL;
> >> +			TAILQ_REMOVE(&rx_adapter->vector_list, vec,
> >next);
> >> +			if (rte_mempool_get(vec->vector_pool,
> >> +					    (void **)&vec->vector_ev) <
> >0) {
> >> +				rte_pktmbuf_free_bulk(mbufs, num);
> >> +				return 0;
> >> +			}
> >> +			vec->vector_ev->nb_elem = 0;
> >> +			vec->vector_ev->port = vec->port;
> >> +			vec->vector_ev->queue = vec->queue;
> >> +			vec->vector_ev->attr_valid = true;
> >> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
> >vec, next);
> >> +		}
> >> +
> >> +		space = vec->max_vector_count - vec->vector_ev-
> >>nb_elem;
> >> +		sz = num > space ? space : num;
> >> +		memcpy(vec->vector_ev->mbufs + vec->vector_ev-
> >>nb_elem, mbufs,
> >> +		       sizeof(void *) * sz);
> >> +		vec->vector_ev->nb_elem += sz;
> >> +		num -= sz;
> >> +		mbufs += sz;
> >> +		vec->ts = rte_rdtsc();
> >> +	}
> >> +
> >> +	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
> >
> >Same here.
> >
> >> +		ev->event = vec->event;
> >> +		ev->vec = vec->vector_ev;
> >> +		ev++;
> >> +		filled++;
> >> +		vec->vector_ev = NULL;
> >> +		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> >> +	}
> >> +
> >> +	return filled;
> >> +}
> >
> >I am seeing more than one repeating code chunks in this function.
> >Perhaps, you can give it a try to not repeat. We can drop if its
> >performance affecting.
> 
> I will try to move them to inline functions and test.
> 
> >
> >> +
> >>  static inline void
> >>  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
> >>  		uint16_t eth_dev_id,
> >> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >>  	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> >>  	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
> >
> >The RSS related code is executed for vector case as well. Can this be
> >moved inside ena_vector if condition ?
> 
> RSS is used to generate the event flowid, in vector case the flow id
> Will be a combination of port and queue id.
> The idea is that flows having the same RSS LSB will end up in the same
> queue.
> 

I meant to say, rss_mask and do_rss are used only when ena_vector is false. Could be moved inside the appropriate condition ?

> >
> >>
> >> -	for (i = 0; i < num; i++) {
> >> -		m = mbufs[i];
> >> -
> >> -		rss = do_rss ?
> >> -			rxa_do_softrss(m, rx_adapter->rss_key_be) :
> >> -			m->hash.rss;
> >> -		ev->event = event;
> >> -		ev->flow_id = (rss & ~flow_id_mask) |
> >> -				(ev->flow_id & flow_id_mask);
> >> -		ev->mbuf = m;
> >> -		ev++;
> >> +	if (!eth_rx_queue_info->ena_vector) {
> >> +		for (i = 0; i < num; i++) {
> >> +			m = mbufs[i];
> >> +
> >> +			rss = do_rss ? rxa_do_softrss(m, rx_adapter-
> >>rss_key_be)
> >> +				     : m->hash.rss;
> >> +			ev->event = event;
> >> +			ev->flow_id = (rss & ~flow_id_mask) |
> >> +				      (ev->flow_id & flow_id_mask);
> >> +			ev->mbuf = m;
> >> +			ev++;
> >> +		}
> >> +	} else {
> >> +		num = rxa_create_event_vector(rx_adapter,
> >eth_rx_queue_info,
> >> +					      buf, mbufs, num);
> >>  	}
> >>
> >> -	if (dev_info->cb_fn) {
> >> +	if (num && dev_info->cb_fn) {
> >>
> >>  		dropped = 0;
> >>  		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> >> -					ETH_EVENT_BUFFER_SIZE, buf-
> >>count, ev,
> >> -					num, dev_info->cb_arg,
> >&dropped);
> >> +					ETH_EVENT_BUFFER_SIZE, buf-
> >>count,
> >> +					&buf->events[buf->count],
> >num,
> >> +					dev_info->cb_arg, &dropped);
> >
> >Before this patch, we pass ev which is &buf->events[buf->count] + num
> >as fifth param when calling cb_fn. Now, we are passing &buf-
> >>events[buf->count] for non-vector case. Do you see this as an issue?
> >
> 
> The callback function takes in the array newly formed events i.e. we need
> to pass the start of array and the count.
> 
> the previous code had a bug where it passes the end of the event list.

ok, that makes sense.

> 
> >Also, for vector case would it make sense to do pass &buf->events[buf-
> >>count] + num ?
> >
> >>  		if (unlikely(nb_cb > num))
> >>  			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
> >events",
> >>  				nb_cb, num);
> >> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter
> >*rx_adapter)
> >>  	return nb_rx;
> >>  }
> >>
> >> +static void
> >> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
> >> +{
> >> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
> >> +	struct rte_eth_event_enqueue_buffer *buf =
> >> +		&rx_adapter->event_enqueue_buffer;
> >> +	struct rte_event *ev;
> >> +
> >> +	if (buf->count)
> >> +		rxa_flush_event_buffer(rx_adapter);
> >> +
> >> +	if (vec->vector_ev->nb_elem == 0)
> >> +		return;
> >> +	ev = &buf->events[buf->count];
> >> +
> >> +	/* Event ready. */
> >> +	ev->event = vec->event;
> >> +	ev->vec = vec->vector_ev;
> >> +	buf->count++;
> >> +
> >> +	vec->vector_ev = NULL;
> >> +	vec->ts = 0;
> >> +}
> >> +
> >>  static int
> >>  rxa_service_func(void *args)
> >>  {
> >> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
> >>  		return 0;
> >>  	}
> >>
> >> +	if (rx_adapter->ena_vector) {
> >> +		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
> >> +		    rx_adapter->vector_tmo_ticks) {
> >> +			struct eth_rx_vector_data *vec;
> >> +
> >> +			TAILQ_FOREACH(vec, &rx_adapter->vector_list,
> >next) {
> >> +				uint64_t elapsed_time = rte_rdtsc() -
> >vec->ts;
> >> +
> >> +				if (elapsed_time >= vec-
> >>vector_timeout_ticks) {
> >> +					rxa_vector_expire(vec,
> >rx_adapter);
> >> +					TAILQ_REMOVE(&rx_adapter-
> >>vector_list,
> >> +						     vec, next);
> >> +				}
> >> +			}
> >> +			rx_adapter->prev_expiry_ts = rte_rdtsc();
> >> +		}
> >> +	}
> >> +
> >>  	stats = &rx_adapter->stats;
> >>  	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
> >>  	stats->rx_packets += rxa_poll(rx_adapter);
> >> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >>  	}
> >>  }
> >>
> >> +static void
> >> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
> >uint16_t vector_count,
> >> +		    uint64_t vector_ns, struct rte_mempool *mp, int32_t
> >qid,
> >> +		    uint16_t port_id)
> >> +{
> >> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
> >> +	struct eth_rx_vector_data *vector_data;
> >> +	uint32_t flow_id;
> >> +
> >> +	vector_data = &queue_info->vector_data;
> >> +	vector_data->max_vector_count = vector_count;
> >> +	vector_data->port = port_id;
> >> +	vector_data->queue = qid;
> >> +	vector_data->vector_pool = mp;
> >> +	vector_data->vector_timeout_ticks =
> >> +		NSEC2TICK(vector_ns, rte_get_timer_hz());
> >> +	vector_data->ts = 0;
> >> +	flow_id = queue_info->event & 0xFFFFF;
> >> +	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
> >flow_id;
> >
> >Maybe I am missing something here. Looking at the code it looks like
> >qid and port_id may overlap. For e.g., if qid = 0x10 and port_id = 0x11,
> >flow_id would end up being 0x11. Is this the expectation? Also, it may
> >be useful to document flow_id format.
> 
> The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port
> as a flow.

This sounds reasonable to me. It would be useful to have the flow_id format and how it is used for vectorization in Rx/Tx adapter documentation.

> 
> >Comparing this format with existing RSS hash based method, are we
> >saying that all mbufs received in a rx burst are part of same flow when
> >vectorization is used?
> 
> Yes, the hard way to do this is to use a hash table and treating each
> mbuf having an unique flow.
> 
> >
> >> +	vector_data->event = (queue_info->event & ~0xFFFFF) |
> >flow_id;
> >> +}
> >> +
> >>  static void
> >>  rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
> >>  	struct eth_device_info *dev_info,
> >> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
> >rte_event_eth_rx_adapter *rx_adapter,
> >>  	}
> >>  }
> >>
> >> +static void
> >> +rxa_sw_event_vector_configure(
> >> +	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
> >eth_dev_id,
> >> +	int rx_queue_id,
> >> +	const struct rte_event_eth_rx_adapter_event_vector_config
> >*config)
> >> +{
> >> +	struct eth_device_info *dev_info = &rx_adapter-
> >>eth_devices[eth_dev_id];
> >> +	struct eth_rx_queue_info *queue_info;
> >> +	struct rte_event *qi_ev;
> >> +
> >> +	if (rx_queue_id == -1) {
> >> +		uint16_t nb_rx_queues;
> >> +		uint16_t i;
> >> +
> >> +		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
> >> +		for (i = 0; i < nb_rx_queues; i++)
> >> +			rxa_sw_event_vector_configure(rx_adapter,
> >eth_dev_id, i,
> >> +						      config);
> >> +		return;
> >> +	}
> >> +
> >> +	queue_info = &dev_info->rx_queue[rx_queue_id];
> >> +	qi_ev = (struct rte_event *)&queue_info->event;
> >> +	queue_info->ena_vector = 1;
> >> +	qi_ev->event_type =
> >RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
> >> +	rxa_set_vector_data(queue_info, config->vector_sz,
> >> +			    config->vector_timeout_ns, config-
> >>vector_mp,
> >> +			    rx_queue_id, dev_info->dev->data->port_id);
> >> +	rx_adapter->ena_vector = 1;
> >> +	rx_adapter->vector_tmo_ticks =
> >> +		rx_adapter->vector_tmo_ticks ?
> >> +			      RTE_MIN(config->vector_timeout_ns << 1,
> >> +				      rx_adapter->vector_tmo_ticks) :
> >> +			      config->vector_timeout_ns << 1;
> >> +	rx_adapter->prev_expiry_ts = 0;
> >> +	TAILQ_INIT(&rx_adapter->vector_list);
> >> +}
> >> +
> >>  static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
> >>  		uint16_t eth_dev_id,
> >>  		int rx_queue_id,
> >> @@ -2081,6 +2285,15 @@
> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
> >>  		return -EINVAL;
> >>  	}
> >>
> >> +	if ((cap &
> >RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
> >> +	    (queue_conf->rx_queue_flags &
> >> +	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
> >> +		RTE_EDEV_LOG_ERR("Event vectorization is not
> >supported,"
> >> +				 " eth port: %" PRIu16 " adapter id: %"
> >PRIu8,
> >> +				 eth_dev_id, id);
> >> +		return -EINVAL;
> >> +	}
> >> +
> >>  	if ((cap &
> >RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
> >>  		(rx_queue_id != -1)) {
> >>  		RTE_EDEV_LOG_ERR("Rx queues can only be connected
> >to single "
> >> @@ -2143,6 +2356,17 @@
> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
> >>  	return 0;
> >>  }
> >>
> >> +static int
> >> +rxa_sw_vector_limits(struct
> >rte_event_eth_rx_adapter_vector_limits *limits)
> >> +{
> >> +	limits->max_sz = MAX_VECTOR_SIZE;
> >> +	limits->min_sz = MIN_VECTOR_SIZE;
> >> +	limits->max_timeout_ns = MAX_VECTOR_NS;
> >> +	limits->min_timeout_ns = MIN_VECTOR_NS;
> >> +
> >> +	return 0;
> >> +}
> >> +
> >>  int
> >>  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
> >eth_dev_id,
> >>  				int32_t rx_queue_id)
> >> @@ -2333,7 +2557,8 @@
> >rte_event_eth_rx_adapter_queue_event_vector_config(
> >>  		ret = dev->dev_ops-
> >>eth_rx_adapter_event_vector_config(
> >>  			dev, &rte_eth_devices[eth_dev_id],
> >rx_queue_id, config);
> >>  	} else {
> >> -		ret = -ENOTSUP;
> >> +		rxa_sw_event_vector_configure(rx_adapter,
> >eth_dev_id,
> >> +					      rx_queue_id, config);
> >>  	}
> >>
> >>  	return ret;
> >> @@ -2371,7 +2596,7 @@
> >rte_event_eth_rx_adapter_vector_limits_get(
> >>  		ret = dev->dev_ops-
> >>eth_rx_adapter_vector_limits_get(
> >>  			dev, &rte_eth_devices[eth_port_id], limits);
> >>  	} else {
> >> -		ret = -ENOTSUP;
> >> +		ret = rxa_sw_vector_limits(limits);
> >>  	}
> >>
> >>  	return ret;
> >> diff --git a/lib/librte_eventdev/rte_eventdev.c
> >b/lib/librte_eventdev/rte_eventdev.c
> >> index f95edc075..254a31b1f 100644
> >> --- a/lib/librte_eventdev/rte_eventdev.c
> >> +++ b/lib/librte_eventdev/rte_eventdev.c
> >> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t
> >dev_id, uint16_t eth_port_id,
> >>
> >>  	if (caps == NULL)
> >>  		return -EINVAL;
> >> -	*caps = 0;
> >> +
> >> +	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
> >> +		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
> >> +	else
> >> +		*caps = 0;
> >
> >Any reason why we had to set default caps value? I am thinking if sw
> >event device is used, it would set it anyways.
> >
> 
> There are multiple sw event devices which don't implement caps_get
> function, this changes solves that.
> 
> >>
> >>  	return dev->dev_ops->eth_rx_adapter_caps_get ?
> >>  				(*dev->dev_ops-
> >>eth_rx_adapter_caps_get)(dev,
> >> --
> >> 2.17.1
Pavan Nikhilesh Bhagavatula March 26, 2021, 9 a.m. UTC | #4
>> From: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>
>> Sent: Thursday, March 25, 2021 6:44 PM
>> To: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>; Jerin Jacob
>Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
>> <erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
><abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> <timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> <mattias.ronnblom@ericsson.com>; Ma, Liang J
><liang.j.ma@intel.com>
>> Cc: dev@dpdk.org
>> Subject: RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event vector support
>>
>>
>>
>> >-----Original Message-----
>> >From: Jayatheerthan, Jay <jay.jayatheerthan@intel.com>
>> >Sent: Thursday, March 25, 2021 4:07 PM
>> >To: Pavan Nikhilesh Bhagavatula <pbhagavatula@marvell.com>;
>Jerin
>> >Jacob Kollanukkaran <jerinj@marvell.com>; Carrillo, Erik G
>> ><erik.g.carrillo@intel.com>; Gujjar, Abhinandan S
>> ><abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> >Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> ><liang.j.ma@intel.com>
>> >Cc: dev@dpdk.org
>> >Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx
>adapter
>> >event vector support
>> >
>> >External Email
>> >
>> >----------------------------------------------------------------------
>> >> -----Original Message-----
>> >> From: pbhagavatula@marvell.com <pbhagavatula@marvell.com>
>> >> Sent: Wednesday, March 24, 2021 10:35 AM
>> >> To: jerinj@marvell.com; Jayatheerthan, Jay
>> ><jay.jayatheerthan@intel.com>; Carrillo, Erik G
>> ><erik.g.carrillo@intel.com>; Gujjar,
>> >> Abhinandan S <abhinandan.gujjar@intel.com>; McDaniel, Timothy
>> ><timothy.mcdaniel@intel.com>; hemant.agrawal@nxp.com; Van
>> >> Haaren, Harry <harry.van.haaren@intel.com>; mattias.ronnblom
>> ><mattias.ronnblom@ericsson.com>; Ma, Liang J
>> >> <liang.j.ma@intel.com>
>> >> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event
>> >vector support
>> >>
>> >> From: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >>
>> >> Add event vector support for event eth Rx adapter, the
>> >implementation
>> >> creates vector flows based on port and queue identifier of the
>> >received
>> >> mbufs.
>> >>
>> >> Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
>> >> ---
>> >>  lib/librte_eventdev/eventdev_pmd.h            |   7 +-
>> >>  .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++-
>-
>> >>  lib/librte_eventdev/rte_eventdev.c            |   6 +-
>> >>  3 files changed, 250 insertions(+), 20 deletions(-)
>> >>
>> >> diff --git a/lib/librte_eventdev/eventdev_pmd.h
>> >b/lib/librte_eventdev/eventdev_pmd.h
>> >> index 9297f1433..0f724ac85 100644
>> >> --- a/lib/librte_eventdev/eventdev_pmd.h
>> >> +++ b/lib/librte_eventdev/eventdev_pmd.h
>> >> @@ -69,9 +69,10 @@ extern "C" {
>> >>  	} \
>> >>  } while (0)
>> >>
>> >> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> >> -
>> >	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>> >\
>> >> -
>> >	(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
>> >> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP
>> >\
>> >> +	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |
>> >\
>> >> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |
>> >\
>> >> +	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>> >>
>> >>  #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>> >>
>> >	RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
>> >> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> index ac8ba5bf0..c71990078 100644
>> >> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> >> @@ -26,6 +26,10 @@
>> >>  #define BATCH_SIZE		32
>> >>  #define BLOCK_CNT_THRESHOLD	10
>> >>  #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
>> >> +#define MAX_VECTOR_SIZE		1024
>> >> +#define MIN_VECTOR_SIZE		4
>> >> +#define MAX_VECTOR_NS		1E9
>> >> +#define MIN_VECTOR_NS		1E5
>> >>
>> >>  #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
>> >>  #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
>> >> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>> >>  	uint16_t eth_rx_qid;
>> >>  };
>> >>
>> >> +struct eth_rx_vector_data {
>> >> +	TAILQ_ENTRY(eth_rx_vector_data) next;
>> >> +	uint16_t port;
>> >> +	uint16_t queue;
>> >> +	uint16_t max_vector_count;
>> >> +	uint64_t event;
>> >> +	uint64_t ts;
>> >> +	uint64_t vector_timeout_ticks;
>> >> +	struct rte_mempool *vector_pool;
>> >> +	struct rte_event_vector *vector_ev;
>> >> +} __rte_cache_aligned;
>> >> +
>> >> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
>> >> +
>> >>  /* Instance per adapter */
>> >>  struct rte_eth_event_enqueue_buffer {
>> >>  	/* Count of events in this buffer */
>> >> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>> >>  	uint32_t wrr_pos;
>> >>  	/* Event burst buffer */
>> >>  	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
>> >> +	/* Vector enable flag */
>> >> +	uint8_t ena_vector;
>> >> +	/* Timestamp of previous vector expiry list traversal */
>> >> +	uint64_t prev_expiry_ts;
>> >> +	/* Minimum ticks to wait before traversing expiry list */
>> >> +	uint64_t vector_tmo_ticks;
>> >> +	/* vector list */
>> >> +	struct eth_rx_vector_data_list vector_list;
>> >>  	/* Per adapter stats */
>> >>  	struct rte_event_eth_rx_adapter_stats stats;
>> >>  	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
>> >> @@ -198,9 +224,11 @@ struct eth_device_info {
>> >>  struct eth_rx_queue_info {
>> >>  	int queue_enabled;	/* True if added */
>> >>  	int intr_enabled;
>> >> +	uint8_t ena_vector;
>> >>  	uint16_t wt;		/* Polling weight */
>> >>  	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else
>> >0 */
>> >>  	uint64_t event;
>> >> +	struct eth_rx_vector_data vector_data;
>> >>  };
>> >>
>> >>  static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
>> >> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct
>> >rte_event_eth_rx_adapter *rx_adapter)
>> >>  	    &rx_adapter->event_enqueue_buffer;
>> >>  	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter-
>> >>stats;
>> >>
>> >> +	if (!buf->count)
>> >> +		return 0;
>> >> +
>> >>  	uint16_t n = rte_event_enqueue_new_burst(rx_adapter-
>> >>eventdev_id,
>> >>  					rx_adapter->event_port_id,
>> >>  					buf->events,
>> >> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct
>> >rte_event_eth_rx_adapter *rx_adapter)
>> >>  	return n;
>> >>  }
>> >>
>> >> +static inline uint16_t
>> >> +rxa_create_event_vector(struct rte_event_eth_rx_adapter
>> >*rx_adapter,
>> >> +			struct eth_rx_queue_info *queue_info,
>> >> +			struct rte_eth_event_enqueue_buffer *buf,
>> >> +			struct rte_mbuf **mbufs, uint16_t num)
>> >> +{
>> >> +	struct rte_event *ev = &buf->events[buf->count];
>> >> +	struct eth_rx_vector_data *vec;
>> >> +	uint16_t filled, space, sz;
>> >> +
>> >> +	filled = 0;
>> >> +	vec = &queue_info->vector_data;
>> >> +	while (num) {
>> >> +		if (vec->vector_ev == NULL) {
>> >> +			if (rte_mempool_get(vec->vector_pool,
>> >> +					    (void **)&vec->vector_ev) <
>> >0) {
>> >> +				rte_pktmbuf_free_bulk(mbufs, num);
>> >> +				return 0;
>> >> +			}
>> >> +			vec->vector_ev->nb_elem = 0;
>> >> +			vec->vector_ev->port = vec->port;
>> >> +			vec->vector_ev->queue = vec->queue;
>> >> +			vec->vector_ev->attr_valid = true;
>> >> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>> >vec, next);
>> >> +		} else if (vec->vector_ev->nb_elem == vec-
>> >>max_vector_count) {
>> >
>> >Is there a case where nb_elem > max_vector_count as we
>accumulate
>> >sz to it ?
>>
>> I don't think so, that would overflow the vector event.
>>
>> >
>> >> +			/* Event ready. */
>> >> +			ev->event = vec->event;
>> >> +			ev->vec = vec->vector_ev;
>> >> +			ev++;
>> >> +			filled++;
>> >> +			vec->vector_ev = NULL;
>> >> +			TAILQ_REMOVE(&rx_adapter->vector_list, vec,
>> >next);
>> >> +			if (rte_mempool_get(vec->vector_pool,
>> >> +					    (void **)&vec->vector_ev) <
>> >0) {
>> >> +				rte_pktmbuf_free_bulk(mbufs, num);
>> >> +				return 0;
>> >> +			}
>> >> +			vec->vector_ev->nb_elem = 0;
>> >> +			vec->vector_ev->port = vec->port;
>> >> +			vec->vector_ev->queue = vec->queue;
>> >> +			vec->vector_ev->attr_valid = true;
>> >> +			TAILQ_INSERT_TAIL(&rx_adapter->vector_list,
>> >vec, next);
>> >> +		}
>> >> +
>> >> +		space = vec->max_vector_count - vec->vector_ev-
>> >>nb_elem;
>> >> +		sz = num > space ? space : num;
>> >> +		memcpy(vec->vector_ev->mbufs + vec->vector_ev-
>> >>nb_elem, mbufs,
>> >> +		       sizeof(void *) * sz);
>> >> +		vec->vector_ev->nb_elem += sz;
>> >> +		num -= sz;
>> >> +		mbufs += sz;
>> >> +		vec->ts = rte_rdtsc();
>> >> +	}
>> >> +
>> >> +	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>> >
>> >Same here.
>> >
>> >> +		ev->event = vec->event;
>> >> +		ev->vec = vec->vector_ev;
>> >> +		ev++;
>> >> +		filled++;
>> >> +		vec->vector_ev = NULL;
>> >> +		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> >> +	}
>> >> +
>> >> +	return filled;
>> >> +}
>> >
>> >I am seeing more than one repeating code chunks in this function.
>> >Perhaps, you can give it a try to not repeat. We can drop if its
>> >performance affecting.
>>
>> I will try to move them to inline functions and test.
>>
>> >
>> >> +
>> >>  static inline void
>> >>  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>> >>  		uint16_t eth_dev_id,
>> >> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >>  	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>> >>  	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>> >
>> >The RSS related code is executed for vector case as well. Can this be
>> >moved inside ena_vector if condition ?
>>
>> RSS is used to generate the event flowid, in vector case the flow id
>> Will be a combination of port and queue id.
>> The idea is that flows having the same RSS LSB will end up in the same
>> queue.
>>
>
>I meant to say, rss_mask and do_rss are used only when ena_vector is
>false. Could be moved inside the appropriate condition ?
>

Ah! I see I will move them inside the conditional.

>> >
>> >>
>> >> -	for (i = 0; i < num; i++) {
>> >> -		m = mbufs[i];
>> >> -
>> >> -		rss = do_rss ?
>> >> -			rxa_do_softrss(m, rx_adapter->rss_key_be) :
>> >> -			m->hash.rss;
>> >> -		ev->event = event;
>> >> -		ev->flow_id = (rss & ~flow_id_mask) |
>> >> -				(ev->flow_id & flow_id_mask);
>> >> -		ev->mbuf = m;
>> >> -		ev++;
>> >> +	if (!eth_rx_queue_info->ena_vector) {
>> >> +		for (i = 0; i < num; i++) {
>> >> +			m = mbufs[i];
>> >> +
>> >> +			rss = do_rss ? rxa_do_softrss(m, rx_adapter-
>> >>rss_key_be)
>> >> +				     : m->hash.rss;
>> >> +			ev->event = event;
>> >> +			ev->flow_id = (rss & ~flow_id_mask) |
>> >> +				      (ev->flow_id & flow_id_mask);
>> >> +			ev->mbuf = m;
>> >> +			ev++;
>> >> +		}
>> >> +	} else {
>> >> +		num = rxa_create_event_vector(rx_adapter,
>> >eth_rx_queue_info,
>> >> +					      buf, mbufs, num);
>> >>  	}
>> >>
>> >> -	if (dev_info->cb_fn) {
>> >> +	if (num && dev_info->cb_fn) {
>> >>
>> >>  		dropped = 0;
>> >>  		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
>> >> -					ETH_EVENT_BUFFER_SIZE, buf-
>> >>count, ev,
>> >> -					num, dev_info->cb_arg,
>> >&dropped);
>> >> +					ETH_EVENT_BUFFER_SIZE, buf-
>> >>count,
>> >> +					&buf->events[buf->count],
>> >num,
>> >> +					dev_info->cb_arg, &dropped);
>> >
>> >Before this patch, we pass ev which is &buf->events[buf->count] +
>num
>> >as fifth param when calling cb_fn. Now, we are passing &buf-
>> >>events[buf->count] for non-vector case. Do you see this as an
>issue?
>> >
>>
>> The callback function takes in the array newly formed events i.e. we
>need
>> to pass the start of array and the count.
>>
>> the previous code had a bug where it passes the end of the event list.
>
>ok, that makes sense.
>
>>
>> >Also, for vector case would it make sense to do pass &buf-
>>events[buf-
>> >>count] + num ?
>> >
>> >>  		if (unlikely(nb_cb > num))
>> >>  			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
>> >events",
>> >>  				nb_cb, num);
>> >> @@ -1124,6 +1226,30 @@ rxa_poll(struct
>rte_event_eth_rx_adapter
>> >*rx_adapter)
>> >>  	return nb_rx;
>> >>  }
>> >>
>> >> +static void
>> >> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
>> >> +{
>> >> +	struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> >> +	struct rte_eth_event_enqueue_buffer *buf =
>> >> +		&rx_adapter->event_enqueue_buffer;
>> >> +	struct rte_event *ev;
>> >> +
>> >> +	if (buf->count)
>> >> +		rxa_flush_event_buffer(rx_adapter);
>> >> +
>> >> +	if (vec->vector_ev->nb_elem == 0)
>> >> +		return;
>> >> +	ev = &buf->events[buf->count];
>> >> +
>> >> +	/* Event ready. */
>> >> +	ev->event = vec->event;
>> >> +	ev->vec = vec->vector_ev;
>> >> +	buf->count++;
>> >> +
>> >> +	vec->vector_ev = NULL;
>> >> +	vec->ts = 0;
>> >> +}
>> >> +
>> >>  static int
>> >>  rxa_service_func(void *args)
>> >>  {
>> >> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>> >>  		return 0;
>> >>  	}
>> >>
>> >> +	if (rx_adapter->ena_vector) {
>> >> +		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
>> >> +		    rx_adapter->vector_tmo_ticks) {
>> >> +			struct eth_rx_vector_data *vec;
>> >> +
>> >> +			TAILQ_FOREACH(vec, &rx_adapter->vector_list,
>> >next) {
>> >> +				uint64_t elapsed_time = rte_rdtsc() -
>> >vec->ts;
>> >> +
>> >> +				if (elapsed_time >= vec-
>> >>vector_timeout_ticks) {
>> >> +					rxa_vector_expire(vec,
>> >rx_adapter);
>> >> +					TAILQ_REMOVE(&rx_adapter-
>> >>vector_list,
>> >> +						     vec, next);
>> >> +				}
>> >> +			}
>> >> +			rx_adapter->prev_expiry_ts = rte_rdtsc();
>> >> +		}
>> >> +	}
>> >> +
>> >>  	stats = &rx_adapter->stats;
>> >>  	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>> >>  	stats->rx_packets += rxa_poll(rx_adapter);
>> >> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >>  	}
>> >>  }
>> >>
>> >> +static void
>> >> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info,
>> >uint16_t vector_count,
>> >> +		    uint64_t vector_ns, struct rte_mempool *mp, int32_t
>> >qid,
>> >> +		    uint16_t port_id)
>> >> +{
>> >> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
>> >> +	struct eth_rx_vector_data *vector_data;
>> >> +	uint32_t flow_id;
>> >> +
>> >> +	vector_data = &queue_info->vector_data;
>> >> +	vector_data->max_vector_count = vector_count;
>> >> +	vector_data->port = port_id;
>> >> +	vector_data->queue = qid;
>> >> +	vector_data->vector_pool = mp;
>> >> +	vector_data->vector_timeout_ticks =
>> >> +		NSEC2TICK(vector_ns, rte_get_timer_hz());
>> >> +	vector_data->ts = 0;
>> >> +	flow_id = queue_info->event & 0xFFFFF;
>> >> +	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) :
>> >flow_id;
>> >
>> >Maybe I am missing something here. Looking at the code it looks like
>> >qid and port_id may overlap. For e.g., if qid = 0x10 and port_id =
>0x11,
>> >flow_id would end up being 0x11. Is this the expectation? Also, it
>may
>> >be useful to document flow_id format.
>>
>> The flow_id is 20 bit, I guess we could do 12bit queue_id and 8bit port
>> as a flow.
>
>This sounds reasonable to me. It would be useful to have the flow_id
>format and how it is used for vectorization in Rx/Tx adapter
>documentation.

This is only applicable to the SW Rx adapter implementation a HW implementation might have
its own way of implementing the flow aggregation.
There is no documentation specific to SW Rx adapter, I will add it to the commit log.

>
>>
>> >Comparing this format with existing RSS hash based method, are we
>> >saying that all mbufs received in a rx burst are part of same flow
>when
>> >vectorization is used?
>>
>> Yes, the hard way to do this is to use a hash table and treating each
>> mbuf having an unique flow.
>>
>> >
>> >> +	vector_data->event = (queue_info->event & ~0xFFFFF) |
>> >flow_id;
>> >> +}
>> >> +
>> >>  static void
>> >>  rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>> >>  	struct eth_device_info *dev_info,
>> >> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct
>> >rte_event_eth_rx_adapter *rx_adapter,
>> >>  	}
>> >>  }
>> >>
>> >> +static void
>> >> +rxa_sw_event_vector_configure(
>> >> +	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t
>> >eth_dev_id,
>> >> +	int rx_queue_id,
>> >> +	const struct rte_event_eth_rx_adapter_event_vector_config
>> >*config)
>> >> +{
>> >> +	struct eth_device_info *dev_info = &rx_adapter-
>> >>eth_devices[eth_dev_id];
>> >> +	struct eth_rx_queue_info *queue_info;
>> >> +	struct rte_event *qi_ev;
>> >> +
>> >> +	if (rx_queue_id == -1) {
>> >> +		uint16_t nb_rx_queues;
>> >> +		uint16_t i;
>> >> +
>> >> +		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
>> >> +		for (i = 0; i < nb_rx_queues; i++)
>> >> +			rxa_sw_event_vector_configure(rx_adapter,
>> >eth_dev_id, i,
>> >> +						      config);
>> >> +		return;
>> >> +	}
>> >> +
>> >> +	queue_info = &dev_info->rx_queue[rx_queue_id];
>> >> +	qi_ev = (struct rte_event *)&queue_info->event;
>> >> +	queue_info->ena_vector = 1;
>> >> +	qi_ev->event_type =
>> >RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
>> >> +	rxa_set_vector_data(queue_info, config->vector_sz,
>> >> +			    config->vector_timeout_ns, config-
>> >>vector_mp,
>> >> +			    rx_queue_id, dev_info->dev->data->port_id);
>> >> +	rx_adapter->ena_vector = 1;
>> >> +	rx_adapter->vector_tmo_ticks =
>> >> +		rx_adapter->vector_tmo_ticks ?
>> >> +			      RTE_MIN(config->vector_timeout_ns << 1,
>> >> +				      rx_adapter->vector_tmo_ticks) :
>> >> +			      config->vector_timeout_ns << 1;
>> >> +	rx_adapter->prev_expiry_ts = 0;
>> >> +	TAILQ_INIT(&rx_adapter->vector_list);
>> >> +}
>> >> +
>> >>  static int rxa_sw_add(struct rte_event_eth_rx_adapter
>*rx_adapter,
>> >>  		uint16_t eth_dev_id,
>> >>  		int rx_queue_id,
>> >> @@ -2081,6 +2285,15 @@
>> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> >>  		return -EINVAL;
>> >>  	}
>> >>
>> >> +	if ((cap &
>> >RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
>> >> +	    (queue_conf->rx_queue_flags &
>> >> +	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
>> >> +		RTE_EDEV_LOG_ERR("Event vectorization is not
>> >supported,"
>> >> +				 " eth port: %" PRIu16 " adapter id: %"
>> >PRIu8,
>> >> +				 eth_dev_id, id);
>> >> +		return -EINVAL;
>> >> +	}
>> >> +
>> >>  	if ((cap &
>> >RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>> >>  		(rx_queue_id != -1)) {
>> >>  		RTE_EDEV_LOG_ERR("Rx queues can only be connected
>> >to single "
>> >> @@ -2143,6 +2356,17 @@
>> >rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> >>  	return 0;
>> >>  }
>> >>
>> >> +static int
>> >> +rxa_sw_vector_limits(struct
>> >rte_event_eth_rx_adapter_vector_limits *limits)
>> >> +{
>> >> +	limits->max_sz = MAX_VECTOR_SIZE;
>> >> +	limits->min_sz = MIN_VECTOR_SIZE;
>> >> +	limits->max_timeout_ns = MAX_VECTOR_NS;
>> >> +	limits->min_timeout_ns = MIN_VECTOR_NS;
>> >> +
>> >> +	return 0;
>> >> +}
>> >> +
>> >>  int
>> >>  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t
>> >eth_dev_id,
>> >>  				int32_t rx_queue_id)
>> >> @@ -2333,7 +2557,8 @@
>> >rte_event_eth_rx_adapter_queue_event_vector_config(
>> >>  		ret = dev->dev_ops-
>> >>eth_rx_adapter_event_vector_config(
>> >>  			dev, &rte_eth_devices[eth_dev_id],
>> >rx_queue_id, config);
>> >>  	} else {
>> >> -		ret = -ENOTSUP;
>> >> +		rxa_sw_event_vector_configure(rx_adapter,
>> >eth_dev_id,
>> >> +					      rx_queue_id, config);
>> >>  	}
>> >>
>> >>  	return ret;
>> >> @@ -2371,7 +2596,7 @@
>> >rte_event_eth_rx_adapter_vector_limits_get(
>> >>  		ret = dev->dev_ops-
>> >>eth_rx_adapter_vector_limits_get(
>> >>  			dev, &rte_eth_devices[eth_port_id], limits);
>> >>  	} else {
>> >> -		ret = -ENOTSUP;
>> >> +		ret = rxa_sw_vector_limits(limits);
>> >>  	}
>> >>
>> >>  	return ret;
>> >> diff --git a/lib/librte_eventdev/rte_eventdev.c
>> >b/lib/librte_eventdev/rte_eventdev.c
>> >> index f95edc075..254a31b1f 100644
>> >> --- a/lib/librte_eventdev/rte_eventdev.c
>> >> +++ b/lib/librte_eventdev/rte_eventdev.c
>> >> @@ -122,7 +122,11 @@
>rte_event_eth_rx_adapter_caps_get(uint8_t
>> >dev_id, uint16_t eth_port_id,
>> >>
>> >>  	if (caps == NULL)
>> >>  		return -EINVAL;
>> >> -	*caps = 0;
>> >> +
>> >> +	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
>> >> +		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
>> >> +	else
>> >> +		*caps = 0;
>> >
>> >Any reason why we had to set default caps value? I am thinking if sw
>> >event device is used, it would set it anyways.
>> >
>>
>> There are multiple sw event devices which don't implement caps_get
>> function, this changes solves that.
>>
>> >>
>> >>  	return dev->dev_ops->eth_rx_adapter_caps_get ?
>> >>  				(*dev->dev_ops-
>> >>eth_rx_adapter_caps_get)(dev,
>> >> --
>> >> 2.17.1
diff mbox series

Patch

diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
index 9297f1433..0f724ac85 100644
--- a/lib/librte_eventdev/eventdev_pmd.h
+++ b/lib/librte_eventdev/eventdev_pmd.h
@@ -69,9 +69,10 @@  extern "C" {
 	} \
 } while (0)
 
-#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
-		((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
-			(RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        \
+	((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
+	 (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
 
 #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
 		RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index ac8ba5bf0..c71990078 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -26,6 +26,10 @@ 
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
 #define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define MAX_VECTOR_SIZE		1024
+#define MIN_VECTOR_SIZE		4
+#define MAX_VECTOR_NS		1E9
+#define MIN_VECTOR_NS		1E5
 
 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
 #define ETH_RX_ADAPTER_MEM_NAME_LEN	32
@@ -59,6 +63,20 @@  struct eth_rx_poll_entry {
 	uint16_t eth_rx_qid;
 };
 
+struct eth_rx_vector_data {
+	TAILQ_ENTRY(eth_rx_vector_data) next;
+	uint16_t port;
+	uint16_t queue;
+	uint16_t max_vector_count;
+	uint64_t event;
+	uint64_t ts;
+	uint64_t vector_timeout_ticks;
+	struct rte_mempool *vector_pool;
+	struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
 /* Instance per adapter */
 struct rte_eth_event_enqueue_buffer {
 	/* Count of events in this buffer */
@@ -92,6 +110,14 @@  struct rte_event_eth_rx_adapter {
 	uint32_t wrr_pos;
 	/* Event burst buffer */
 	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+	/* Vector enable flag */
+	uint8_t ena_vector;
+	/* Timestamp of previous vector expiry list traversal */
+	uint64_t prev_expiry_ts;
+	/* Minimum ticks to wait before traversing expiry list */
+	uint64_t vector_tmo_ticks;
+	/* vector list */
+	struct eth_rx_vector_data_list vector_list;
 	/* Per adapter stats */
 	struct rte_event_eth_rx_adapter_stats stats;
 	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
@@ -198,9 +224,11 @@  struct eth_device_info {
 struct eth_rx_queue_info {
 	int queue_enabled;	/* True if added */
 	int intr_enabled;
+	uint8_t ena_vector;
 	uint16_t wt;		/* Polling weight */
 	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
 	uint64_t event;
+	struct eth_rx_vector_data vector_data;
 };
 
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
@@ -722,6 +750,9 @@  rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
 
+	if (!buf->count)
+		return 0;
+
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
 					buf->events,
@@ -742,6 +773,72 @@  rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	return n;
 }
 
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+			struct eth_rx_queue_info *queue_info,
+			struct rte_eth_event_enqueue_buffer *buf,
+			struct rte_mbuf **mbufs, uint16_t num)
+{
+	struct rte_event *ev = &buf->events[buf->count];
+	struct eth_rx_vector_data *vec;
+	uint16_t filled, space, sz;
+
+	filled = 0;
+	vec = &queue_info->vector_data;
+	while (num) {
+		if (vec->vector_ev == NULL) {
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		} else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+			/* Event ready. */
+			ev->event = vec->event;
+			ev->vec = vec->vector_ev;
+			ev++;
+			filled++;
+			vec->vector_ev = NULL;
+			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+			if (rte_mempool_get(vec->vector_pool,
+					    (void **)&vec->vector_ev) < 0) {
+				rte_pktmbuf_free_bulk(mbufs, num);
+				return 0;
+			}
+			vec->vector_ev->nb_elem = 0;
+			vec->vector_ev->port = vec->port;
+			vec->vector_ev->queue = vec->queue;
+			vec->vector_ev->attr_valid = true;
+			TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+		}
+
+		space = vec->max_vector_count - vec->vector_ev->nb_elem;
+		sz = num > space ? space : num;
+		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+		       sizeof(void *) * sz);
+		vec->vector_ev->nb_elem += sz;
+		num -= sz;
+		mbufs += sz;
+		vec->ts = rte_rdtsc();
+	}
+
+	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+		ev->event = vec->event;
+		ev->vec = vec->vector_ev;
+		ev++;
+		filled++;
+		vec->vector_ev = NULL;
+		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+	}
+
+	return filled;
+}
+
 static inline void
 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
@@ -770,25 +867,30 @@  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 
-	for (i = 0; i < num; i++) {
-		m = mbufs[i];
-
-		rss = do_rss ?
-			rxa_do_softrss(m, rx_adapter->rss_key_be) :
-			m->hash.rss;
-		ev->event = event;
-		ev->flow_id = (rss & ~flow_id_mask) |
-				(ev->flow_id & flow_id_mask);
-		ev->mbuf = m;
-		ev++;
+	if (!eth_rx_queue_info->ena_vector) {
+		for (i = 0; i < num; i++) {
+			m = mbufs[i];
+
+			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+				     : m->hash.rss;
+			ev->event = event;
+			ev->flow_id = (rss & ~flow_id_mask) |
+				      (ev->flow_id & flow_id_mask);
+			ev->mbuf = m;
+			ev++;
+		}
+	} else {
+		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+					      buf, mbufs, num);
 	}
 
-	if (dev_info->cb_fn) {
+	if (num && dev_info->cb_fn) {
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count, ev,
-					num, dev_info->cb_arg, &dropped);
+					ETH_EVENT_BUFFER_SIZE, buf->count,
+					&buf->events[buf->count], num,
+					dev_info->cb_arg, &dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -1124,6 +1226,30 @@  rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 	return nb_rx;
 }
 
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+	struct rte_event_eth_rx_adapter *rx_adapter = arg;
+	struct rte_eth_event_enqueue_buffer *buf =
+		&rx_adapter->event_enqueue_buffer;
+	struct rte_event *ev;
+
+	if (buf->count)
+		rxa_flush_event_buffer(rx_adapter);
+
+	if (vec->vector_ev->nb_elem == 0)
+		return;
+	ev = &buf->events[buf->count];
+
+	/* Event ready. */
+	ev->event = vec->event;
+	ev->vec = vec->vector_ev;
+	buf->count++;
+
+	vec->vector_ev = NULL;
+	vec->ts = 0;
+}
+
 static int
 rxa_service_func(void *args)
 {
@@ -1137,6 +1263,24 @@  rxa_service_func(void *args)
 		return 0;
 	}
 
+	if (rx_adapter->ena_vector) {
+		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+		    rx_adapter->vector_tmo_ticks) {
+			struct eth_rx_vector_data *vec;
+
+			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+				uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+				if (elapsed_time >= vec->vector_timeout_ticks) {
+					rxa_vector_expire(vec, rx_adapter);
+					TAILQ_REMOVE(&rx_adapter->vector_list,
+						     vec, next);
+				}
+			}
+			rx_adapter->prev_expiry_ts = rte_rdtsc();
+		}
+	}
+
 	stats = &rx_adapter->stats;
 	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
 	stats->rx_packets += rxa_poll(rx_adapter);
@@ -1640,6 +1784,28 @@  rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 }
 
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
+		    uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+		    uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+	struct eth_rx_vector_data *vector_data;
+	uint32_t flow_id;
+
+	vector_data = &queue_info->vector_data;
+	vector_data->max_vector_count = vector_count;
+	vector_data->port = port_id;
+	vector_data->queue = qid;
+	vector_data->vector_pool = mp;
+	vector_data->vector_timeout_ticks =
+		NSEC2TICK(vector_ns, rte_get_timer_hz());
+	vector_data->ts = 0;
+	flow_id = queue_info->event & 0xFFFFF;
+	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
+	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
 static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
 	struct eth_device_info *dev_info,
@@ -1741,6 +1907,44 @@  rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 }
 
+static void
+rxa_sw_event_vector_configure(
+	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
+	int rx_queue_id,
+	const struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+	struct eth_rx_queue_info *queue_info;
+	struct rte_event *qi_ev;
+
+	if (rx_queue_id == -1) {
+		uint16_t nb_rx_queues;
+		uint16_t i;
+
+		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+		for (i = 0; i < nb_rx_queues; i++)
+			rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
+						      config);
+		return;
+	}
+
+	queue_info = &dev_info->rx_queue[rx_queue_id];
+	qi_ev = (struct rte_event *)&queue_info->event;
+	queue_info->ena_vector = 1;
+	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+	rxa_set_vector_data(queue_info, config->vector_sz,
+			    config->vector_timeout_ns, config->vector_mp,
+			    rx_queue_id, dev_info->dev->data->port_id);
+	rx_adapter->ena_vector = 1;
+	rx_adapter->vector_tmo_ticks =
+		rx_adapter->vector_tmo_ticks ?
+			      RTE_MIN(config->vector_timeout_ns << 1,
+				      rx_adapter->vector_tmo_ticks) :
+			      config->vector_timeout_ns << 1;
+	rx_adapter->prev_expiry_ts = 0;
+	TAILQ_INIT(&rx_adapter->vector_list);
+}
+
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 		uint16_t eth_dev_id,
 		int rx_queue_id,
@@ -2081,6 +2285,15 @@  rte_event_eth_rx_adapter_queue_add(uint8_t id,
 		return -EINVAL;
 	}
 
+	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+	    (queue_conf->rx_queue_flags &
+	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+				 " eth port: %" PRIu16 " adapter id: %" PRIu8,
+				 eth_dev_id, id);
+		return -EINVAL;
+	}
+
 	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
 		(rx_queue_id != -1)) {
 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2143,6 +2356,17 @@  rte_event_eth_rx_adapter_queue_add(uint8_t id,
 	return 0;
 }
 
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+	limits->max_sz = MAX_VECTOR_SIZE;
+	limits->min_sz = MIN_VECTOR_SIZE;
+	limits->max_timeout_ns = MAX_VECTOR_NS;
+	limits->min_timeout_ns = MIN_VECTOR_NS;
+
+	return 0;
+}
+
 int
 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
 				int32_t rx_queue_id)
@@ -2333,7 +2557,8 @@  rte_event_eth_rx_adapter_queue_event_vector_config(
 		ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
 			dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
 	} else {
-		ret = -ENOTSUP;
+		rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
+					      rx_queue_id, config);
 	}
 
 	return ret;
@@ -2371,7 +2596,7 @@  rte_event_eth_rx_adapter_vector_limits_get(
 		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
 			dev, &rte_eth_devices[eth_port_id], limits);
 	} else {
-		ret = -ENOTSUP;
+		ret = rxa_sw_vector_limits(limits);
 	}
 
 	return ret;
diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
index f95edc075..254a31b1f 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -122,7 +122,11 @@  rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
 
 	if (caps == NULL)
 		return -EINVAL;
-	*caps = 0;
+
+	if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
+		*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
+	else
+		*caps = 0;
 
 	return dev->dev_ops->eth_rx_adapter_caps_get ?
 				(*dev->dev_ops->eth_rx_adapter_caps_get)(dev,