diff mbox series

[v2,12/12] raw/ioat: report status of completed jobs

Message ID 20210426095259.225354-13-bruce.richardson@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers show
Series ioat driver updates | expand

Checks

Context Check Description
ci/iol-mellanox-Performance success Performance Testing PASS
ci/github-robot success github build: passed
ci/iol-testing success Testing PASS
ci/iol-abi-testing success Testing PASS
ci/iol-intel-Performance success Performance Testing PASS
ci/intel-Testing success Testing PASS
ci/iol-intel-Functional success Functional Testing PASS
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Bruce Richardson April 26, 2021, 9:52 a.m. UTC
Add improved error handling to rte_ioat_completed_ops(). This patch adds
new parameters to the function to enable the user to track the completion
status of each individual operation in a batch. With this addition, the
function helps the user to determine, firstly, how many operations may
have failed or been skipped and, secondly, which specific operations
did not complete successfully.

Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
---
 doc/guides/rel_notes/release_21_05.rst |   5 +
 drivers/raw/ioat/ioat_common.c         |   9 +
 drivers/raw/ioat/ioat_rawdev_test.c    | 300 +++++++++++++++++++++++--
 drivers/raw/ioat/rte_idxd_rawdev_fns.h | 146 ++++++++----
 drivers/raw/ioat/rte_ioat_rawdev.h     |  53 ++++-
 drivers/raw/ioat/rte_ioat_rawdev_fns.h |  15 +-
 examples/ioat/ioatfwd.c                |  14 +-
 examples/vhost/ioat.c                  |   2 +-
 8 files changed, 464 insertions(+), 80 deletions(-)
diff mbox series

Patch

diff --git a/doc/guides/rel_notes/release_21_05.rst b/doc/guides/rel_notes/release_21_05.rst
index b3224dc332..7f29f5789f 100644
--- a/doc/guides/rel_notes/release_21_05.rst
+++ b/doc/guides/rel_notes/release_21_05.rst
@@ -329,6 +329,11 @@  API Changes
   ``policer_action_recolor_supported`` and ``policer_action_drop_supported``
   have been removed.
 
+* raw/ioat: The experimental function ``rte_ioat_completed_ops()`` now
+  supports two additional parameters, ``status`` and ``num_unsuccessful``,
+  to allow the reporting of errors from hardware when performing copy
+  operations.
+
 
 ABI Changes
 -----------
diff --git a/drivers/raw/ioat/ioat_common.c b/drivers/raw/ioat/ioat_common.c
index fcb30572e6..d01c1ee367 100644
--- a/drivers/raw/ioat/ioat_common.c
+++ b/drivers/raw/ioat/ioat_common.c
@@ -162,6 +162,15 @@  idxd_dev_configure(const struct rte_rawdev *dev,
 		rte_idxd->desc_ring = NULL;
 		return -ENOMEM;
 	}
+	rte_idxd->hdl_ring_flags = rte_zmalloc(NULL,
+			sizeof(*rte_idxd->hdl_ring_flags) * max_desc, 0);
+	if (rte_idxd->hdl_ring_flags == NULL) {
+		rte_free(rte_idxd->desc_ring);
+		rte_free(rte_idxd->hdl_ring);
+		rte_idxd->desc_ring = NULL;
+		rte_idxd->hdl_ring = NULL;
+		return -ENOMEM;
+	}
 	rte_idxd->hdls_read = rte_idxd->batch_start = 0;
 	rte_idxd->batch_size = 0;
 
diff --git a/drivers/raw/ioat/ioat_rawdev_test.c b/drivers/raw/ioat/ioat_rawdev_test.c
index 5f75c6ff69..d987b560d2 100644
--- a/drivers/raw/ioat/ioat_rawdev_test.c
+++ b/drivers/raw/ioat/ioat_rawdev_test.c
@@ -73,13 +73,15 @@  do_multi_copies(int dev_id, int split_batches, int split_completions)
 	if (split_completions) {
 		/* gather completions in two halves */
 		uint16_t half_len = RTE_DIM(srcs) / 2;
-		if (rte_ioat_completed_ops(dev_id, half_len, (void *)completed_src,
+		if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+				(void *)completed_src,
 				(void *)completed_dst) != half_len) {
 			PRINT_ERR("Error with rte_ioat_completed_ops - first half request\n");
 			rte_rawdev_dump(dev_id, stdout);
 			return -1;
 		}
-		if (rte_ioat_completed_ops(dev_id, half_len, (void *)&completed_src[half_len],
+		if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+				(void *)&completed_src[half_len],
 				(void *)&completed_dst[half_len]) != half_len) {
 			PRINT_ERR("Error with rte_ioat_completed_ops - second half request\n");
 			rte_rawdev_dump(dev_id, stdout);
@@ -87,7 +89,8 @@  do_multi_copies(int dev_id, int split_batches, int split_completions)
 		}
 	} else {
 		/* gather all completions in one go */
-		if (rte_ioat_completed_ops(dev_id, 64, (void *)completed_src,
+		if (rte_ioat_completed_ops(dev_id, RTE_DIM(completed_src), NULL, NULL,
+				(void *)completed_src,
 				(void *)completed_dst) != RTE_DIM(srcs)) {
 			PRINT_ERR("Error with rte_ioat_completed_ops\n");
 			rte_rawdev_dump(dev_id, stdout);
@@ -151,7 +154,7 @@  test_enqueue_copies(int dev_id)
 		rte_ioat_perform_ops(dev_id);
 		usleep(10);
 
-		if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+		if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
 				(void *)&completed[1]) != 1) {
 			PRINT_ERR("Error with rte_ioat_completed_ops\n");
 			return -1;
@@ -170,6 +173,13 @@  test_enqueue_copies(int dev_id)
 			}
 		rte_pktmbuf_free(src);
 		rte_pktmbuf_free(dst);
+
+		/* check ring is now empty */
+		if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
+				(void *)&completed[1]) != 0) {
+			PRINT_ERR("Error: got unexpected returned handles from rte_ioat_completed_ops\n");
+			return -1;
+		}
 	} while (0);
 
 	/* test doing a multiple single copies */
