[1/2] drivers/qat: im buffer too small - split op
diff mbox series

Message ID 20200318163349.1789-1-adamx.dybkowski@intel.com
State Superseded, archived
Delegated to: akhil goyal
Headers show
Series
  • [1/2] drivers/qat: im buffer too small - split op
Related show

Checks

Context Check Description
ci/iol-testing success Testing PASS
ci/iol-mellanox-Performance success Performance Testing PASS
ci/Intel-compilation success Compilation OK
ci/checkpatch success coding style OK

Commit Message

Adam Dybkowski March 18, 2020, 4:33 p.m. UTC
This patch implements a special way of buffer handling when internal
QAT IM buffer is too small for Huffman dynamic compression operation.
Instead of falling back to fixed compression, the operation is now
split into multiple smaller dynamic compression requests (possible to
execute on QAT) and their results are then combined and copied into
the output buffer. This is not possible if any checksum calculation
was requested - in such case the code falls back to fixed compression
as before.

Signed-off-by: Adam Dybkowski <adamx.dybkowski@intel.com>
---
 drivers/common/qat/qat_qp.c         | 231 ++++++++++++--
 drivers/common/qat/qat_qp.h         |   7 +
 drivers/compress/qat/qat_comp.c     | 458 +++++++++++++++++++++++++---
 drivers/compress/qat/qat_comp.h     |  22 +-
 drivers/compress/qat/qat_comp_pmd.c |  24 +-
 5 files changed, 669 insertions(+), 73 deletions(-)

Comments

Akhil Goyal April 1, 2020, 2:20 p.m. UTC | #1
Hi Fiona,

Could you please review this patchset.
> 
> This patch implements a special way of buffer handling when internal
> QAT IM buffer is too small for Huffman dynamic compression operation.
> Instead of falling back to fixed compression, the operation is now
> split into multiple smaller dynamic compression requests (possible to
> execute on QAT) and their results are then combined and copied into
> the output buffer. This is not possible if any checksum calculation
> was requested - in such case the code falls back to fixed compression
> as before.
> 
> Signed-off-by: Adam Dybkowski <adamx.dybkowski@intel.com>
> ---
>  drivers/common/qat/qat_qp.c         | 231 ++++++++++++--
>  drivers/common/qat/qat_qp.h         |   7 +
>  drivers/compress/qat/qat_comp.c     | 458 +++++++++++++++++++++++++---
>  drivers/compress/qat/qat_comp.h     |  22 +-
>  drivers/compress/qat/qat_comp_pmd.c |  24 +-
>  5 files changed, 669 insertions(+), 73 deletions(-)
>

Patch
diff mbox series

diff --git a/drivers/common/qat/qat_qp.c b/drivers/common/qat/qat_qp.c
index 9958789f0..6fc805a25 100644
--- a/drivers/common/qat/qat_qp.c
+++ b/drivers/common/qat/qat_qp.c
@@ -154,8 +154,8 @@  int qat_qps_per_service(const struct qat_qp_hw_data *qp_hw_data,
 	return count;
 }
 
