From patchwork Thu Mar 10 06:54:06 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ding, Xuan" X-Patchwork-Id: 108631 X-Patchwork-Delegate: maxime.coquelin@redhat.com Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 3E4D5A0093; Thu, 10 Mar 2022 07:57:07 +0100 (CET) Received: from [217.70.189.124] (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 2BDE441144; Thu, 10 Mar 2022 07:57:07 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by mails.dpdk.org (Postfix) with ESMTP id 16FB2410DD for ; Thu, 10 Mar 2022 07:57:04 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=intel.com; i=@intel.com; q=dns/txt; s=Intel; t=1646895425; x=1678431425; h=from:to:cc:subject:date:message-id:in-reply-to: references; bh=o+8TwZPuVa2WRY183jwJprxVIkdaRVUhlPvhoya9ZxE=; b=VeYwq4vHbkwmzyyxMaj7ChQKKjYi1K8W1IqPtsk50BUl/FXeAQbzKloe avzu9aiiRzibaD2nXPePEr1AzX2kGIW3OfDPLWtdnHKLs3dBOPB8j+Xjr M4ERCirt1Kvzvt40Dd0GF5oHyoCRPLUu7KFQ7FiZFk+9bvpAsocwRPNwB mWvVUG0aX8rVYDU5pwBQzeH8JWbSwxfDrueJwMKMypS5ToafIUuxJ0lTf qqc6203YygQspjt88GBVmZnOru/BoJ22i7ceoo3J5QTqsLiBKT2VgxafV 3nyAX2/eAH4eXwmxZEMFL4GhyY88VHJ648K/OoEjgNgof+ENkzBYElbC/ Q==; X-IronPort-AV: E=McAfee;i="6200,9189,10281"; a="242621035" X-IronPort-AV: E=Sophos;i="5.90,169,1643702400"; d="scan'208";a="242621035" Received: from orsmga007.jf.intel.com ([10.7.209.58]) by orsmga101.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 09 Mar 2022 22:57:03 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.90,169,1643702400"; d="scan'208";a="538333297" Received: from npg-dpdk-xuan-cbdma.sh.intel.com ([10.67.110.228]) by orsmga007.jf.intel.com with ESMTP; 09 Mar 2022 22:57:00 -0800 From: xuan.ding@intel.com To: maxime.coquelin@redhat.com, chenbo.xia@intel.com Cc: dev@dpdk.org, jiayu.hu@intel.com, cheng1.jiang@intel.com, sunil.pai.g@intel.com, liangma@liangbit.com, Xuan Ding , Yuan Wang Subject: [RFC,v3 1/2] vhost: support async dequeue for split ring Date: Thu, 10 Mar 2022 06:54:06 +0000 Message-Id: <20220310065407.17145-2-xuan.ding@intel.com> X-Mailer: git-send-email 2.17.1 In-Reply-To: <20220310065407.17145-1-xuan.ding@intel.com> References: <20220101001244.90147-1-xuan.ding@intel.com> <20220310065407.17145-1-xuan.ding@intel.com> X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org From: Xuan Ding This patch implements asynchronous dequeue data path for vhost split ring, with dmadev library integrated. Signed-off-by: Xuan Ding Signed-off-by: Yuan Wang --- lib/vhost/rte_vhost_async.h | 37 ++- lib/vhost/version.map | 1 + lib/vhost/vhost.h | 1 + lib/vhost/virtio_net.c | 504 ++++++++++++++++++++++++++++++++++++ 4 files changed, 541 insertions(+), 2 deletions(-) diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h index f1293c6a9d..b6ab0b06a2 100644 --- a/lib/vhost/rte_vhost_async.h +++ b/lib/vhost/rte_vhost_async.h @@ -155,9 +155,9 @@ int rte_vhost_async_get_inflight(int vid, uint16_t queue_id); * @param count * Size of the packet array * @param dma_id - * the identifier of DMA device + * The identifier of DMA device * @param vchan_id - * the identifier of virtual DMA channel + * The identifier of virtual DMA channel * @return * Number of packets returned */ @@ -187,6 +187,39 @@ uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id, __rte_experimental int rte_vhost_async_dma_configure(int16_t dma_id, uint16_t vchan_id); +/** + * This function tries to receive packets from the guest with offloading + * copies to the async channel. The packets that are transfer completed + * are returned in "pkts". The other packets that their copies are submitted to + * the async channel but not completed are called "in-flight packets". + * This function will not return in-flight packets until their copies are + * completed by the async channel. + * + * @param vid + * ID of vhost device to dequeue data + * @param queue_id + * ID of virtqueue to dequeue data + * @param mbuf_pool + * Mbuf_pool where host mbuf is allocated + * @param pkts + * Blank array to keep successfully dequeued packets + * @param count + * Size of the packet array + * @param nr_inflight + * The amount of in-flight packets. If error occurred, its value is set to -1. + * @param dma_id + * The identifier of DMA device + * @param vchan_id + * The identifier of virtual DMA channel + * @return + * Number of successfully dequeued packets + */ +__rte_experimental +uint16_t +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id, + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count, + int *nr_inflight, uint16_t dma_id, uint16_t vchan_id); + #ifdef __cplusplus } #endif diff --git a/lib/vhost/version.map b/lib/vhost/version.map index 0a66c5840c..968d6d4290 100644 --- a/lib/vhost/version.map +++ b/lib/vhost/version.map @@ -87,6 +87,7 @@ EXPERIMENTAL { # added in 22.03 rte_vhost_async_dma_configure; + rte_vhost_async_try_dequeue_burst; }; INTERNAL { diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h index a9edc271aa..3799d41089 100644 --- a/lib/vhost/vhost.h +++ b/lib/vhost/vhost.h @@ -178,6 +178,7 @@ extern struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX]; */ struct async_inflight_info { struct rte_mbuf *mbuf; + struct virtio_net_hdr nethdr; uint16_t descs; /* num of descs inflight */ uint16_t nr_buffers; /* num of buffers inflight for packed ring */ }; diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c index 5f432b0d77..3816caca79 100644 --- a/lib/vhost/virtio_net.c +++ b/lib/vhost/virtio_net.c @@ -3141,3 +3141,507 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id, return count; } + +static __rte_always_inline int +async_desc_to_mbuf_seg(struct virtio_net *dev, struct vhost_virtqueue *vq, + struct rte_mbuf *m, uint32_t mbuf_offset, + uint64_t buf_iova, uint32_t cpy_len) +{ + struct vhost_async *async = vq->async; + uint64_t mapped_len; + uint32_t buf_offset = 0; + void *host_iova; + + while (cpy_len) { + host_iova = (void *)(uintptr_t)gpa_to_first_hpa(dev, + buf_iova + buf_offset, cpy_len, + &mapped_len); + if (unlikely(!host_iova)) { + VHOST_LOG_DATA(ERR, "(%s) %s: failed to get host_iova.\n", + dev->ifname, __func__); + return -1; + } + + if (unlikely(async_iter_add_iovec(dev, async, host_iova, + (void *)(uintptr_t)rte_pktmbuf_iova_offset(m, mbuf_offset), + (size_t)mapped_len))) + return -1; + + cpy_len -= (uint32_t)mapped_len; + mbuf_offset += (uint32_t)mapped_len; + buf_offset += (uint32_t)mapped_len; + } + + return 0; +} + +static __rte_always_inline int +async_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq, + struct buf_vector *buf_vec, uint16_t nr_vec, + struct rte_mbuf *m, struct rte_mempool *mbuf_pool, + struct virtio_net_hdr *nethdr) +{ + uint64_t buf_addr, buf_iova; + uint32_t buf_avail, buf_offset, buf_len; + uint32_t mbuf_avail, mbuf_offset; + uint32_t cpy_len; + /* A counter to avoid desc dead loop chain */ + uint16_t vec_idx = 0; + struct rte_mbuf *cur = m, *prev = m; + struct virtio_net_hdr tmp_hdr; + struct virtio_net_hdr *hdr = NULL; + struct vhost_async *async = vq->async; + + buf_addr = buf_vec[vec_idx].buf_addr; + buf_len = buf_vec[vec_idx].buf_len; + buf_iova = buf_vec[vec_idx].buf_iova; + + if (unlikely(buf_len < dev->vhost_hlen && nr_vec <= 1)) + return -1; + + if (virtio_net_with_host_offload(dev)) { + if (unlikely(buf_len < sizeof(struct virtio_net_hdr))) { + /* + * No luck, the virtio-net header doesn't fit + * in a contiguous virtual area. + */ + copy_vnet_hdr_from_desc(&tmp_hdr, buf_vec); + hdr = &tmp_hdr; + } else { + hdr = (struct virtio_net_hdr *)((uintptr_t)buf_addr); + } + } + + /* + * A virtio driver normally uses at least 2 desc buffers + * for Tx: the first for storing the header, and others + * for storing the data. + */ + if (unlikely(buf_len < dev->vhost_hlen)) { + buf_offset = dev->vhost_hlen - buf_len; + vec_idx++; + buf_addr = buf_vec[vec_idx].buf_addr; + buf_iova = buf_vec[vec_idx].buf_iova; + buf_len = buf_vec[vec_idx].buf_len; + buf_avail = buf_len - buf_offset; + } else if (buf_len == dev->vhost_hlen) { + if (unlikely(++vec_idx >= nr_vec)) + return -1; + buf_addr = buf_vec[vec_idx].buf_addr; + buf_iova = buf_vec[vec_idx].buf_iova; + buf_len = buf_vec[vec_idx].buf_len; + + buf_offset = 0; + buf_avail = buf_len; + } else { + buf_offset = dev->vhost_hlen; + buf_avail = buf_vec[vec_idx].buf_len - dev->vhost_hlen; + } + + PRINT_PACKET(dev, (uintptr_t)(buf_addr + buf_offset), (uint32_t)buf_avail, 0); + + mbuf_offset = 0; + mbuf_avail = m->buf_len - RTE_PKTMBUF_HEADROOM; + + if (async_iter_initialize(dev, async)) + return -1; + + while (1) { + cpy_len = RTE_MIN(buf_avail, mbuf_avail); + + if (async_desc_to_mbuf_seg(dev, vq, cur, mbuf_offset, buf_iova + buf_offset, + cpy_len) < 0) + goto error; + + mbuf_avail -= cpy_len; + buf_avail -= cpy_len; + mbuf_offset += cpy_len; + buf_offset += cpy_len; + + /* This buf reaches to its end, get the next one */ + if (buf_avail == 0) { + if (++vec_idx >= nr_vec) + break; + + buf_addr = buf_vec[vec_idx].buf_addr; + buf_iova = buf_vec[vec_idx].buf_iova; + buf_len = buf_vec[vec_idx].buf_len; + + buf_offset = 0; + buf_avail = buf_len; + + PRINT_PACKET(dev, (uintptr_t)buf_addr, (uint32_t)buf_avail, 0); + } + + /* + * This mbuf reaches to its end, get a new one + * to hold more data. + */ + if (mbuf_avail == 0) { + cur = rte_pktmbuf_alloc(mbuf_pool); + if (unlikely(cur == NULL)) { + VHOST_LOG_DATA(ERR, + "(%s) %s: failed to allocate memory for mbuf.\n", + dev->ifname, __func__); + goto error; + } + + prev->next = cur; + prev->data_len = mbuf_offset; + m->nb_segs += 1; + m->pkt_len += mbuf_offset; + prev = cur; + + mbuf_offset = 0; + mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM; + } + } + + prev->data_len = mbuf_offset; + m->pkt_len += mbuf_offset; + + async_iter_finalize(async); + if (hdr) + *nethdr = *hdr; + + return 0; + +error: + async_iter_cancel(async); + return -1; +} + +static __rte_always_inline uint16_t +async_poll_dequeue_completed_split(struct virtio_net *dev, uint16_t queue_id, + struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id, + uint16_t vchan_id, bool legacy_ol_flags) +{ + uint16_t start_idx, from, i; + uint16_t nr_cpl_pkts = 0; + struct async_inflight_info *pkts_info; + struct vhost_virtqueue *vq = dev->virtqueue[queue_id]; + + pkts_info = vq->async->pkts_info; + + vhost_async_dma_check_completed(dev, dma_id, vchan_id, VHOST_DMA_MAX_COPY_COMPLETE); + + start_idx = async_get_first_inflight_pkt_idx(vq); + + from = start_idx; + while (vq->async->pkts_cmpl_flag[from] && count--) { + vq->async->pkts_cmpl_flag[from] = false; + from = (from + 1) & (vq->size - 1); + nr_cpl_pkts++; + } + + if (nr_cpl_pkts == 0) + return 0; + + for (i = 0; i < nr_cpl_pkts; i++) { + from = (start_idx + i) & (vq->size - 1); + pkts[i] = pkts_info[from].mbuf; + + if (virtio_net_with_host_offload(dev)) + vhost_dequeue_offload(dev, &pkts_info[from].nethdr, pkts[i], + legacy_ol_flags); + } + + /* write back completed descs to used ring and update used idx */ + write_back_completed_descs_split(vq, nr_cpl_pkts); + __atomic_add_fetch(&vq->used->idx, nr_cpl_pkts, __ATOMIC_RELEASE); + vhost_vring_call_split(dev, vq); + + vq->async->pkts_inflight_n -= nr_cpl_pkts; + + return nr_cpl_pkts; +} + +static __rte_always_inline uint16_t +virtio_dev_tx_async_split(struct virtio_net *dev, struct vhost_virtqueue *vq, + uint16_t queue_id, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, + uint16_t count, uint16_t dma_id, uint16_t vchan_id, bool legacy_ol_flags) +{ + static bool allocerr_warned; + bool dropped = false; + uint16_t free_entries; + uint16_t pkt_idx, slot_idx = 0; + uint16_t nr_done_pkts = 0; + uint16_t pkt_err = 0; + uint16_t n_xfer; + struct vhost_async *async = vq->async; + struct async_inflight_info *pkts_info = async->pkts_info; + struct rte_mbuf *pkts_prealloc[MAX_PKT_BURST]; + uint16_t pkts_size = count; + + /** + * The ordering between avail index and + * desc reads needs to be enforced. + */ + free_entries = __atomic_load_n(&vq->avail->idx, __ATOMIC_ACQUIRE) - vq->last_avail_idx; + if (free_entries == 0) + goto out; + + rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]); + + async_iter_reset(async); + + count = RTE_MIN(count, MAX_PKT_BURST); + count = RTE_MIN(count, free_entries); + VHOST_LOG_DATA(DEBUG, "(%s) about to dequeue %u buffers\n", dev->ifname, count); + + if (rte_pktmbuf_alloc_bulk(mbuf_pool, pkts_prealloc, count)) + goto out; + + for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { + uint16_t head_idx = 0; + uint16_t nr_vec = 0; + uint16_t to; + uint32_t buf_len; + int err; + struct buf_vector buf_vec[BUF_VECTOR_MAX]; + struct rte_mbuf *pkt = pkts_prealloc[pkt_idx]; + + if (unlikely(fill_vec_buf_split(dev, vq, vq->last_avail_idx, + &nr_vec, buf_vec, + &head_idx, &buf_len, + VHOST_ACCESS_RO) < 0)) { + dropped = true; + break; + } + + err = virtio_dev_pktmbuf_prep(dev, pkt, buf_len); + if (unlikely(err)) { + /** + * mbuf allocation fails for jumbo packets when external + * buffer allocation is not allowed and linear buffer + * is required. Drop this packet. + */ + if (!allocerr_warned) { + VHOST_LOG_DATA(ERR, + "(%s) %s: Failed mbuf alloc of size %d from %s\n", + dev->ifname, __func__, buf_len, mbuf_pool->name); + allocerr_warned = true; + } + dropped = true; + break; + } + + slot_idx = (async->pkts_idx + pkt_idx) & (vq->size - 1); + err = async_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkt, mbuf_pool, + &pkts_info[slot_idx].nethdr); + if (unlikely(err)) { + if (!allocerr_warned) { + VHOST_LOG_DATA(ERR, + "(%s) %s: Failed to offload copies to async channel.\n", + dev->ifname, __func__); + allocerr_warned = true; + } + dropped = true; + break; + } + + pkts_info[slot_idx].mbuf = pkt; + + /* store used descs */ + to = async->desc_idx_split & (vq->size - 1); + async->descs_split[to].id = head_idx; + async->descs_split[to].len = 0; + async->desc_idx_split++; + + vq->last_avail_idx++; + } + + if (unlikely(dropped)) + rte_pktmbuf_free_bulk(&pkts_prealloc[pkt_idx], count - pkt_idx); + + n_xfer = vhost_async_dma_transfer(dev, vq, dma_id, vchan_id, async->pkts_idx, + async->iov_iter, pkt_idx); + + async->pkts_inflight_n += n_xfer; + + pkt_err = pkt_idx - n_xfer; + if (unlikely(pkt_err)) { + VHOST_LOG_DATA(DEBUG, + "(%s) %s: failed to transfer data for queue id %d.\n", + dev->ifname, __func__, queue_id); + + pkt_idx = n_xfer; + /* recover available ring */ + vq->last_avail_idx -= pkt_err; + + /** + * recover async channel copy related structures and free pktmbufs + * for error pkts. + */ + async->desc_idx_split -= pkt_err; + while (pkt_err-- > 0) { + rte_pktmbuf_free(pkts_info[slot_idx & (vq->size - 1)].mbuf); + slot_idx--; + } + } + + async->pkts_idx += pkt_idx; + if (async->pkts_idx >= vq->size) + async->pkts_idx -= vq->size; + +out: + /* DMA device may serve other queues, unconditionally check completed. */ + nr_done_pkts = async_poll_dequeue_completed_split(dev, queue_id, pkts, pkts_size, + dma_id, vchan_id, legacy_ol_flags); + + return nr_done_pkts; +} + +__rte_noinline +static uint16_t +virtio_dev_tx_async_split_legacy(struct virtio_net *dev, + struct vhost_virtqueue *vq, uint16_t queue_id, + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, + uint16_t count, uint16_t dma_id, uint16_t vchan_id) +{ + return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool, + pkts, count, dma_id, vchan_id, true); +} + +__rte_noinline +static uint16_t +virtio_dev_tx_async_split_compliant(struct virtio_net *dev, + struct vhost_virtqueue *vq, uint16_t queue_id, + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, + uint16_t count, uint16_t dma_id, uint16_t vchan_id) +{ + return virtio_dev_tx_async_split(dev, vq, queue_id, mbuf_pool, + pkts, count, dma_id, vchan_id, false); +} + +uint16_t +rte_vhost_async_try_dequeue_burst(int vid, uint16_t queue_id, + struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count, + int *nr_inflight, uint16_t dma_id, uint16_t vchan_id) +{ + struct virtio_net *dev; + struct rte_mbuf *rarp_mbuf = NULL; + struct vhost_virtqueue *vq; + int16_t success = 1; + + *nr_inflight = -1; + + dev = get_device(vid); + if (!dev) + return 0; + + if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) { + VHOST_LOG_DATA(ERR, + "(%s) %s: built-in vhost net backend is disabled.\n", + dev->ifname, __func__); + return 0; + } + + if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) { + VHOST_LOG_DATA(ERR, + "(%s) %s: invalid virtqueue idx %d.\n", + dev->ifname, __func__, queue_id); + return 0; + } + + if (unlikely(!dma_copy_track[dma_id].vchans || + !dma_copy_track[dma_id].vchans[vchan_id].pkts_cmpl_flag_addr)) { + VHOST_LOG_DATA(ERR, "(%s) %s: invalid channel %d:%u.\n", dev->ifname, __func__, + dma_id, vchan_id); + return 0; + } + + vq = dev->virtqueue[queue_id]; + + if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0)) + return 0; + + if (unlikely(vq->enabled == 0)) { + count = 0; + goto out_access_unlock; + } + + if (unlikely(!vq->async)) { + VHOST_LOG_DATA(ERR, "(%s) %s: async not registered for queue id %d.\n", + dev->ifname, __func__, queue_id); + count = 0; + goto out_access_unlock; + } + + if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM)) + vhost_user_iotlb_rd_lock(vq); + + if (unlikely(vq->access_ok == 0)) + if (unlikely(vring_translate(dev, vq) < 0)) { + count = 0; + goto out; + } + + /* + * Construct a RARP broadcast packet, and inject it to the "pkts" + * array, to looks like that guest actually send such packet. + * + * Check user_send_rarp() for more information. + * + * broadcast_rarp shares a cacheline in the virtio_net structure + * with some fields that are accessed during enqueue and + * __atomic_compare_exchange_n causes a write if performed compare + * and exchange. This could result in false sharing between enqueue + * and dequeue. + * + * Prevent unnecessary false sharing by reading broadcast_rarp first + * and only performing compare and exchange if the read indicates it + * is likely to be set. + */ + if (unlikely(__atomic_load_n(&dev->broadcast_rarp, __ATOMIC_ACQUIRE) && + __atomic_compare_exchange_n(&dev->broadcast_rarp, + &success, 0, 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED))) { + + rarp_mbuf = rte_net_make_rarp_packet(mbuf_pool, &dev->mac); + if (rarp_mbuf == NULL) { + VHOST_LOG_DATA(ERR, "Failed to make RARP packet.\n"); + count = 0; + goto out; + } + /* + * Inject it to the head of "pkts" array, so that switch's mac + * learning table will get updated first. + */ + pkts[0] = rarp_mbuf; + pkts++; + count -= 1; + } + + if (unlikely(vq_is_packed(dev))) { + static bool not_support_pack_log; + if (!not_support_pack_log) { + VHOST_LOG_DATA(ERR, + "(%s) %s: async dequeue does not support packed ring.\n", + dev->ifname, __func__); + not_support_pack_log = true; + } + count = 0; + goto out; + } + + if (dev->flags & VIRTIO_DEV_LEGACY_OL_FLAGS) + count = virtio_dev_tx_async_split_legacy(dev, vq, queue_id, + mbuf_pool, pkts, count, dma_id, vchan_id); + else + count = virtio_dev_tx_async_split_compliant(dev, vq, queue_id, + mbuf_pool, pkts, count, dma_id, vchan_id); + + *nr_inflight = vq->async->pkts_inflight_n; + +out: + if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM)) + vhost_user_iotlb_rd_unlock(vq); + +out_access_unlock: + rte_spinlock_unlock(&vq->access_lock); + + if (unlikely(rarp_mbuf != NULL)) + count += 1; + + return count; +}