@@ -203,7 +213,8 @@  test_enqueue_copies(int dev_id)
 		}
 		usleep(10);
 
-		if (rte_ioat_completed_ops(dev_id, max_completions, (void *)&completed[0],
+		if (rte_ioat_completed_ops(dev_id, max_completions, NULL, NULL,
+				(void *)&completed[0],
 				(void *)&completed[max_completions]) != max_ops) {
 			PRINT_ERR("Error with rte_ioat_completed_ops\n");
 			rte_rawdev_dump(dev_id, stdout);
@@ -256,7 +267,7 @@  test_enqueue_fill(int dev_id)
 		rte_ioat_perform_ops(dev_id);
 		usleep(100);
 
-		if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+		if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
 			(void *)&completed[1]) != 1) {
 			PRINT_ERR("Error with completed ops\n");
 			return -1;
@@ -266,8 +277,7 @@  test_enqueue_fill(int dev_id)
 			char pat_byte = ((char *)&pattern)[j % 8];
 			if (dst_data[j] != pat_byte) {
 				PRINT_ERR("Error with fill operation (lengths = %u): got (%x), not (%x)\n",
-						lengths[i], dst_data[j],
-						pat_byte);
+						lengths[i], dst_data[j], pat_byte);
 				return -1;
 			}
 		}
@@ -307,12 +317,16 @@  test_burst_capacity(int dev_id)
 	unsigned int i;
 	unsigned int length = 1024;
 	uintptr_t completions[BURST_SIZE];
+	/* for CBDMA, no batch descriptor, for DSA there is one */
+	unsigned int batch_desc = (*(enum rte_ioat_dev_type *)
+			rte_rawdevs[dev_id].dev_private == RTE_IDXD_DEV);
+	unsigned int desc_per_burst = BURST_SIZE + batch_desc;
 
 	/* Ring pointer reset needed for checking test results */
 	reset_ring_ptrs(dev_id);
 
 	const unsigned int ring_space = rte_ioat_burst_capacity(dev_id);
-	const unsigned int expected_bursts = (ring_space)/BURST_SIZE;
+	const unsigned int expected_bursts = (ring_space)/(desc_per_burst);
 	src = rte_pktmbuf_alloc(pool);
 	dst = rte_pktmbuf_alloc(pool);
 
@@ -327,8 +341,7 @@  test_burst_capacity(int dev_id)
 			}
 		}
 		bursts_enqueued++;
-		if ((i & 1) == 1) /* hit doorbell every second burst */
-			rte_ioat_perform_ops(dev_id);
+		rte_ioat_perform_ops(dev_id);
 	}
 	rte_ioat_perform_ops(dev_id);
 
@@ -340,9 +353,9 @@  test_burst_capacity(int dev_id)
 	}
 
 	/* check the space is now as expected */