-static const struct rte_memzone *
-queue_dma_zone_reserve(const char *queue_name, uint32_t queue_size,
+const struct rte_memzone *
+qat_dma_zone_reserve(const char *queue_name, uint32_t queue_size,
 			int socket_id)
 {
 	const struct rte_memzone *mz;
@@ -400,7 +400,7 @@  qat_queue_create(struct qat_pci_device *qat_dev, struct qat_queue *queue,
 		pci_dev->driver->driver.name, qat_dev->qat_dev_id,
 		qp_conf->service_str, "qp_mem",
 		queue->hw_bundle_number, queue->hw_queue_number);
-	qp_mz = queue_dma_zone_reserve(queue->memz_name, queue_size_bytes,
+	qp_mz = qat_dma_zone_reserve(queue->memz_name, queue_size_bytes,
 			qat_dev->pci_dev->device.numa_node);
 	if (qp_mz == NULL) {
 		QAT_LOG(ERR, "Failed to allocate ring memzone");
@@ -645,32 +645,208 @@  qat_enqueue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 	return nb_ops_sent;
 }
 
+/* Use this for compression only - but keep as consistent with above common
+ * function as much as possible.
+ */
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops)
+{
+	register struct qat_queue *queue;
+	struct qat_qp *tmp_qp = (struct qat_qp *)qp;
+	register uint32_t nb_ops_sent = 0;
+	register int nb_desc_to_build;
+	uint16_t nb_ops_possible = nb_ops;
+	register uint8_t *base_addr;
+	register uint32_t tail;
+
+	int descriptors_built, total_descriptors_built = 0;
+	int nb_remaining_descriptors;
+	int overflow = 0;
+
+	if (unlikely(nb_ops == 0))
+		return 0;
+
+	/* read params used a lot in main loop into registers */
+	queue = &(tmp_qp->tx_q);
+	base_addr = (uint8_t *)queue->base_addr;
+	tail = queue->tail;
+
+	/* Find how many can actually fit on the ring */
+	{
+		/* dequeued can only be written by one thread, but it may not
+		 * be this thread. As it's 4-byte aligned it will be read
+		 * atomically here by any Intel CPU.
+		 * enqueued can wrap before dequeued, but cannot
+		 * lap it as var size of enq/deq (uint32_t) > var size of
+		 * max_inflights (uint16_t). In reality inflights is never
+		 * even as big as max uint16_t, as it's <= ADF_MAX_DESC.
+		 * On wrapping, the calculation still returns the correct
+		 * positive value as all three vars are unsigned.
+		 */
+		uint32_t inflights =
+			tmp_qp->enqueued - tmp_qp->dequeued;
+
+		/* Find how many can actually fit on the ring */
+		overflow = (inflights + nb_ops) - tmp_qp->max_inflights;
+		if (overflow > 0) {
+			nb_ops_possible = nb_ops - overflow;
+			if (nb_ops_possible == 0)
+				return 0;
+		}
+
+		/* QAT has plenty of work queued already, so don't waste cycles
+		 * enqueueing, wait til the application has gathered a bigger
+		 * burst or some completed ops have been dequeued
+		 */
+		if (tmp_qp->min_enq_burst_threshold && inflights >
+				QAT_QP_MIN_INFL_THRESHOLD && nb_ops_possible <
+				tmp_qp->min_enq_burst_threshold) {
+			tmp_qp->stats.threshold_hit_count++;
+			return 0;
+		}
+	}
+
+	/* At this point nb_ops_possible is assuming a 1:1 mapping
+	 * between ops and descriptors.
+	 * Fewer may be sent if some ops have to be split.
+	 * nb_ops_possible is <= burst size.
+	 * Find out how many spaces are actually available on the qp in case
+	 * more are needed.
+	 */
+	nb_remaining_descriptors = nb_ops_possible
+			 + ((overflow >= 0) ? 0 : overflow * (-1));
+	QAT_DP_LOG(DEBUG, "Nb ops requested %d, nb descriptors remaining %d",
+			nb_ops, nb_remaining_descriptors);
+
+	while (nb_ops_sent != nb_ops_possible &&
+				nb_remaining_descriptors > 0) {
+
+		descriptors_built = 0;
+
+		QAT_DP_LOG(DEBUG, "--- data length: %u",
+			   ((struct rte_comp_op *)*ops)->src.length);
+
+		nb_desc_to_build = qat_comp_build_request(*ops,
+				base_addr + tail,
+				tmp_qp->op_cookies[tail / queue->msg_size],
+				tmp_qp->qat_dev_gen);
+		QAT_DP_LOG(DEBUG, "%d descriptors built, %d remaining, %d ops "
+			"sent,  %d descriptors needed",
+			total_descriptors_built, nb_remaining_descriptors,
+			nb_ops_sent, nb_desc_to_build);
+
+		if (unlikely(nb_desc_to_build < 0)) {
+			/* this message cannot be enqueued */
+			tmp_qp->stats.enqueue_err_count++;
+			if (nb_ops_sent == 0)
+				return 0;
+			goto kick_tail;
+		} else if (unlikely(nb_desc_to_build > 1)) {
+			/* this op is too big and must be split - get more
+			 * descriptors and retry
+			 */
+			int ret2 = 0;
+
+			QAT_DP_LOG(DEBUG, "Build %d descriptors for this op",
+					nb_desc_to_build);
+
+			nb_remaining_descriptors -= nb_desc_to_build;
+			if (nb_remaining_descriptors >= 0) {
+				/* There are enough remaining descriptors
+				 * so retry
+				 */
+				ret2 = qat_comp_build_multiple_requests(*ops,
+						tmp_qp, tail, nb_desc_to_build);
+				if (unlikely(ret2 < 1)) {
+					QAT_DP_LOG(DEBUG,
+							"Failed to build (%d) descriptors, status %d",
+							nb_desc_to_build, ret2);
+					tmp_qp->stats.enqueue_err_count++;
+					/*
+					 * This message cannot be enqueued,
+					 * decrease by number of ops that
+					 * weren't sent
+					 */
+					if (nb_ops_sent == 0)
+						return 0;
+					goto kick_tail;
+				} else {
+					descriptors_built = ret2;
+					total_descriptors_built +=
+							descriptors_built;
+					nb_remaining_descriptors -=
+							descriptors_built;
+					QAT_DP_LOG(DEBUG,
+							"Multiple descriptors (%d) built ok",
+							descriptors_built);
+				}
+			} else {
+				QAT_DP_LOG(ERR, "For the current op, number of "
+					"requested descriptors (%d) "
+					"exceeds number of available "
+					"descriptors (%d)",
+					nb_desc_to_build,
+					nb_remaining_descriptors);
+
+				/* Not enough extra descriptors. Not an error */
+				if (nb_ops_sent == 0)
+					return 0;
+				goto kick_tail;
+			}
+		} else {
+			descriptors_built = 1;
+			total_descriptors_built++;
+			nb_remaining_descriptors--;
+			QAT_DP_LOG(DEBUG, "Single descriptor built ok");
+		}
+
+		tail = adf_modulo(tail + (queue->msg_size * descriptors_built),
+				  queue->modulo_mask);
+		ops++;
+		nb_ops_sent++;
+	}
+
+kick_tail:
+	queue->tail = tail;
+	tmp_qp->enqueued += total_descriptors_built;
+	tmp_qp->stats.enqueued_count += total_descriptors_built;
+	txq_write_tail(tmp_qp, queue);
+	return nb_ops_sent;
+}
+
 uint16_t
 qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 {
 	struct qat_queue *rx_queue;
 	struct qat_qp *tmp_qp = (struct qat_qp *)qp;
 	uint32_t head;
-	uint32_t resp_counter = 0;
+	uint32_t op_resp_counter = 0, fw_resp_counter = 0;
 	uint8_t *resp_msg;
+	int nb_fw_responses = 0;
 
 	rx_queue = &(tmp_qp->rx_q);
 	head = rx_queue->head;
 	resp_msg = (uint8_t *)rx_queue->base_addr + rx_queue->head;
 
 	while (*(uint32_t *)resp_msg != ADF_RING_EMPTY_SIG &&
-			resp_counter != nb_ops) {
+			op_resp_counter != nb_ops) {
 
-		if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC)
+		nb_fw_responses = 0;
+		if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC) {
 			qat_sym_process_response(ops, resp_msg);
-		else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
-			qat_comp_process_response(ops, resp_msg,
+			nb_fw_responses = 1;
+		} else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
+
+			nb_fw_responses = qat_comp_process_response(
+				ops, resp_msg,
 				tmp_qp->op_cookies[head / rx_queue->msg_size],
 				&tmp_qp->stats.dequeue_err_count);
+
 		else if (tmp_qp->service_type == QAT_SERVICE_ASYMMETRIC) {
 #ifdef BUILD_QAT_ASYM
 			qat_asym_process_response(ops, resp_msg,
 				tmp_qp->op_cookies[head / rx_queue->msg_size]);
+			nb_fw_responses = 1;
 #endif
 		}
 
@@ -678,21 +854,42 @@  qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 				  rx_queue->modulo_mask);
 
 		resp_msg = (uint8_t *)rx_queue->base_addr + head;
-		ops++;
-		resp_counter++;
+
+		if (ops != NULL && nb_fw_responses) {
+			/* only move on to next op if one was ready to return
+			 * to API
+			 */
+			ops++;
+			op_resp_counter++;
+		}
+
+		 /* A compression op may be broken up into multiple fw requests.
+		  * Only count fw responses as complete once ALL the responses
+		  * associated with an op have been processed, as the cookie
+		  * data from the first response must be available until
+		  * finished with all firmware responses.
+		  */
+		fw_resp_counter += nb_fw_responses;
 	}
-	if (resp_counter > 0) {
+
+	if (fw_resp_counter > 0) {
 		rx_queue->head = head;
-		tmp_qp->dequeued += resp_counter;
-		tmp_qp->stats.dequeued_count += resp_counter;
-		rx_queue->nb_processed_responses += resp_counter;
+		tmp_qp->dequeued += fw_resp_counter;
+		tmp_qp->stats.dequeued_count += fw_resp_counter;
+		rx_queue->nb_processed_responses += fw_resp_counter;
 
 		if (rx_queue->nb_processed_responses >
-						QAT_CSR_HEAD_WRITE_THRESH)
-			rxq_free_desc(tmp_qp, rx_queue);
+				QAT_CSR_HEAD_WRITE_THRESH) {
+			/* freeing only if inflights number is equal to zero */
+			uint32_t inflights =
+				tmp_qp->enqueued - tmp_qp->dequeued;
+			if (inflights == 0)
+				rxq_free_desc(tmp_qp, rx_queue);
+		}
 	}
+	QAT_DP_LOG(DEBUG, "Dequeue burst return: %d", op_resp_counter);
 
-	return resp_counter;
+	return op_resp_counter;
 }
 
 __rte_weak int
