From patchwork Sun Jan 7 15:34:45 2024
X-Patchwork-Submitter: Srikanth Yalavarthi
X-Patchwork-Id: 135784
X-Patchwork-Delegate: jerinj@marvell.com
From: Srikanth Yalavarthi
To: Srikanth Yalavarthi, Jerin Jacob
Subject: [PATCH 06/11] event/ml: add support for service function
Date: Sun, 7 Jan 2024 07:34:45 -0800
Message-ID: <20240107153454.3909-7-syalavarthi@marvell.com>
X-Mailer: git-send-email 2.42.0
In-Reply-To: <20240107153454.3909-1-syalavarthi@marvell.com>
References: <20240107153454.3909-1-syalavarthi@marvell.com>
List-Id: DPDK patches and discussions

Added support for the ML adapter service function for software-based
event devices.
Signed-off-by: Srikanth Yalavarthi <syalavarthi@marvell.com>
---
 lib/eventdev/rte_event_ml_adapter.c | 538 ++++++++++++++++++++++++++++
 1 file changed, 538 insertions(+)

diff --git a/lib/eventdev/rte_event_ml_adapter.c b/lib/eventdev/rte_event_ml_adapter.c
index 9d441c5d967..95f566b1025 100644
--- a/lib/eventdev/rte_event_ml_adapter.c
+++ b/lib/eventdev/rte_event_ml_adapter.c
@@ -5,6 +5,7 @@
 #include "rte_event_ml_adapter.h"
 #include "rte_eventdev.h"
 #include
+#include
 
 #include "eventdev_pmd.h"
 #include "rte_mldev_pmd.h"
@@ -13,6 +14,9 @@
 #define ML_DEFAULT_MAX_NB 128
 #define ML_ADAPTER_BUFFER_SIZE 1024
 
+#define ML_BATCH_SIZE 32
+#define ML_ADAPTER_OPS_BUFFER_SIZE (ML_BATCH_SIZE + ML_BATCH_SIZE)
+
 #define ML_ADAPTER_ARRAY "event_ml_adapter_array"
 
 /* ML ops circular buffer */
@@ -54,6 +58,9 @@ struct ml_device_info {
 	 * be invoked if not already invoked
 	 */
 	uint16_t num_qpairs;
+
+	/* Next queue pair to be processed */
+	uint16_t next_queue_pair_id;
 } __rte_cache_aligned;
 
 struct event_ml_adapter {
@@ -78,6 +85,9 @@ struct event_ml_adapter {
 	/* ML device structure array */
 	struct ml_device_info *mldevs;
 
+	/* Next ML device to be processed */
+	int16_t next_mldev_id;
+
 	/* Circular buffer for processing ML ops to eventdev */
 	struct ml_ops_circular_buffer ebuf;
 
@@ -92,6 +102,26 @@ struct event_ml_adapter {
 
 	/* No. of queue pairs configured */
 	uint16_t nb_qps;
+
+	/* Per adapter EAL service ID */
+	uint32_t service_id;
+
+	/* Service initialization state */
+	uint8_t service_initialized;
+
+	/* Max ML ops processed in any service function invocation */
+	uint32_t max_nb;
+
+	/* Store event port's implicit release capability */
+	uint8_t implicit_release_disabled;
+
+	/* Flag to indicate backpressure at mldev
+	 * Stop further dequeuing events from eventdev
+	 */
+	bool stop_enq_to_mldev;
+
+	/* Loop counter to flush ml ops */
+	uint16_t transmit_loop_count;
 } __rte_cache_aligned;
 
 static struct event_ml_adapter **event_ml_adapter;
@@ -133,6 +163,18 @@ emla_array_init(void)
 	return 0;
 }
 
+static inline bool
+emla_circular_buffer_batch_ready(struct ml_ops_circular_buffer *bufp)
+{
+	return bufp->count >= ML_BATCH_SIZE;
+}
+
+static inline bool
+emla_circular_buffer_space_for_batch(struct ml_ops_circular_buffer *bufp)
+{
+	return (bufp->size - bufp->count) >= ML_BATCH_SIZE;
+}
+
 static inline int
 emla_circular_buffer_init(const char *name, struct ml_ops_circular_buffer *buf, uint16_t sz)
 {
@@ -151,6 +193,49 @@ emla_circular_buffer_free(struct ml_ops_circular_buffer *buf)
 	rte_free(buf->op_buffer);
 }
 
+static inline int
+emla_circular_buffer_add(struct ml_ops_circular_buffer *bufp, struct rte_ml_op *op)
+{
+	uint16_t *tail = &bufp->tail;
+
+	bufp->op_buffer[*tail] = op;
+
+	/* circular buffer, go round */
+	*tail = (*tail + 1) % bufp->size;
+	bufp->count++;
+
+	return 0;
+}
+
+static inline int
+emla_circular_buffer_flush_to_mldev(struct ml_ops_circular_buffer *bufp, uint8_t mldev_id,
+				    uint16_t qp_id, uint16_t *nb_ops_flushed)
+{
+	uint16_t n = 0;
+	uint16_t *head = &bufp->head;
+	uint16_t *tail = &bufp->tail;
+	struct rte_ml_op **ops = bufp->op_buffer;
+
+	if (*tail > *head)
+		n = *tail - *head;
+	else if (*tail < *head)
+		n = bufp->size - *head;
+	else {
+		*nb_ops_flushed = 0;
+		return 0; /* buffer empty */
+	}
+
+	*nb_ops_flushed = rte_ml_enqueue_burst(mldev_id, qp_id, &ops[*head], n);
+	bufp->count -= *nb_ops_flushed;
+	if (!bufp->count) {
+		*head = 0;
+		*tail = 0;
+	} else
+		*head = (*head + *nb_ops_flushed) % bufp->size;
+
+	return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static int
 emla_default_config_cb(uint8_t id, uint8_t evdev_id, struct rte_event_ml_adapter_conf *conf,
 		       void *arg)
@@ -361,6 +446,394 @@ rte_event_ml_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
 	return 0;
 }
 
+static inline unsigned int
+emla_enq_to_mldev(struct event_ml_adapter *adapter, struct rte_event *ev, unsigned int cnt)
+{
+	union rte_event_ml_metadata *m_data = NULL;
+	struct ml_queue_pair_info *qp_info = NULL;
+	struct rte_ml_op *ml_op;
+	unsigned int i, n;
+	uint16_t qp_id, nb_enqueued = 0;
+	int16_t mldev_id;
+	int ret;
+
+	ret = 0;
+	n = 0;
+
+	for (i = 0; i < cnt; i++) {
+		ml_op = ev[i].event_ptr;
+		if (ml_op == NULL)
+			continue;
+
+		if (ml_op->private_data_offset)
+			m_data = (union rte_event_ml_metadata *)((uint8_t *)ml_op +
+								 ml_op->private_data_offset);
+		if (m_data == NULL) {
+			if (ml_op != NULL && ml_op->mempool != NULL)
+				rte_mempool_put(ml_op->mempool, ml_op);
+			continue;
+		}
+
+		mldev_id = m_data->request_info.mldev_id;
+		qp_id = m_data->request_info.queue_pair_id;
+		qp_info = &adapter->mldevs[mldev_id].qpairs[qp_id];
+		if (!qp_info->qp_enabled) {
+			if (ml_op != NULL && ml_op->mempool != NULL)
+				rte_mempool_put(ml_op->mempool, ml_op);
+			continue;
+		}
+		emla_circular_buffer_add(&qp_info->mlbuf, ml_op);
+
+		if (emla_circular_buffer_batch_ready(&qp_info->mlbuf)) {
+			ret = emla_circular_buffer_flush_to_mldev(&qp_info->mlbuf, mldev_id, qp_id,
+								  &nb_enqueued);
+			n += nb_enqueued;
+
+			/**
+			 * If some ml ops failed to flush to mldev and
+			 * space for another batch is not available, stop
+			 * dequeue from eventdev momentarily
+			 */
+			if (unlikely(ret < 0 &&
+				     !emla_circular_buffer_space_for_batch(&qp_info->mlbuf)))
+				adapter->stop_enq_to_mldev = true;
+		}
+	}
+
+	return n;
+}
+
+static unsigned int
+emla_ml_mldev_flush(struct event_ml_adapter *adapter, int16_t mldev_id, uint16_t *nb_ops_flushed)
+{
+	struct ml_device_info *curr_dev;
+	struct ml_queue_pair_info *curr_queue;
+	struct rte_ml_dev *dev;
+	uint16_t nb = 0, nb_enqueued = 0;
+	uint16_t qp;
+
+	curr_dev = &adapter->mldevs[mldev_id];
+	dev = rte_ml_dev_pmd_get_dev(mldev_id);
+
+	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+
+		curr_queue = &curr_dev->qpairs[qp];
+		if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+			continue;
+
+		emla_circular_buffer_flush_to_mldev(&curr_queue->mlbuf, mldev_id, qp, &nb_enqueued);
+		*nb_ops_flushed += curr_queue->mlbuf.count;
+		nb += nb_enqueued;
+	}
+
+	return nb;
+}
+
+static unsigned int
+emla_ml_enq_flush(struct event_ml_adapter *adapter)
+{
+	int16_t mldev_id;
+	uint16_t nb_enqueued = 0;
+	uint16_t nb_ops_flushed = 0;
+	uint16_t num_mldev = rte_ml_dev_count();
+
+	for (mldev_id = 0; mldev_id < num_mldev; mldev_id++)
+		nb_enqueued += emla_ml_mldev_flush(adapter, mldev_id, &nb_ops_flushed);
+	/**
+	 * Enable dequeue from eventdev if all ops from circular
+	 * buffer flushed to mldev
+	 */
+	if (!nb_ops_flushed)
+		adapter->stop_enq_to_mldev = false;
+
+	return nb_enqueued;
+}
+
+/* Flush an instance's enqueue buffers every ML_ENQ_FLUSH_THRESHOLD
+ * iterations of emla_ml_adapter_enq_run()
+ */
+#define ML_ENQ_FLUSH_THRESHOLD 1024
+
+static int
+emla_ml_adapter_enq_run(struct event_ml_adapter *adapter, unsigned int max_enq)
+{
+	struct rte_event ev[ML_BATCH_SIZE];
+	unsigned int nb_enq, nb_enqueued;
+	uint16_t n;
+	uint8_t event_dev_id = adapter->eventdev_id;
+	uint8_t event_port_id = adapter->event_port_id;
+
+	nb_enqueued = 0;
+	if (adapter->mode == RTE_EVENT_ML_ADAPTER_OP_NEW)
+		return 0;
+
+	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
+		if (unlikely(adapter->stop_enq_to_mldev)) {
+			nb_enqueued += emla_ml_enq_flush(adapter);
+
+			if (unlikely(adapter->stop_enq_to_mldev))
+				break;
+		}
+
+		n = rte_event_dequeue_burst(event_dev_id, event_port_id, ev, ML_BATCH_SIZE, 0);
+
+		if (!n)
+			break;
+
+		nb_enqueued += emla_enq_to_mldev(adapter, ev, n);
+	}
+
+	if ((++adapter->transmit_loop_count & (ML_ENQ_FLUSH_THRESHOLD - 1)) == 0)
+		nb_enqueued += emla_ml_enq_flush(adapter);
+
+	return nb_enqueued;
+}
+
+#define ML_ADAPTER_MAX_EV_ENQ_RETRIES 100
+
+static inline uint16_t
+emla_ops_enqueue_burst(struct event_ml_adapter *adapter, struct rte_ml_op **ops, uint16_t num)
+{
+	union rte_event_ml_metadata *m_data = NULL;
+	uint8_t event_dev_id = adapter->eventdev_id;
+	uint8_t event_port_id = adapter->event_port_id;
+	struct rte_event events[ML_BATCH_SIZE];
+	uint16_t nb_enqueued, nb_ev;
+	uint8_t retry;
+	uint8_t i;
+
+	nb_ev = 0;
+	retry = 0;
+	nb_enqueued = 0;
+	num = RTE_MIN(num, ML_BATCH_SIZE);
+	for (i = 0; i < num; i++) {
+		struct rte_event *ev = &events[nb_ev++];
+
+		if (ops[i]->private_data_offset)
+			m_data = (union rte_event_ml_metadata *)((uint8_t *)ops[i] +
+								 ops[i]->private_data_offset);
+		if (unlikely(m_data == NULL)) {
+			if (ops[i] != NULL && ops[i]->mempool != NULL)
+				rte_mempool_put(ops[i]->mempool, ops[i]);
+			continue;
+		}
+
+		rte_memcpy(ev, &m_data->response_info, sizeof(*ev));
+		ev->event_ptr = ops[i];
+		ev->event_type = RTE_EVENT_TYPE_CRYPTODEV;
+		if (adapter->implicit_release_disabled)
+			ev->op = RTE_EVENT_OP_FORWARD;
+		else
+			ev->op = RTE_EVENT_OP_NEW;
+	}
+
+	do {
+		nb_enqueued += rte_event_enqueue_burst(event_dev_id, event_port_id,
+						       &events[nb_enqueued], nb_ev - nb_enqueued);
+
+	} while (retry++ < ML_ADAPTER_MAX_EV_ENQ_RETRIES && nb_enqueued < nb_ev);
+
+	return nb_enqueued;
+}
+
+static int
+emla_circular_buffer_flush_to_evdev(struct event_ml_adapter *adapter,
+				    struct ml_ops_circular_buffer *bufp)
+{
+	uint16_t n = 0, nb_ops_flushed;
+	uint16_t *head = &bufp->head;
+	uint16_t *tail = &bufp->tail;
+	struct rte_ml_op **ops = bufp->op_buffer;
+
+	if (*tail > *head)
+		n = *tail - *head;
+	else if (*tail < *head)
+		n = bufp->size - *head;
+	else
+		return 0; /* buffer empty */
+
+	nb_ops_flushed = emla_ops_enqueue_burst(adapter, &ops[*head], n);
+	bufp->count -= nb_ops_flushed;
+	if (!bufp->count) {
+		*head = 0;
+		*tail = 0;
+		return 0; /* buffer empty */
+	}
+
+	*head = (*head + nb_ops_flushed) % bufp->size;
+	return 1;
+}
+
+static void
+emla_ops_buffer_flush(struct event_ml_adapter *adapter)
+{
+	if (likely(adapter->ebuf.count == 0))
+		return;
+
+	while (emla_circular_buffer_flush_to_evdev(adapter, &adapter->ebuf))
+		;
+}
+
+static inline unsigned int
+emla_ml_adapter_deq_run(struct event_ml_adapter *adapter, unsigned int max_deq)
+{
+	struct ml_device_info *curr_dev;
+	struct ml_queue_pair_info *curr_queue;
+	struct rte_ml_op *ops[ML_BATCH_SIZE];
+	uint16_t n, nb_deq, nb_enqueued, i;
+	struct rte_ml_dev *dev;
+	int16_t mldev_id;
+	uint16_t qp, dev_qps;
+	bool done;
+	uint16_t num_mldev = rte_ml_dev_count();
+
+	nb_deq = 0;
+	emla_ops_buffer_flush(adapter);
+
+	do {
+		done = true;
+
+		for (mldev_id = adapter->next_mldev_id; mldev_id < num_mldev; mldev_id++) {
+			uint16_t queues = 0;
+
+			curr_dev = &adapter->mldevs[mldev_id];
+			dev = curr_dev->dev;
+			if (unlikely(dev == NULL))
+				continue;
+
+			dev_qps = dev->data->nb_queue_pairs;
+
+			for (qp = curr_dev->next_queue_pair_id; queues < dev_qps;
+			     qp = (qp + 1) % dev_qps, queues++) {
+				curr_queue = &curr_dev->qpairs[qp];
+				if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+					continue;
+
+				n = rte_ml_dequeue_burst(mldev_id, qp, ops, ML_BATCH_SIZE);
+				if (!n)
+					continue;
+
+				done = false;
+				nb_enqueued = 0;
+
+				if (unlikely(!adapter->ebuf.count))
+					nb_enqueued = emla_ops_enqueue_burst(adapter, ops, n);
+
+				if (likely(nb_enqueued == n))
+					goto check;
+
+				/* Failed to enqueue events case */
+				for (i = nb_enqueued; i < n; i++)
+					emla_circular_buffer_add(&adapter->ebuf, ops[i]);
+
+check:
+				nb_deq += n;
+
+				if (nb_deq >= max_deq) {
+					if ((qp + 1) == dev_qps)
+						adapter->next_mldev_id = (mldev_id + 1) % num_mldev;
+
+					curr_dev->next_queue_pair_id =
+						(qp + 1) % dev->data->nb_queue_pairs;
+
+					return nb_deq;
+				}
+			}
+		}
+		adapter->next_mldev_id = 0;
+	} while (done == false);
+
+	return nb_deq;
+}
+
+static int
+emla_ml_adapter_run(struct event_ml_adapter *adapter, unsigned int max_ops)
+{
+	unsigned int ops_left = max_ops;
+
+	while (ops_left > 0) {
+		unsigned int e_cnt, d_cnt;
+
+		e_cnt = emla_ml_adapter_deq_run(adapter, ops_left);
+		ops_left -= RTE_MIN(ops_left, e_cnt);
+
+		d_cnt = emla_ml_adapter_enq_run(adapter, ops_left);
+		ops_left -= RTE_MIN(ops_left, d_cnt);
+
+		if (e_cnt == 0 && d_cnt == 0)
+			break;
+	}
+
+	if (ops_left == max_ops) {
+		rte_event_maintain(adapter->eventdev_id, adapter->event_port_id, 0);
+		return -EAGAIN;
+	} else
+		return 0;
+}
+
+static int
+emla_service_func(void *args)
+{
+	struct event_ml_adapter *adapter = args;
+	int ret;
+
+	if (rte_spinlock_trylock(&adapter->lock) == 0)
+		return 0;
+	ret = emla_ml_adapter_run(adapter, adapter->max_nb);
+	rte_spinlock_unlock(&adapter->lock);
+
+	return ret;
+}
+
+static int
+emla_init_service(struct event_ml_adapter *adapter, uint8_t id)
+{
+	struct rte_event_ml_adapter_conf adapter_conf;
+	struct rte_service_spec service;
+	int ret;
+	uint32_t impl_rel;
+
+	if (adapter->service_initialized)
+		return 0;
+
+	memset(&service, 0, sizeof(service));
+	snprintf(service.name, ML_ADAPTER_NAME_LEN, "rte_event_ml_adapter_%d", id);
+	service.socket_id = adapter->socket_id;
+	service.callback = emla_service_func;
+	service.callback_userdata = adapter;
+
+	/* Service function handles locking for queue add/del updates */
+	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
+	ret = rte_service_component_register(&service, &adapter->service_id);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32, service.name, ret);
+		return ret;
+	}
+
+	ret = adapter->conf_cb(id, adapter->eventdev_id, &adapter_conf, adapter->conf_arg);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32, ret);
+		return ret;
+	}
+
+	adapter->max_nb = adapter_conf.max_nb;
+	adapter->event_port_id = adapter_conf.event_port_id;
+
+	if (rte_event_port_attr_get(adapter->eventdev_id, adapter->event_port_id,
+				    RTE_EVENT_PORT_ATTR_IMPLICIT_RELEASE_DISABLE, &impl_rel)) {
+		RTE_EDEV_LOG_ERR("Failed to get port info for eventdev %" PRId32,
+				 adapter->eventdev_id);
+		emla_circular_buffer_free(&adapter->ebuf);
+		rte_free(adapter);
+		return -EINVAL;
+	}
+
+	adapter->implicit_release_disabled = (uint8_t)impl_rel;
+	adapter->service_initialized = 1;
+
+	return ret;
+}
+
 static void
 emla_update_qp_info(struct event_ml_adapter *adapter, struct ml_device_info *dev_info,
 		    int32_t queue_pair_id, uint8_t add)
@@ -389,6 +862,40 @@ emla_update_qp_info(struct event_ml_adapter *adapter, struct ml_device_info *dev
 	}
 }
 
+static int
+emla_add_queue_pair(struct event_ml_adapter *adapter, int16_t mldev_id, int queue_pair_id)
+{
+	struct ml_device_info *dev_info = &adapter->mldevs[mldev_id];
+	struct ml_queue_pair_info *qpairs;
+	uint32_t i;
+
+	if (dev_info->qpairs == NULL) {
+		dev_info->qpairs = rte_zmalloc_socket(adapter->mem_name,
+						      dev_info->dev->data->nb_queue_pairs *
+							      sizeof(struct ml_queue_pair_info),
+						      0, adapter->socket_id);
+		if (dev_info->qpairs == NULL)
+			return -ENOMEM;
+
+		qpairs = dev_info->qpairs;
+
+		if (emla_circular_buffer_init("mla_mldev_circular_buffer", &qpairs->mlbuf,
+					      ML_ADAPTER_OPS_BUFFER_SIZE)) {
+			RTE_EDEV_LOG_ERR("Failed to get memory for mldev buffer");
+			rte_free(qpairs);
+			return -ENOMEM;
+		}
+	}
+
+	if (queue_pair_id == -1) {
+		for (i = 0; i < dev_info->dev->data->nb_queue_pairs; i++)
+			emla_update_qp_info(adapter, dev_info, i, 1);
+	} else
+		emla_update_qp_info(adapter, dev_info, (uint16_t)queue_pair_id, 1);
+
+	return 0;
+}
+
 int
 rte_event_ml_adapter_queue_pair_add(uint8_t id, int16_t mldev_id, int32_t queue_pair_id,
 				    const struct rte_event *event)
@@ -458,6 +965,36 @@ rte_event_ml_adapter_queue_pair_add(uint8_t id, int16_t mldev_id, int32_t queue_
 			emla_update_qp_info(adapter, &adapter->mldevs[mldev_id], queue_pair_id, 1);
 	}
 
+	/* In case HW cap is RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_NEW, or SW adapter, initiate
+	 * services so the application can choose whichever way it wants to use the adapter.
+	 *
+	 * Case 1: RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_NEW. Application may want to use one
+	 * of the two modes below
+	 *
+	 * a. OP_FORWARD mode -> HW Dequeue + SW enqueue
+	 * b. OP_NEW mode -> HW Dequeue
+	 *
+	 * Case 2: No HW caps, use SW adapter
+	 *
+	 * a. OP_FORWARD mode -> SW enqueue & dequeue
+	 * b. OP_NEW mode -> SW Dequeue
+	 */
+	if ((cap & RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_NEW &&
+	     !(cap & RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_FWD) &&
+	     adapter->mode == RTE_EVENT_ML_ADAPTER_OP_FORWARD) ||
+	    (!(cap & RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_NEW) &&
+	     !(cap & RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_OP_FWD) &&
+	     !(cap & RTE_EVENT_ML_ADAPTER_CAP_INTERNAL_PORT_QP_EV_BIND))) {
+		rte_spinlock_lock(&adapter->lock);
+		ret = emla_init_service(adapter, id);
+		if (ret == 0)
+			ret = emla_add_queue_pair(adapter, mldev_id, queue_pair_id);
+		rte_spinlock_unlock(&adapter->lock);
+
+		if (ret == 0)
+			rte_service_component_runstate_set(adapter->service_id, 1);
+	}
+
 	return ret;
 }
 
@@ -529,6 +1066,7 @@ rte_event_ml_adapter_queue_pair_del(uint8_t id, int16_t mldev_id, int32_t queue_
 		}
 
 		rte_spinlock_unlock(&adapter->lock);
+		rte_service_component_runstate_set(adapter->service_id, adapter->nb_qps);
 	}
 
 	return ret;
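
For reference, a minimal application-side sketch of how the service path registered
above would typically be driven. This is not part of the patch: the
rte_event_ml_adapter_service_id_get() getter is assumed here by analogy with the
crypto and DMA adapters, while the rte_service_* calls are existing EAL APIs.

#include <rte_service.h>
#include <rte_event_ml_adapter.h>

static int
setup_ml_adapter_service(uint8_t adapter_id, uint32_t service_lcore)
{
	uint32_t service_id;
	int ret;

	/* Hypothetical getter, analogous to rte_event_crypto_adapter_service_id_get() */
	ret = rte_event_ml_adapter_service_id_get(adapter_id, &service_id);
	if (ret < 0)
		return ret;

	/* Dedicate a service lcore to run the adapter's service function */
	ret = rte_service_lcore_add(service_lcore);
	if (ret < 0 && ret != -EALREADY)
		return ret;

	/* Map the adapter's service to that lcore and enable it */
	ret = rte_service_map_lcore_set(service_id, service_lcore, 1);
	if (ret < 0)
		return ret;

	ret = rte_service_runstate_set(service_id, 1);
	if (ret < 0)
		return ret;

	return rte_service_lcore_start(service_lcore);
}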