-	if (rte_ioat_burst_capacity(dev_id) != ring_space - bursts_enqueued * BURST_SIZE) {
-		printf("Capacity error. Expected %u free slots, got %u\n",
-				ring_space - bursts_enqueued * BURST_SIZE,
+	if (rte_ioat_burst_capacity(dev_id) != ring_space - bursts_enqueued * desc_per_burst) {
+		PRINT_ERR("Capacity error. Expected %u free slots, got %u\n",
+				ring_space - bursts_enqueued * desc_per_burst,
 				rte_ioat_burst_capacity(dev_id));
 		return -1;
 	}
@@ -350,8 +363,8 @@  test_burst_capacity(int dev_id)
 	/* do cleanup before next tests */
 	usleep(100);
 	for (i = 0; i < bursts_enqueued; i++) {
-		if (rte_ioat_completed_ops(dev_id, BURST_SIZE, completions,
-				completions) != BURST_SIZE) {
+		if (rte_ioat_completed_ops(dev_id, BURST_SIZE, NULL, NULL,
+				completions, completions) != BURST_SIZE) {
 			PRINT_ERR("error with completions\n");
 			return -1;
 		}
@@ -364,7 +377,8 @@  test_burst_capacity(int dev_id)
 
 	/* Verify the descriptor ring is empty before we test */
 	if (rte_ioat_burst_capacity(dev_id) != ring_space) {
-		PRINT_ERR("Error, ring should be empty\n");
+		PRINT_ERR("Error, ring should be empty. Expected %u, got %u\n",
+				ring_space, rte_ioat_burst_capacity(dev_id));
 		return -1;
 	}
 
@@ -386,20 +400,23 @@  test_burst_capacity(int dev_id)
 	/* This check will confirm both that the correct amount of space is taken
 	 * the ring, and that the ring wrap around handling is correct.
 	 */
-	if (rte_ioat_burst_capacity(dev_id) != ring_space - BURST_SIZE) {
-		PRINT_ERR("Error, space available not as expected\n");
+	if (rte_ioat_burst_capacity(dev_id) != ring_space - desc_per_burst) {
+		PRINT_ERR("Error, space available not as expected. Expected %u, got %u\n",
+				ring_space - desc_per_burst, rte_ioat_burst_capacity(dev_id));
 		return -1;
 	}
 
 	/* Now we gather completions to update the read pointer */
-	if (rte_ioat_completed_ops(dev_id, BURST_SIZE, completions, completions) != BURST_SIZE) {
+	if (rte_ioat_completed_ops(dev_id, BURST_SIZE, NULL, NULL,
+			completions, completions) != BURST_SIZE) {
 		PRINT_ERR("Error with completions\n");
 		return -1;
 	}
 
 	/* After gathering the completions, the descriptor ring should be empty */
 	if (rte_ioat_burst_capacity(dev_id) != ring_space) {
-		PRINT_ERR("Error, space available not as expected\n");
+		PRINT_ERR("Error, space available not as expected, Expected %u, got %u\n",
+				ring_space, rte_ioat_burst_capacity(dev_id));
 		return -1;
 	}
 
@@ -409,6 +426,241 @@  test_burst_capacity(int dev_id)
 	return 0;
 }
 
+static int
+test_completion_status(int dev_id)
+{
+#define COMP_BURST_SZ	16
+	const unsigned int fail_copy[] = {0, 7, 15};
+	struct rte_mbuf *srcs[COMP_BURST_SZ], *dsts[COMP_BURST_SZ];
+	struct rte_mbuf *completed_src[COMP_BURST_SZ * 2];
+	struct rte_mbuf *completed_dst[COMP_BURST_SZ * 2];
+	unsigned int length = 1024;
+	unsigned int i;
+	uint8_t not_ok = 0;
+
+	/* Test single full batch statuses */
+	for (i = 0; i < RTE_DIM(fail_copy); i++) {
+		uint32_t status[COMP_BURST_SZ] = {0};
+		unsigned int j;
+
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			srcs[j] = rte_pktmbuf_alloc(pool);
+			dsts[j] = rte_pktmbuf_alloc(pool);
+
+			if (rte_ioat_enqueue_copy(dev_id,
+					(j == fail_copy[i] ? (phys_addr_t)NULL :
+							(srcs[j]->buf_iova + srcs[j]->data_off)),
+					dsts[j]->buf_iova + dsts[j]->data_off,
+					length,
+					(uintptr_t)srcs[j],
+					(uintptr_t)dsts[j]) != 1) {
+				PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+				return -1;
+			}
+		}
+		rte_ioat_perform_ops(dev_id);
+		usleep(100);
+
+		if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+				(void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+			PRINT_ERR("Error with rte_ioat_completed_ops\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+		if (not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS) {
+			unsigned int j;
+			PRINT_ERR("Error, missing expected failed copy, %u\n", fail_copy[i]);
+			for (j = 0; j < COMP_BURST_SZ; j++)
+				printf("%u ", status[j]);
+			printf("<-- Statuses\n");
+			return -1;
+		}
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			rte_pktmbuf_free(completed_src[j]);
+			rte_pktmbuf_free(completed_dst[j]);
+		}
+	}
+
+	/* Test gathering status for two batches at once */
+	for (i = 0; i < RTE_DIM(fail_copy); i++) {
+		uint32_t status[COMP_BURST_SZ] = {0};
+		unsigned int batch, j;
+		unsigned int expected_failures = 0;
+
+		for (batch = 0; batch < 2; batch++) {
+			for (j = 0; j < COMP_BURST_SZ/2; j++) {
+				srcs[j] = rte_pktmbuf_alloc(pool);
+				dsts[j] = rte_pktmbuf_alloc(pool);
+
+				if (j == fail_copy[i])
+					expected_failures++;
+				if (rte_ioat_enqueue_copy(dev_id,
+						(j == fail_copy[i] ? (phys_addr_t)NULL :
+							(srcs[j]->buf_iova + srcs[j]->data_off)),
+						dsts[j]->buf_iova + dsts[j]->data_off,
+						length,
+						(uintptr_t)srcs[j],
+						(uintptr_t)dsts[j]) != 1) {
+					PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n",
+							j);
+					return -1;
+				}
+			}
+			rte_ioat_perform_ops(dev_id);
+		}
+		usleep(100);
+
+		if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+				(void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+			PRINT_ERR("Error with rte_ioat_completed_ops\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+		if (not_ok != expected_failures) {
+			unsigned int j;
+			PRINT_ERR("Error, missing expected failed copy, got %u, not %u\n",
+					not_ok, expected_failures);
+			for (j = 0; j < COMP_BURST_SZ; j++)
+				printf("%u ", status[j]);
+			printf("<-- Statuses\n");
+			return -1;
+		}
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			rte_pktmbuf_free(completed_src[j]);
+			rte_pktmbuf_free(completed_dst[j]);
+		}
+	}
+
+	/* Test gathering status for half batch at a time */
+	for (i = 0; i < RTE_DIM(fail_copy); i++) {
+		uint32_t status[COMP_BURST_SZ] = {0};
+		unsigned int j;
+
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			srcs[j] = rte_pktmbuf_alloc(pool);
+			dsts[j] = rte_pktmbuf_alloc(pool);
+
+			if (rte_ioat_enqueue_copy(dev_id,
+					(j == fail_copy[i] ? (phys_addr_t)NULL :
+							(srcs[j]->buf_iova + srcs[j]->data_off)),
+					dsts[j]->buf_iova + dsts[j]->data_off,
+					length,
+					(uintptr_t)srcs[j],
+					(uintptr_t)dsts[j]) != 1) {
+				PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+				return -1;
+			}
+		}
+		rte_ioat_perform_ops(dev_id);
+		usleep(100);
+
+		if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+				(void *)completed_src,
+				(void *)completed_dst) != (COMP_BURST_SZ / 2)) {
+			PRINT_ERR("Error with rte_ioat_completed_ops\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+		if (fail_copy[i] < COMP_BURST_SZ / 2 &&
+				(not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS)) {
+			PRINT_ERR("Missing expected failure in first half-batch\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+		if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+				(void *)&completed_src[COMP_BURST_SZ / 2],
+				(void *)&completed_dst[COMP_BURST_SZ / 2]) != (COMP_BURST_SZ / 2)) {
+			PRINT_ERR("Error with rte_ioat_completed_ops\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+		if (fail_copy[i] >= COMP_BURST_SZ / 2 && (not_ok != 1 ||
+				status[fail_copy[i] - (COMP_BURST_SZ / 2)]
+					== RTE_IOAT_OP_SUCCESS)) {
+			PRINT_ERR("Missing expected failure in second half-batch\n");
+			rte_rawdev_dump(dev_id, stdout);
+			return -1;
+		}
+
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			rte_pktmbuf_free(completed_src[j]);
+			rte_pktmbuf_free(completed_dst[j]);
+		}
+	}
+
+	/* Test gathering statuses with fence */
+	for (i = 1; i < RTE_DIM(fail_copy); i++) {
+		uint32_t status[COMP_BURST_SZ * 2] = {0};
+		unsigned int j;
+		uint16_t count;
+
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			srcs[j] = rte_pktmbuf_alloc(pool);
+			dsts[j] = rte_pktmbuf_alloc(pool);
+
+			/* always fail the first copy */
+			if (rte_ioat_enqueue_copy(dev_id,
+					(j == 0 ? (phys_addr_t)NULL :
+						(srcs[j]->buf_iova + srcs[j]->data_off)),
+					dsts[j]->buf_iova + dsts[j]->data_off,
+					length,
+					(uintptr_t)srcs[j],
+					(uintptr_t)dsts[j]) != 1) {
+				PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+				return -1;
+			}
+			/* put in a fence which will stop any further transactions
+			 * because we had a previous failure.
+			 */
+			if (j == fail_copy[i])
+				rte_ioat_fence(dev_id);
+		}
+		rte_ioat_perform_ops(dev_id);
+		usleep(100);
+
+		count = rte_ioat_completed_ops(dev_id, COMP_BURST_SZ * 2, status, &not_ok,
+				(void *)completed_src, (void *)completed_dst);
+		if (count != COMP_BURST_SZ) {
+			PRINT_ERR("Error with rte_ioat_completed_ops, got %u not %u\n",
+					count, COMP_BURST_SZ);
+			for (j = 0; j < count; j++)
+				printf("%u ", status[j]);
+			printf("<-- Statuses\n");
+			return -1;
+		}
+		if (not_ok != COMP_BURST_SZ - fail_copy[i]) {
+			PRINT_ERR("Unexpected failed copy count, got %u, expected %u\n",
+					not_ok, COMP_BURST_SZ - fail_copy[i]);
+			for (j = 0; j < COMP_BURST_SZ; j++)
+				printf("%u ", status[j]);
+			printf("<-- Statuses\n");
+			return -1;
+		}
+		if (status[0] == RTE_IOAT_OP_SUCCESS || status[0] == RTE_IOAT_OP_SKIPPED) {
+			PRINT_ERR("Error, op 0 unexpectedly did not fail.\n");
+			return -1;
+		}
+		for (j = 1; j <= fail_copy[i]; j++) {
+			if (status[j] != RTE_IOAT_OP_SUCCESS) {
+				PRINT_ERR("Error, op %u unexpectedly failed\n", j);
+				return -1;
+			}
+		}
+		for (j = fail_copy[i] + 1; j < COMP_BURST_SZ; j++) {
+			if (status[j] != RTE_IOAT_OP_SKIPPED) {
+				PRINT_ERR("Error, all descriptors after fence should be invalid\n");
+				return -1;
+			}
+		}
+		for (j = 0; j < COMP_BURST_SZ; j++) {
+			rte_pktmbuf_free(completed_src[j]);
+			rte_pktmbuf_free(completed_dst[j]);
+		}
+	}
+
+	return 0;
+}
+
 int
 ioat_rawdev_test(uint16_t dev_id)
 {
@@ -521,6 +773,12 @@  ioat_rawdev_test(uint16_t dev_id)
 	if (test_burst_capacity(dev_id) != 0)
 		goto err;
 
+	if (rte_eal_iova_mode() == RTE_IOVA_VA) {
+		printf("Running Completions Status Test\n");
+		if (test_completion_status(dev_id) != 0)
+			goto err;
+	}
+
 	rte_rawdev_stop(dev_id);
 	if (rte_rawdev_xstats_reset(dev_id, NULL, 0) != 0) {
 		PRINT_ERR("Error resetting xstat values\n");
diff --git a/drivers/raw/ioat/rte_idxd_rawdev_fns.h b/drivers/raw/ioat/rte_idxd_rawdev_fns.h
index 41f0ad6e99..dc16917b63 100644
--- a/drivers/raw/ioat/rte_idxd_rawdev_fns.h
+++ b/drivers/raw/ioat/rte_idxd_rawdev_fns.h
@@ -104,8 +104,17 @@  struct rte_idxd_rawdev {
 
 	struct rte_idxd_hw_desc *desc_ring;
 	struct rte_idxd_user_hdl *hdl_ring;
+	/* flags to indicate handle validity. Kept separate from ring, to avoid
+	 * using 8 bytes per flag. Upper 8 bits hold the error code, if any.
+	 */
+	uint16_t *hdl_ring_flags;
 };
 
+#define RTE_IDXD_HDL_NORMAL     0
+#define RTE_IDXD_HDL_INVALID    (1 << 0) /* no handle stored for this element */
+#define RTE_IDXD_HDL_OP_FAILED  (1 << 1) /* return failure for this one */
+#define RTE_IDXD_HDL_OP_SKIPPED (1 << 2) /* this op was skipped */
+
 static __rte_always_inline uint16_t
 __idxd_burst_capacity(int dev_id)
 {
@@ -124,8 +133,10 @@  __idxd_burst_capacity(int dev_id)
 		write_idx += idxd->desc_ring_mask + 1;
 	used_space = write_idx - idxd->hdls_read;
 
-	/* Return amount of free space in the descriptor ring */
-	return idxd->desc_ring_mask - used_space;
+	/* Return amount of free space in the descriptor ring,
+	 * subtracting 1 for the batch descriptor and 1 for a possible null desc
+	 */
+	return idxd->desc_ring_mask - used_space - 2;
 }
 
 static __rte_always_inline rte_iova_t
@@ -150,7 +161,8 @@  __idxd_write_desc(int dev_id,
 	if ((idxd->batch_idx_read == 0 && idxd->batch_idx_write == idxd->max_batches) ||
 			idxd->batch_idx_write + 1 == idxd->batch_idx_read)
 		goto failed;
-	if (((write_idx + 1) & idxd->desc_ring_mask) == idxd->hdls_read)
+	/* for descriptor ring, we always need a slot for batch completion */
+	if (((write_idx + 2) & idxd->desc_ring_mask) == idxd->hdls_read)
 		goto failed;
 
 	/* write desc and handle. Note, descriptors don't wrap */
@@ -161,7 +173,10 @@  __idxd_write_desc(int dev_id,
 	idxd->desc_ring[write_idx].dst = dst;
 	idxd->desc_ring[write_idx].size = size;
 
-	idxd->hdl_ring[write_idx & idxd->desc_ring_mask] = *hdl;
+	if (hdl == NULL)
+		idxd->hdl_ring_flags[write_idx & idxd->desc_ring_mask] = RTE_IDXD_HDL_INVALID;
+	else
+		idxd->hdl_ring[write_idx & idxd->desc_ring_mask] = *hdl;
 	idxd->batch_size++;
 
 	idxd->xstats.enqueued++;
@@ -203,9 +218,8 @@  __idxd_enqueue_copy(int dev_id, rte_iova_t src, rte_iova_t dst,
 static __rte_always_inline int
 __idxd_fence(int dev_id)
 {
-	static const struct rte_idxd_user_hdl null_hdl;
 	/* only op field needs filling - zero src, dst and length */
-	return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, &null_hdl);
+	return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, NULL);
 }
 
 static __rte_always_inline void
@@ -222,42 +236,37 @@  __idxd_perform_ops(int dev_id)
 {
 	struct rte_idxd_rawdev *idxd =
 			(struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
-	/* write completion to last desc in the batch */
-	uint16_t comp_idx = idxd->batch_start + idxd->batch_size - 1;
-	if (comp_idx > idxd->desc_ring_mask) {
-		comp_idx &= idxd->desc_ring_mask;
-		*((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
-	}
+
+	if (!idxd->cfg.no_prefetch_completions)
+		rte_prefetch1(&idxd->desc_ring[idxd->batch_idx_ring[idxd->batch_idx_read]]);
 
 	if (idxd->batch_size == 0)
 		return 0;
 
-	_mm_sfence(); /* fence before writing desc to device */
-	if (idxd->batch_size > 1) {
-		struct rte_idxd_hw_desc batch_desc = {
-				.op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
-					IDXD_FLAG_COMPLETION_ADDR_VALID |
-					IDXD_FLAG_REQUEST_COMPLETION,
-				.desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
-				.completion = __desc_idx_to_iova(idxd, comp_idx),
-				.size = idxd->batch_size,
-		};
-
-		__idxd_movdir64b(idxd->portal, &batch_desc);
-	} else {
-		/* special case batch size of 1, as not allowed by HW */
-		/* comp_idx == batch_start */
-		struct rte_idxd_hw_desc *desc = &idxd->desc_ring[comp_idx];
-		desc->op_flags |= IDXD_FLAG_COMPLETION_ADDR_VALID |
-				IDXD_FLAG_REQUEST_COMPLETION;
-		desc->completion = __desc_idx_to_iova(idxd, comp_idx);
-
-		__idxd_movdir64b(idxd->portal, desc);
-	}
+	if (idxd->batch_size == 1)
+		/* use a fence as a null descriptor, so batch_size >= 2 */
+		if (__idxd_fence(dev_id) != 1)
+			return -1;
+
+	/* write completion beyond last desc in the batch */
+	uint16_t comp_idx = (idxd->batch_start + idxd->batch_size) & idxd->desc_ring_mask;
+	*((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
+	idxd->hdl_ring_flags[comp_idx] = RTE_IDXD_HDL_INVALID;
+
+	const struct rte_idxd_hw_desc batch_desc = {
+			.op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
+				IDXD_FLAG_COMPLETION_ADDR_VALID |
+				IDXD_FLAG_REQUEST_COMPLETION,
+			.desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
+			.completion = __desc_idx_to_iova(idxd, comp_idx),
+			.size = idxd->batch_size,
+	};
 
+	_mm_sfence(); /* fence before writing desc to device */
+	__idxd_movdir64b(idxd->portal, &batch_desc);
 	idxd->xstats.started += idxd->batch_size;
 
-	idxd->batch_start += idxd->batch_size;
+	idxd->batch_start += idxd->batch_size + 1;
 	idxd->batch_start &= idxd->desc_ring_mask;
 	idxd->batch_size = 0;
 
@@ -269,7 +278,7 @@  __idxd_perform_ops(int dev_id)
 }
 
 static __rte_always_inline int
-__idxd_completed_ops(int dev_id, uint8_t max_ops,
+__idxd_completed_ops(int dev_id, uint8_t max_ops, uint32_t *status, uint8_t *num_unsuccessful,
 		uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
 	struct rte_idxd_rawdev *idxd =
@@ -280,8 +289,35 @@  __idxd_completed_ops(int dev_id, uint8_t max_ops,
 		uint16_t idx_to_chk = idxd->batch_idx_ring[idxd->batch_idx_read];
 		volatile struct rte_idxd_completion *comp_to_chk =
 				(struct rte_idxd_completion *)&idxd->desc_ring[idx_to_chk];
-		if (comp_to_chk->status == 0)
+		uint8_t status = comp_to_chk->status;
+		if (status == 0)
 			break;
+		if (unlikely(status > 1)) {
+			/* error occurred somewhere in batch, start where last checked */
+			uint16_t desc_count = comp_to_chk->completed_size;
+			uint16_t batch_start = idxd->hdls_avail;
+			uint16_t batch_end = idx_to_chk;
+
+			if (batch_start > batch_end)
+				batch_end += idxd->desc_ring_mask + 1;
+			/* go through each batch entry and see status */
+			for (n = 0; n < desc_count; n++) {
+				uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+				volatile struct rte_idxd_completion *comp =
+					(struct rte_idxd_completion *)&idxd->desc_ring[idx];
+				if (comp->status != 0 &&
+						idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL) {
+					idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_FAILED;
+					idxd->hdl_ring_flags[idx] |= (comp->status << 8);
+				}
+			}
+			/* if batch is incomplete, mark rest as skipped */
+			for ( ; n < batch_end - batch_start; n++) {
+				uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+				if (idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL)
+					idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_SKIPPED;
+			}
+		}
 		/* avail points to one after the last one written */
 		idxd->hdls_avail = (idx_to_chk + 1) & idxd->desc_ring_mask;
 		idxd->batch_idx_read++;
@@ -289,7 +325,7 @@  __idxd_completed_ops(int dev_id, uint8_t max_ops,
 			idxd->batch_idx_read = 0;
 	}
 
-	if (idxd->cfg.hdls_disable) {
+	if (idxd->cfg.hdls_disable && status == NULL) {
 		n = (idxd->hdls_avail < idxd->hdls_read) ?
 				(idxd->hdls_avail + idxd->desc_ring_mask + 1 - idxd->hdls_read) :
 				(idxd->hdls_avail - idxd->hdls_read);
@@ -297,10 +333,36 @@  __idxd_completed_ops(int dev_id, uint8_t max_ops,
 		goto out;
 	}
 
-	for (n = 0, h_idx = idxd->hdls_read;
-			n < max_ops && h_idx != idxd->hdls_avail; n++) {
-		src_hdls[n] = idxd->hdl_ring[h_idx].src;
-		dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+	n = 0;
+	h_idx = idxd->hdls_read;
+	while (h_idx != idxd->hdls_avail) {
+		uint16_t flag = idxd->hdl_ring_flags[h_idx];
+		if (flag != RTE_IDXD_HDL_INVALID) {
+			if (!idxd->cfg.hdls_disable) {
+				src_hdls[n] = idxd->hdl_ring[h_idx].src;
+				dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+			}
+			if (unlikely(flag != RTE_IDXD_HDL_NORMAL)) {
+				if (status != NULL)
+					status[n] = flag == RTE_IDXD_HDL_OP_SKIPPED ?
+							RTE_IOAT_OP_SKIPPED :
+							/* failure case, return err code */
+							idxd->hdl_ring_flags[h_idx] >> 8;
+				if (num_unsuccessful != NULL)
+					*num_unsuccessful += 1;
+			}
+			n++;
+		}
+		idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
+		if (++h_idx > idxd->desc_ring_mask)
+			h_idx = 0;
+		if (n >= max_ops)
+			break;
+	}
+
+	/* skip over any remaining blank elements, e.g. batch completion */
+	while (idxd->hdl_ring_flags[h_idx] == RTE_IDXD_HDL_INVALID && h_idx != idxd->hdls_avail) {
+		idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
 		if (++h_idx > idxd->desc_ring_mask)
 			h_idx = 0;
 	}
diff --git a/drivers/raw/ioat/rte_ioat_rawdev.h b/drivers/raw/ioat/rte_ioat_rawdev.h
index e5a22a0799..6cc1560a64 100644
--- a/drivers/raw/ioat/rte_ioat_rawdev.h
+++ b/drivers/raw/ioat/rte_ioat_rawdev.h
@@ -35,6 +35,10 @@  extern "C" {
 struct rte_ioat_rawdev_config {
 	unsigned short ring_size; /**< size of job submission descriptor ring */
 	bool hdls_disable;    /**< if set, ignore user-supplied handle params */
+	/** set "no_prefetch_completions", if polling completions on separate core
+	 * from the core submitting the jobs
+	 */
+	bool no_prefetch_completions;
 };
 
 /**
@@ -131,40 +135,73 @@  static inline int
 __rte_experimental
 rte_ioat_perform_ops(int dev_id);
 
+/*
+ *  Status codes for operations.
+ */
+#define RTE_IOAT_OP_SUCCESS 0  /**< Operation completed successfully */
+#define RTE_IOAT_OP_SKIPPED 1  /**< Operation was not attempted (Earlier fenced op failed) */
+/* Values >1 indicate a failure condition */
+/* Error codes taken from Intel(R) Data Streaming Accelerator Architecture
+ * Specification, section 5.7
+ */
+#define RTE_IOAT_OP_ADDRESS_ERR 0x03  /**< Page fault or invalid address */
+#define RTE_IOAT_OP_INVALID_LEN 0x13  /**< Invalid/too big length field passed */
+#define RTE_IOAT_OP_OVERLAPPING_BUFS 0x16 /**< Overlapping buffers error */
+
+
 /**
  * Returns details of operations that have been completed
  *
+ * The status of each operation is returned in the status array parameter.
  * If the hdls_disable option was not set when the device was configured,
  * the function will return to the caller the user-provided "handles" for
  * the copy operations which have been completed by the hardware, and not
  * already returned by a previous call to this API.
  * If the hdls_disable option for the device was set on configure, the
- * max_copies, src_hdls and dst_hdls parameters will be ignored, and the
+ * src_hdls and dst_hdls parameters will be ignored, and the
  * function returns the number of newly-completed operations.
+ * If status is also NULL, then max_copies parameter is also ignored and the
+ * function returns a count of the number of newly-completed operations.
  *
  * @param dev_id
  *   The rawdev device id of the ioat instance
  * @param max_copies
- *   The number of entries which can fit in the src_hdls and dst_hdls
+ *   The number of entries which can fit in the status, src_hdls and dst_hdls
  *   arrays, i.e. max number of completed operations to report.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter applies only to the "status" array, if specified.
+ * @param status
+ *   Array to hold the status of each completed operation. Array should be
+ *   set to zeros on input, as the driver will only write error status values.
+ *   A value of 1 implies an operation was not attempted, and any other non-zero
+ *   value indicates operation failure.
+ *   Parameter may be NULL if no status value checking is required.
+ * @param num_unsuccessful
+ *   Returns the number of elements in status where the value is non-zero,
+ *   i.e. the operation either failed or was not attempted due to an earlier
+ *   failure. If this value is returned as zero (the expected case), the
+ *   status array will not have been modified by the function and need not be
+ *   checked by software.
  * @param src_hdls
  *   Array to hold the source handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL
  * @param dst_hdls
  *   Array to hold the destination handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL
  * @return
- *   -1 on error, with rte_errno set appropriately.
- *   Otherwise number of completed operations i.e. number of entries written
- *   to the src_hdls and dst_hdls array parameters.
+ *   -1 on device error, with rte_errno set appropriately and parameters
+ *   unmodified.
+ *   Otherwise number of returned operations i.e. number of valid entries
+ *   in the status, src_hdls and dst_hdls array parameters. If status is NULL,
+ *   and the hdls_disable config option is set, this value may be greater than
+ *   max_copies parameter.
  */
 static inline int
 __rte_experimental
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+		uint32_t *status, uint8_t *num_unsuccessful,
 		uintptr_t *src_hdls, uintptr_t *dst_hdls);
 
 /* include the implementation details from a separate file */
diff --git a/drivers/raw/ioat/rte_ioat_rawdev_fns.h b/drivers/raw/ioat/rte_ioat_rawdev_fns.h
index 92ccdd03b9..9b8a9fa88e 100644
--- a/drivers/raw/ioat/rte_ioat_rawdev_fns.h
+++ b/drivers/raw/ioat/rte_ioat_rawdev_fns.h
@@ -334,16 +334,22 @@  rte_ioat_perform_ops(int dev_id)
 
 static inline int
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+		uint32_t *status, uint8_t *num_unsuccessful,
 		uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
 	enum rte_ioat_dev_type *type =
 			(enum rte_ioat_dev_type *)rte_rawdevs[dev_id].dev_private;
+	uint8_t tmp; /* used so functions don't need to check for null parameter */
+
+	if (num_unsuccessful == NULL)
+		num_unsuccessful = &tmp;
+
+	*num_unsuccessful = 0;
 	if (*type == RTE_IDXD_DEV)
-		return __idxd_completed_ops(dev_id, max_copies,
+		return __idxd_completed_ops(dev_id, max_copies, status, num_unsuccessful,
 				src_hdls, dst_hdls);
 	else
-		return __ioat_completed_ops(dev_id,  max_copies,
-				src_hdls, dst_hdls);
+		return __ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
 }
 
 static inline void
@@ -355,7 +361,8 @@  __rte_deprecated_msg("use rte_ioat_completed_ops() instead")
 rte_ioat_completed_copies(int dev_id, uint8_t max_copies,
 		uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
-	return rte_ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
+	return rte_ioat_completed_ops(dev_id, max_copies, NULL, NULL,
+			src_hdls, dst_hdls);
 }
 
 #endif /* _RTE_IOAT_RAWDEV_FNS_H_ */
diff --git a/examples/ioat/ioatfwd.c b/examples/ioat/ioatfwd.c
index 845301a6db..2e377e2d4b 100644
--- a/examples/ioat/ioatfwd.c
+++ b/examples/ioat/ioatfwd.c
@@ -447,12 +447,15 @@  ioat_tx_port(struct rxtx_port_config *tx_config)
 
 	for (i = 0; i < tx_config->nb_queues; i++) {
 		if (copy_mode == COPY_MODE_IOAT_NUM) {
-			/* Deque the mbufs from IOAT device. */
+			/* Dequeue the mbufs from IOAT device. Since all memory
+			 * is DPDK pinned memory and therefore all addresses should
+			 * be valid, we don't check for copy errors
+			 */
 			nb_dq = rte_ioat_completed_ops(
-				tx_config->ioat_ids[i], MAX_PKT_BURST,
+				tx_config->ioat_ids[i], MAX_PKT_BURST, NULL, NULL,
 				(void *)mbufs_src, (void *)mbufs_dst);
 		} else {
-			/* Deque the mbufs from rx_to_tx_ring. */
+			/* Dequeue the mbufs from rx_to_tx_ring. */
 			nb_dq = rte_ring_dequeue_burst(
 				tx_config->rx_to_tx_ring, (void *)mbufs_dst,
 				MAX_PKT_BURST, NULL);
@@ -725,7 +728,10 @@  check_link_status(uint32_t port_mask)
 static void
 configure_rawdev_queue(uint32_t dev_id)
 {
-	struct rte_ioat_rawdev_config dev_config = { .ring_size = ring_size };
+	struct rte_ioat_rawdev_config dev_config = {
+			.ring_size = ring_size,
+			.no_prefetch_completions = (cfg.nb_lcores > 1),
+	};
 	struct rte_rawdev_info info = { .dev_private = &dev_config };
 
 	if (rte_rawdev_configure(dev_id, &info, sizeof(dev_config)) != 0) {
diff --git a/examples/vhost/ioat.c b/examples/vhost/ioat.c
index 60b73be936..efdd3f6f76 100644
--- a/examples/vhost/ioat.c
+++ b/examples/vhost/ioat.c
@@ -183,7 +183,7 @@  ioat_check_completed_copies_cb(int vid, uint16_t queue_id,
 
 		uint16_t dev_id = dma_bind[vid].dmas[queue_id * 2
 				+ VIRTIO_RXQ].dev_id;
-		n_seg = rte_ioat_completed_ops(dev_id, 255, dump, dump);
+		n_seg = rte_ioat_completed_ops(dev_id, 255, NULL, NULL, dump, dump);
 		if (n_seg < 0) {
 			RTE_LOG(ERR,
 				VHOST_DATA,