diff --git a/drivers/common/qat/qat_qp.h b/drivers/common/qat/qat_qp.h
index 0b95ea3c9..35a212180 100644
--- a/drivers/common/qat/qat_qp.h
+++ b/drivers/common/qat/qat_qp.h
@@ -88,6 +88,9 @@  extern const struct qat_qp_hw_data qat_gen3_qps[][ADF_MAX_QPS_ON_ANY_SERVICE];
 uint16_t
 qat_enqueue_op_burst(void *qp, void **ops, uint16_t nb_ops);
 
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops);
+
 uint16_t
 qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops);
 
@@ -103,6 +106,10 @@  int
 qat_qps_per_service(const struct qat_qp_hw_data *qp_hw_data,
 			enum qat_service_type service);
 
+const struct rte_memzone *
+qat_dma_zone_reserve(const char *queue_name, uint32_t queue_size,
+			int socket_id);
+
 /* Needed for weak function*/
 int
 qat_comp_process_response(void **op __rte_unused, uint8_t *resp __rte_unused,
diff --git a/drivers/compress/qat/qat_comp.c b/drivers/compress/qat/qat_comp.c
index 533e34f6b..8c2e43c7a 100644
--- a/drivers/compress/qat/qat_comp.c
+++ b/drivers/compress/qat/qat_comp.c
@@ -18,7 +18,6 @@ 
 #include "qat_comp.h"
 #include "qat_comp_pmd.h"
 
-
 int
 qat_comp_build_request(void *in_op, uint8_t *out_msg,
 		       void *op_cookie,
@@ -57,6 +56,53 @@  qat_comp_build_request(void *in_op, uint8_t *out_msg,
 	rte_mov128(out_msg, tmpl);
 	comp_req->comn_mid.opaque_data = (uint64_t)(uintptr_t)op;
 
+	if (likely(qat_xform->qat_comp_request_type ==
+			QAT_COMP_REQUEST_DYNAMIC_COMP_STATELESS)) {
+
+		if (unlikely(op->src.length > QAT_FALLBACK_THLD)) {
+			/* the operation must be split into pieces */
+			if (qat_xform->checksum_type !=
+					RTE_COMP_CHECKSUM_NONE) {
+				/* fallback to fixed compression in case any
+				 * checksum calculation was requested
+				 */
+				comp_req->comn_hdr.service_cmd_id =
+						ICP_QAT_FW_COMP_CMD_STATIC;
+
+				ICP_QAT_FW_COMN_NEXT_ID_SET(
+						&comp_req->comp_cd_ctrl,
+						ICP_QAT_FW_SLICE_DRAM_WR);
+
+				ICP_QAT_FW_COMN_NEXT_ID_SET(
+						&comp_req->u2.xlt_cd_ctrl,
+						ICP_QAT_FW_SLICE_NULL);
+				ICP_QAT_FW_COMN_CURR_ID_SET(
+						&comp_req->u2.xlt_cd_ctrl,
+						ICP_QAT_FW_SLICE_NULL);
+
+				QAT_DP_LOG(DEBUG, "QAT PMD: fallback to fixed compression!");
+			} else {
+				/* calculate num. of descriptors for split op */
+				int nb_descriptors_needed =
+					op->src.length / QAT_FALLBACK_THLD + 1;
+				QAT_LOG(DEBUG, "Input data is too big, op must be split into %d descriptors",
+						nb_descriptors_needed);
+				return nb_descriptors_needed;
+			}
+		}
+
+		/* set BFINAL bit according to flush_flag */
+		comp_req->comp_pars.req_par_flags =
+			ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
+				ICP_QAT_FW_COMP_SOP,
+				ICP_QAT_FW_COMP_EOP,
+				op->flush_flag == RTE_COMP_FLUSH_FINAL ?
+					ICP_QAT_FW_COMP_BFINAL
+					: ICP_QAT_FW_COMP_NOT_BFINAL,
+				ICP_QAT_FW_COMP_CNV,
+				ICP_QAT_FW_COMP_CNV_RECOVERY);
+	}
+
 	if (op->op_type == RTE_COMP_OP_STATEFUL) {
 		comp_req->comp_pars.req_par_flags =
 			ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
@@ -72,30 +118,6 @@  qat_comp_build_request(void *in_op, uint8_t *out_msg,
 				ICP_QAT_FW_COMP_NO_CNV_RECOVERY);
 	}
 
-	if (likely(qat_xform->qat_comp_request_type ==
-		    QAT_COMP_REQUEST_DYNAMIC_COMP_STATELESS)) {
-		if (unlikely(op->src.length > QAT_FALLBACK_THLD)) {
-
-			/* fallback to fixed compression */
-			comp_req->comn_hdr.service_cmd_id =
-					ICP_QAT_FW_COMP_CMD_STATIC;
-
-			ICP_QAT_FW_COMN_NEXT_ID_SET(&comp_req->comp_cd_ctrl,
-					ICP_QAT_FW_SLICE_DRAM_WR);
-
-			ICP_QAT_FW_COMN_NEXT_ID_SET(&comp_req->u2.xlt_cd_ctrl,
-					ICP_QAT_FW_SLICE_NULL);
-			ICP_QAT_FW_COMN_CURR_ID_SET(&comp_req->u2.xlt_cd_ctrl,
-					ICP_QAT_FW_SLICE_NULL);
-
-			QAT_DP_LOG(DEBUG, "QAT PMD: fallback to fixed "
-				   "compression! IM buffer size can be too low "
-				   "for produced data.\n Please use input "
-				   "buffer length lower than %d bytes",
-				   QAT_FALLBACK_THLD);
-		}
-	}
-
 	/* common for sgl and flat buffers */
 	comp_req->comp_pars.comp_len = op->src.length;
 	comp_req->comp_pars.out_buffer_sz = rte_pktmbuf_pkt_len(op->m_dst) -
@@ -233,6 +255,289 @@  qat_comp_build_request(void *in_op, uint8_t *out_msg,
 	return 0;
 }
 
+static inline uint32_t adf_modulo(uint32_t data, uint32_t modulo_mask)
+{
+	return data & modulo_mask;
+}
+
+static int
+qat_comp_allocate_child_memzones(struct qat_qp *qp, uint32_t parent_tail,
+				 uint32_t data_to_enqueue)
+{
+	struct qat_queue *txq = &(qp->tx_q);
+	uint32_t children_count = (data_to_enqueue + QAT_FALLBACK_THLD - 1) /
+			QAT_FALLBACK_THLD;
+	uint32_t memzone_size = RTE_PMD_QAT_COMP_IM_BUFFER_SIZE;
+	uint32_t tail = parent_tail;
+	uint32_t i;
+
+	for (i = 0; i < children_count; i++) {
+		struct qat_comp_op_cookie *child_cookie;
+		uint32_t cookie_index;
+
+		tail = adf_modulo(tail + txq->msg_size, txq->modulo_mask);
+		cookie_index = tail / txq->msg_size;
+		child_cookie = (struct qat_comp_op_cookie *)
+				qp->op_cookies[cookie_index];
+
+		snprintf(child_cookie->dst_memz_name,
+				sizeof(child_cookie->dst_memz_name),
+				"dst_%u_%u_%u_%u",
+				qp->qat_dev->qat_dev_id, txq->hw_bundle_number,
+				txq->hw_queue_number, cookie_index);
+		child_cookie->dst_memzone = qat_dma_zone_reserve(
+				child_cookie->dst_memz_name,
+				memzone_size,
+				SOCKET_ID_ANY);
+		if (child_cookie->dst_memzone == NULL) {
+			uint32_t j;
+
+			QAT_LOG(ERR, "Failed to allocate dst buffer memzone");
+
+			/* let's free everything allocated up to now */
+			tail = parent_tail;
+			for (j = 0; j < i; j++) {
+				tail = adf_modulo(tail + txq->msg_size,
+						txq->modulo_mask);
+				cookie_index = tail / txq->msg_size;
+				child_cookie = (struct qat_comp_op_cookie *)
+						qp->op_cookies[cookie_index];
+				rte_memzone_free(child_cookie->dst_memzone);
+				child_cookie->dst_memzone = NULL;
+			}
+			return -ENOMEM;
+		}
+	}
+
+	return 0;
+}
+
+int
+qat_comp_build_multiple_requests(void *in_op, struct qat_qp *qp,
+				 uint32_t parent_tail, int nb_descr)
+{
+	struct rte_comp_op *op = in_op;
+	struct qat_queue *txq = &(qp->tx_q);
+	uint8_t *base_addr = (uint8_t *)txq->base_addr;
+	uint8_t *out_msg = base_addr + parent_tail;
+	uint32_t tail = parent_tail;
+	struct icp_qat_fw_comp_req *comp_req =
+			(struct icp_qat_fw_comp_req *)out_msg;
+	struct qat_comp_op_cookie *parent_cookie =
+			(struct qat_comp_op_cookie *)
+			qp->op_cookies[parent_tail / txq->msg_size];
+	struct qat_comp_op_cookie *child_cookie;
+	uint32_t data_to_enqueue, data_enqueued = 0;
+	int num_descriptors_built = 0;
+	int ret;
+
+	QAT_DP_LOG(DEBUG, "op %p, parent_cookie %p ", op, parent_cookie);
+
+	parent_cookie->nb_child_responses = 0;
+	parent_cookie->nb_children = 0;
+	parent_cookie->split_op = 1;
+	parent_cookie->orig_parent_src_len = op->src.length;
+	parent_cookie->orig_parent_flush_flag = op->flush_flag;
+	op->src.length = QAT_FALLBACK_THLD;
+	op->flush_flag = RTE_COMP_FLUSH_FULL;
+
+	data_to_enqueue = parent_cookie->orig_parent_src_len -
+			QAT_FALLBACK_THLD;
+
+	ret = qat_comp_build_request(in_op, out_msg, parent_cookie,
+			qp->qat_dev_gen);
+	if (ret == 0) {
+		/* allocate memzones for all children ops */
+		ret = qat_comp_allocate_child_memzones(qp, parent_tail,
+				data_to_enqueue);
+	}
+	if (ret != 0) {
+		/* restore op and clear cookie */
+		QAT_DP_LOG(WARNING, "Failed to build parent descriptor");
+		parent_cookie->split_op = 0;
+		op->src.length = parent_cookie->orig_parent_src_len;
+		parent_cookie->orig_parent_src_len =  0;
+		parent_cookie->orig_parent_flush_flag = 0;
+		return ret;
+	}
+
+	num_descriptors_built++;
+
+	data_enqueued = QAT_FALLBACK_THLD;
+	while (data_to_enqueue) {
+		/* create descriptor at next entry in tx queue */
+		uint32_t src_data_size = RTE_MIN(data_to_enqueue,
+				QAT_FALLBACK_THLD);
+		uint32_t dst_data_size = RTE_PMD_QAT_COMP_IM_BUFFER_SIZE;
+		const struct rte_memzone *mz;
+		uint32_t cookie_index;
+
+		tail = adf_modulo(tail + txq->msg_size, txq->modulo_mask);
+		cookie_index = tail / txq->msg_size;
+		child_cookie = (struct qat_comp_op_cookie *)
+				qp->op_cookies[cookie_index];
+		mz = child_cookie->dst_memzone;
+		comp_req = (struct icp_qat_fw_comp_req *)(base_addr + tail);
+
+		child_cookie->split_op = 1; /* must be set for child as well */
+		child_cookie->parent_cookie = parent_cookie; /* same as above */
+		child_cookie->nb_children = 0;
+
+		QAT_DP_LOG(DEBUG,
+				"cookie_index %d, child_cookie %p, comp_req %p",
+				cookie_index, child_cookie, comp_req);
+		QAT_DP_LOG(DEBUG,
+				"data_to_enqueue %d, data_enqueued %d, num_descriptors_built %d",
+				data_to_enqueue, data_enqueued,
+				num_descriptors_built);
+
+		rte_mov128((uint8_t *)comp_req, out_msg);
+
+		comp_req->comn_mid.opaque_data = (uint64_t)(uintptr_t)op;
+		comp_req->comn_mid.src_length = src_data_size;
+
+		if ((data_enqueued + src_data_size) >
+				rte_pktmbuf_data_len(op->m_src)) {
+			/* src */
+			ret = qat_sgl_fill_array(op->m_src,
+					data_enqueued,
+					child_cookie->qat_sgl_src_d,
+					src_data_size,
+					child_cookie->src_nb_elems);
+			if (ret) {
+				QAT_DP_LOG(ERR,
+						"QAT PMD (multiple_requests) Cannot fill src. sgl array");
+				op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+				return ret;
+			}
+
+			child_cookie->qat_sgl_src_phys_addr =
+			      rte_malloc_virt2iova(child_cookie->qat_sgl_src_d);
+
+			comp_req->comn_mid.src_data_addr =
+					child_cookie->qat_sgl_src_phys_addr;
+
+			/* dst */
+			struct qat_sgl *list = (struct qat_sgl *)
+					child_cookie->qat_sgl_dst_d;
+
+			list->buffers[0].len = dst_data_size;
+			list->buffers[0].resrvd = 0;
+			list->buffers[0].addr = mz->iova;
+
+			comp_req->comn_mid.dst_length = dst_data_size;
+			comp_req->comn_mid.dest_data_addr =
+					child_cookie->qat_sgl_dst_phys_addr;
+
+			child_cookie->dest_buffer = (char *)mz->addr;
+
+			ICP_QAT_FW_COMN_PTR_TYPE_SET(
+					comp_req->comn_hdr.comn_req_flags,
+					QAT_COMN_PTR_TYPE_SGL);
+		} else {
+			op->src.offset = data_enqueued;
+			comp_req->comn_mid.src_data_addr =
+					rte_pktmbuf_mtophys_offset(op->m_src,
+					op->src.offset);
+
+			ICP_QAT_FW_COMN_PTR_TYPE_SET(
+					comp_req->comn_hdr.comn_req_flags,
+					QAT_COMN_PTR_TYPE_FLAT);
+
+			child_cookie->dest_buffer = mz->addr;
+
+			comp_req->comn_mid.dst_length = dst_data_size;
+			comp_req->comn_mid.dest_data_addr = mz->iova;
+		}
+
+		comp_req->comp_pars.comp_len = src_data_size;
+		comp_req->comp_pars.out_buffer_sz = dst_data_size;
+
+		data_to_enqueue -= src_data_size;
+		data_enqueued += src_data_size;
+		num_descriptors_built++;
+
+		comp_req->comp_pars.req_par_flags =
+			ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
+				ICP_QAT_FW_COMP_SOP,
+				ICP_QAT_FW_COMP_EOP,
+				data_to_enqueue == 0 ?
+					ICP_QAT_FW_COMP_BFINAL
+					: ICP_QAT_FW_COMP_NOT_BFINAL,
+				ICP_QAT_FW_COMP_CNV,
+				ICP_QAT_FW_COMP_CNV_RECOVERY);
+	}
+
+	if (nb_descr != num_descriptors_built)
+		QAT_LOG(ERR, "split op. expected %d, built %d",
+				nb_descr, num_descriptors_built);
+
+	parent_cookie->nb_children = num_descriptors_built - 1;
+	return num_descriptors_built;
+}
+
+
+static inline void
+qat_comp_response_data_copy(struct qat_comp_op_cookie *cookie,
+		       struct rte_comp_op *rx_op)
+{
+	struct qat_comp_op_cookie *pc = cookie->parent_cookie;
+	uint32_t remaining_off = pc->total_produced;
+	struct rte_mbuf *sgl_buf = rx_op->m_dst;
+
+	uint32_t prod, sent;
+	void *op_dst_addr;
+
+	/* number of bytes left in the current segment */
+	uint32_t left_in_current;
+
+	/* sgl_buf - current sgl moved to the parent cookie */
+	while (remaining_off >= rte_pktmbuf_data_len(sgl_buf)) {
+		remaining_off -= rte_pktmbuf_data_len(sgl_buf);
+		sgl_buf = sgl_buf->next;
+		if (sgl_buf == NULL)
+			return;
+	}
+
+	op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf, uint8_t *,
+			remaining_off);
+
+	left_in_current = rte_pktmbuf_data_len(sgl_buf) - remaining_off;
+
+	if (rx_op->produced <= left_in_current)
+		rte_memcpy(op_dst_addr,  cookie->dest_buffer,
+				rx_op->produced);
+	else {
+		rte_memcpy(op_dst_addr,  cookie->dest_buffer,
+				left_in_current);
+		sgl_buf = sgl_buf->next;
+		prod = rx_op->produced - left_in_current;
+		sent = left_in_current;
+
+		while (prod > rte_pktmbuf_data_len(sgl_buf)) {
+			op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf,
+					uint8_t *, 0);
+
+			rte_memcpy(op_dst_addr,
+					((uint8_t *)cookie->dest_buffer) +
+					sent,
+					rte_pktmbuf_data_len(sgl_buf));
+
+			prod -= rte_pktmbuf_data_len(sgl_buf);
+			sent += rte_pktmbuf_data_len(sgl_buf);
+
+			sgl_buf = sgl_buf->next;
+		}
+
+		op_dst_addr = rte_pktmbuf_mtod_offset(sgl_buf, uint8_t *, 0);
+
+		rte_memcpy(op_dst_addr,
+				((uint8_t *)cookie->dest_buffer) +
+				sent,
+				prod);
+	}
+}
+
 int
 qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			  uint64_t *dequeue_err_count)
@@ -241,6 +546,14 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			(struct icp_qat_fw_comp_resp *)resp;
 	struct qat_comp_op_cookie *cookie =
 			(struct qat_comp_op_cookie *)op_cookie;
+
+	struct icp_qat_fw_resp_comp_pars *comp_resp1 =
+	  (struct icp_qat_fw_resp_comp_pars *)&resp_msg->comp_resp_pars;
+
+	QAT_DP_LOG(DEBUG, "input counter = %u, output counter = %u",
+		   comp_resp1->input_byte_counter,
+		   comp_resp1->output_byte_counter);
+
 	struct rte_comp_op *rx_op = (struct rte_comp_op *)(uintptr_t)
 			(resp_msg->opaque_data);
 	struct qat_comp_stream *stream;
@@ -275,7 +588,10 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		rx_op->consumed = 0;
 		rx_op->produced = 0;
 		*op = (void *)rx_op;
-		return 0;
+		/* also in this case number of returned ops */
+		/* must be equal to one, */
+		/* appropriate status (error) must be set as well */
+		return 1;
 	}
 
 	if (likely(qat_xform->qat_comp_request_type
@@ -288,7 +604,7 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			*op = (void *)rx_op;
 			QAT_DP_LOG(ERR, "QAT has wrong firmware");
 			++(*dequeue_err_count);
-			return 0;
+			return 1;
 		}
 	}
 
@@ -305,8 +621,9 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		int8_t xlat_err_code =
 			(int8_t)resp_msg->comn_resp.comn_error.xlat_err_code;
 
-		/* handle recoverable out-of-buffer condition in stateful */
-		/* decompression scenario */
+		/* handle recoverable out-of-buffer condition in stateful
+		 * decompression scenario
+		 */
 		if (cmp_err_code == ERR_CODE_OVERFLOW_ERROR && !xlat_err_code
 				&& qat_xform->qat_comp_request_type
 					== QAT_COMP_REQUEST_DECOMPRESS
@@ -327,10 +644,12 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 		     xlat_err_code == ERR_CODE_OVERFLOW_ERROR)){
 
 			struct icp_qat_fw_resp_comp_pars *comp_resp =
-	  (struct icp_qat_fw_resp_comp_pars *)&resp_msg->comp_resp_pars;
+					(struct icp_qat_fw_resp_comp_pars *)
+					&resp_msg->comp_resp_pars;
 
-			/* handle recoverable out-of-buffer condition */
-			/* in stateless compression scenario */
+			/* handle recoverable out-of-buffer condition
+			 * in stateless compression scenario
+			 */
 			if (comp_resp->input_byte_counter) {
 				if ((qat_xform->qat_comp_request_type
 				== QAT_COMP_REQUEST_FIXED_COMP_STATELESS) ||
@@ -375,9 +694,74 @@  qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 				rx_op->output_chksum = comp_resp->curr_chksum;
 		}
 	}
-	*op = (void *)rx_op;
+	QAT_LOG(DEBUG, "About to check for split op :cookies: %p %p, split:%d",
+		cookie, cookie->parent_cookie, cookie->split_op);
+
+	if (cookie->split_op) {
+		*op = NULL;
+		struct qat_comp_op_cookie *pc = cookie->parent_cookie;
+
+		if  (cookie->nb_children > 0) {
+			QAT_LOG(DEBUG, "Parent");
+			/* parent - don't return until all children
+			 * responses are collected
+			 */
+			cookie->total_consumed = rx_op->consumed;
+			cookie->total_produced = rx_op->produced;
+		} else {
+			QAT_LOG(DEBUG, "Child");
+			qat_comp_response_data_copy(cookie, rx_op);
+
+			const struct rte_memzone *mz =
+				rte_memzone_lookup(cookie->dst_memz_name);
+			if (mz != NULL)	{
+				int status = rte_memzone_free(mz);
+				if (status != 0)
+					QAT_LOG(ERR,
+						"Error %d on freeing queue %s",
+						status, cookie->dst_memz_name);
+			}
+			cookie->dest_buffer = NULL;
+
+			pc->total_consumed += rx_op->consumed;
+			pc->total_produced += rx_op->produced;
+			pc->nb_child_responses++;
+
+			/* (child) cookie fields have to be reset
+			 * to avoid problems with reusability -
+			 * rx and tx queue starting from index zero
+			 */
+			cookie->nb_children = 0;
+			cookie->split_op = 0;
+			cookie->nb_child_responses = 0;
+
+			if (pc->nb_child_responses == pc->nb_children) {
+				uint8_t child_resp;
+
+				/* parent should be included as well */
+				child_resp = pc->nb_child_responses + 1;
+
+				rx_op->status = RTE_COMP_OP_STATUS_SUCCESS;
+				rx_op->consumed = pc->total_consumed;
+				rx_op->produced = pc->total_produced;
+				*op = (void *)rx_op;
+
+				/* (parent) cookie fields have to be reset
+				 * to avoid problems with reusability -
+				 * rx and tx queue starting from index zero
+				 */
+				pc->nb_children = 0;
+				pc->split_op = 0;
+				pc->nb_child_responses = 0;
+
+				return child_resp;
+			}
+		}
+		return 0;
+	}
 
-	return 0;
+	*op = (void *)rx_op;
+	return 1;
 }
 
 unsigned int
@@ -443,9 +827,9 @@  static int qat_comp_create_templates(struct qat_comp_xform *qat_xform,
 		comp_level = ICP_QAT_HW_COMPRESSION_DEPTH_1;
 		req_par_flags = ICP_QAT_FW_COMP_REQ_PARAM_FLAGS_BUILD(
 				ICP_QAT_FW_COMP_SOP, ICP_QAT_FW_COMP_EOP,
-				ICP_QAT_FW_COMP_BFINAL, ICP_QAT_FW_COMP_NO_CNV,
-				ICP_QAT_FW_COMP_NO_CNV_RECOVERY);
-
+				ICP_QAT_FW_COMP_BFINAL,
+				ICP_QAT_FW_COMP_CNV,
+				ICP_QAT_FW_COMP_CNV_RECOVERY);
 	} else {
 		if (xform->compress.level == RTE_COMP_LEVEL_PMD_DEFAULT)
 			comp_level = ICP_QAT_HW_COMPRESSION_DEPTH_8;
diff --git a/drivers/compress/qat/qat_comp.h b/drivers/compress/qat/qat_comp.h
index 2231451a1..235b8162c 100644
--- a/drivers/compress/qat/qat_comp.h
+++ b/drivers/compress/qat/qat_comp.h
@@ -11,6 +11,7 @@ 
 #include <rte_compressdev_pmd.h>
 
 #include "qat_common.h"
+#include "qat_qp.h"
 #include "icp_qat_hw.h"
 #include "icp_qat_fw_comp.h"
 #include "icp_qat_fw_la.h"
@@ -22,7 +23,7 @@ 
 #define ERR_CODE_QAT_COMP_WRONG_FW -99
 
 /* fallback to fixed compression threshold */
-#define QAT_FALLBACK_THLD ((uint32_t)(RTE_PMD_QAT_COMP_IM_BUFFER_SIZE / 1.1))
+#define QAT_FALLBACK_THLD ((uint32_t)(RTE_PMD_QAT_COMP_IM_BUFFER_SIZE / 1.3))
 
 #define QAT_MIN_OUT_BUF_SIZE 46
 
@@ -63,6 +64,21 @@  struct qat_comp_op_cookie {
 	uint16_t dst_nb_elems;
 	struct qat_sgl *qat_sgl_src_d;
 	struct qat_sgl *qat_sgl_dst_d;
+
+	uint8_t split_op;
+	uint8_t nb_children;
+	uint8_t nb_child_responses;
+
+	uint32_t orig_parent_src_len;
+	uint32_t orig_parent_dest_len;
+	uint32_t orig_parent_flush_flag;
+
+	uint32_t total_consumed;
+	uint32_t total_produced;
+	struct qat_comp_op_cookie *parent_cookie; /* used by the child only */
+	void *dest_buffer;
+	char dst_memz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *dst_memzone;
 };
 
 struct qat_comp_xform {
@@ -86,6 +102,10 @@  int
 qat_comp_build_request(void *in_op, uint8_t *out_msg, void *op_cookie,
 		       enum qat_device_gen qat_dev_gen __rte_unused);
 
+int
+qat_comp_build_multiple_requests(void *in_op, struct qat_qp *qp,
+				 uint32_t parent_tail, int nb_descr);
+
 int
 qat_comp_process_response(void **op, uint8_t *resp, void *op_cookie,
 			  uint64_t *dequeue_err_count);
diff --git a/drivers/compress/qat/qat_comp_pmd.c b/drivers/compress/qat/qat_comp_pmd.c
index 7d4fdf10c..378145f52 100644
--- a/drivers/compress/qat/qat_comp_pmd.c
+++ b/drivers/compress/qat/qat_comp_pmd.c
@@ -560,20 +560,6 @@  qat_comp_dev_info_get(struct rte_compressdev *dev,
 	}
 }
 
-static uint16_t
-qat_comp_pmd_enqueue_op_burst(void *qp, struct rte_comp_op **ops,
-		uint16_t nb_ops)
-{
-	return qat_enqueue_op_burst(qp, (void **)ops, nb_ops);
-}
-
-static uint16_t
-qat_comp_pmd_dequeue_op_burst(void *qp, struct rte_comp_op **ops,
-			      uint16_t nb_ops)
-{
-	return qat_dequeue_op_burst(qp, (void **)ops, nb_ops);
-}
-
 static uint16_t
 qat_comp_pmd_enq_deq_dummy_op_burst(void *qp __rte_unused,
 				    struct rte_comp_op **ops __rte_unused,
@@ -603,7 +589,7 @@  static struct rte_compressdev_ops compress_qat_dummy_ops = {
 };
 
 static uint16_t
-qat_comp_pmd_dequeue_frst_op_burst(void *qp, struct rte_comp_op **ops,
+qat_comp_pmd_dequeue_first_op_burst(void *qp, struct rte_comp_op **ops,
 				   uint16_t nb_ops)
 {
 	uint16_t ret = qat_dequeue_op_burst(qp, (void **)ops, nb_ops);
@@ -623,7 +609,8 @@  qat_comp_pmd_dequeue_frst_op_burst(void *qp, struct rte_comp_op **ops,
 
 		} else {
 			tmp_qp->qat_dev->comp_dev->compressdev->dequeue_burst =
-					qat_comp_pmd_dequeue_op_burst;
+					(compressdev_dequeue_pkt_burst_t)
+					qat_dequeue_op_burst;
 		}
 	}
 	return ret;
@@ -698,8 +685,9 @@  qat_comp_dev_create(struct qat_pci_device *qat_pci_dev,
 
 	compressdev->dev_ops = &compress_qat_ops;
 
-	compressdev->enqueue_burst = qat_comp_pmd_enqueue_op_burst;
-	compressdev->dequeue_burst = qat_comp_pmd_dequeue_frst_op_burst;
+	compressdev->enqueue_burst = (compressdev_enqueue_pkt_burst_t)
+			qat_enqueue_comp_op_burst;
+	compressdev->dequeue_burst = qat_comp_pmd_dequeue_first_op_burst;
 
 	compressdev->feature_flags = RTE_COMPDEV_FF_HW_ACCELERATED;