[dpdk-dev,v1,4/6] compress/zlib: add enq deq apis

Message ID 1526380346-7386-5-git-send-email-shally.verma@caviumnetworks.com (mailing list archive)
State Changes Requested, archived
Delegated to: Pablo de Lara Guarch
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Shally Verma May 15, 2018, 10:32 a.m. UTC
  implement enqueue and dequeue apis

Signed-off-by: Sunila Sahu <sunila.sahu@caviumnetworks.com>
Signed-off-by: Shally Verma <shally.verma@caviumnetworks.com>
Signed-off-by: Ashish Gupta <ashish.gupta@caviumnetworks.com>
---
 drivers/compress/meson.build      |   2 +-
 drivers/compress/zlib/meson.build |  11 ++
 drivers/compress/zlib/zlib_pmd.c  | 275 +++++++++++++++++++++++++++++++++++++-
 3 files changed, 286 insertions(+), 2 deletions(-)
  

Comments

Daly, Lee June 15, 2018, 11:09 a.m. UTC | #1
Comments inline.

> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Shally Verma
> Sent: Tuesday, May 15, 2018 11:32 AM
> To: De Lara Guarch, Pablo <pablo.de.lara.guarch@intel.com>
> Cc: Trahe, Fiona <fiona.trahe@intel.com>; dev@dpdk.org;
> pathreay@caviumnetworks.com; Sunila Sahu
> <sunila.sahu@caviumnetworks.com>; Ashish Gupta
> <ashish.gupta@caviumnetworks.com>
> Subject: [dpdk-dev] [PATCH v1 4/6] compress/zlib: add enq deq apis
> 
> implement enqueue and dequeue apis

<...>
> diff --git a/drivers/compress/zlib/meson.build
> b/drivers/compress/zlib/meson.build
> new file mode 100644
> index 0000000..d66de95
> --- /dev/null
> +++ b/drivers/compress/zlib/meson.build
> @@ -0,0 +1,11 @@
> +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Cavium
> +Networks
> +
> +dep = dependency('zlib', required: false) if not dep.found()
> +	build = false
> +endif
> +deps += 'bus_vdev'
> +sources = files('zlib_pmd.c', 'zlib_pmd_ops.c') ext_deps += dep
> +pkgconfig_extra_libs += '-lz'
[Lee] You have added "allow_experimental_apis" tag to the zlib/Makefile, better to add to the meson as well.
With the tag; allow_experimental_apis = true.

> diff --git a/drivers/compress/zlib/zlib_pmd.c
> b/drivers/compress/zlib/zlib_pmd.c
> index 3dc71ec..e2681a7 100644
> --- a/drivers/compress/zlib/zlib_pmd.c
> +++ b/drivers/compress/zlib/zlib_pmd.c
> @@ -17,7 +17,239 @@
>  #include "zlib_pmd_private.h"
> 
>  static uint8_t compressdev_driver_id;
> -int zlib_logtype_driver;
> +
> +/** compute the next mbuf in list and assign dst buffer and dlen,
> + * set op->status to appropriate flag if we run out of mbuf  */
> +#define COMPUTE_DST_BUF(mbuf, dst, dlen)		\
> +	((mbuf = mbuf->next) ?
> 	\
> +		(dst = rte_pktmbuf_mtod(mbuf, uint8_t *)),		\
> +		dlen = rte_pktmbuf_data_len(mbuf) :			\
> +			!(op->status =					\
> +			((op->op_type == RTE_COMP_OP_STATELESS) ?
> 	\
[Lee] Clever & useful macro.

> +
> 	RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED :	\
> +
> 	RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE)))
> +
> +static void
> +process_zlib_deflate(struct rte_comp_op *op, z_stream *strm) {
> +	int ret, flush, fin_flush;
> +	uint8_t *src, *dst;
> +	uint32_t sl, dl, have;
> +	struct rte_mbuf *mbuf_src = op->m_src;
> +	struct rte_mbuf *mbuf_dst = op->m_dst;
> +
> +	src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op-
> >src.offset);
> +
> +	sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
> +
> +	dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *,
> +			op->dst.offset);
> +
> +	dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
> +
> +	if (unlikely(!src || !dst || !strm)) {
> +		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
> +		ZLIB_LOG_ERR("\nInvalid source or destination buffers");
> +		return;
> +	}
> +	switch (op->flush_flag) {
> +	case RTE_COMP_FLUSH_NONE:
> +		fin_flush = Z_NO_FLUSH;
> +		break;
> +	case RTE_COMP_FLUSH_SYNC:
> +		fin_flush = Z_SYNC_FLUSH;
> +		break;
> +	case RTE_COMP_FLUSH_FULL:
> +		fin_flush = Z_FULL_FLUSH;
> +		break;
> +	case RTE_COMP_FLUSH_FINAL:
> +		fin_flush = Z_FINISH;
> +		break;
> +	default:
> +		op->status = RTE_COMP_OP_STATUS_ERROR;
> +		goto def_end;
> +	}
> +	if (op->src.length <= sl) {
> +		sl = op->src.length;
> +		flush = fin_flush;
> +	} else {
> +		/* if there're more than one mbufs in input,
> +		 * process intermediate with NO_FLUSH
> +		 */
> +		flush = Z_NO_FLUSH;
> +	}
> +	/* initialize status to SUCCESS */
> +	op->status = RTE_COMP_OP_STATUS_SUCCESS;
> +
> +	do {
> +		/* Update z_stream with the inputs provided by application
> */
> +		strm->next_in = src;
> +		strm->avail_in = sl;
> +
> +		do {
> +			strm->avail_out = dl;
> +			strm->next_out = dst;
> +			ret = deflate(strm, flush);
> +			if (unlikely(ret == Z_STREAM_ERROR)) {
> +				/* error return, do not process further */
> +				op->status =
> RTE_COMP_OP_STATUS_ERROR;
> +				goto def_end;
> +			}
> +			/* Update op stats */
> +			op->produced += dl - strm->avail_out;
> +			op->consumed += sl - strm->avail_in;
[Lee] strm struct has a total_in & total_out field which can be used here and will save these cycles doing the calculation each iteration.

> +		/* Break if Z_STREAM_END is encountered or dst mbuf gets
> over */
> +		} while (!(ret == Z_STREAM_END) && (strm->avail_out == 0)
> &&
> +				COMPUTE_DST_BUF(mbuf_dst, dst, dl));
> +
> +		/** Compress till the end of compressed blocks provided
> +		 * or till Z_FINISH
> +		 * Exit if op->status is not SUCCESS.
> +		 */
> +		if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) ||
> +			(ret == Z_STREAM_END) ||
> +			op->consumed == op->src.length)
> +			goto def_end;
> +
> +		/** Update last output buffer with respect to availed space
> */
> +		have = dl - strm->avail_out;
> +		dst += have;
> +		dl = strm->avail_out;
[Lee] From what I can see this assignment isn't doing anything, in the while condition macro dl gets reassigned and inside the above while loop "strm->avail_out = dl", but I may be missing something.
 
> +		/** Update source buffer to next mbuf*/
> +		mbuf_src = mbuf_src->next;
> +		src = rte_pktmbuf_mtod(mbuf_src, uint8_t *);
> +		sl = rte_pktmbuf_data_len(mbuf_src);
> +
> +		/** Last block to be compressed
> +		 * Update flush with value provided by app for last block,
> +		 * For stateless flush should be always Z_FINISH
> +		 */
> +
> +		if ((op->src.length - op->consumed) <= sl) {
> +			sl = (op->src.length - op->consumed);
> +			flush = fin_flush;
[Lee] Just a clarification of my understanding please, this assignment of fin_flush to flush is what will cause the last return from the zlib deflate to be Z_STREAM_END, only if fin_flush is set to Z_FINISH. fin_flush is passed from the application, if the application doesn't pass Z_FINISH as the flush flag (I See your comment above.), this means there will never be a Z_STREAM_END return from deflate, and eventually deflate will just return an error. I believe a check could be done long before this point to ensure the flush flag is Z_FINAL, since the PMD only supports stateful at the moment. This will save cycles compressing data that we only find out at the end of the op is not valid.

> +		}
> +
> +	} while (1);
[Lee] This will be to be rewritten as to not use an infinite while loop. Undefined behaviour may lead to system hanging, so we don't use them in DPDK.

> +def_end:
> +	if (op->op_type == RTE_COMP_OP_STATELESS)
> +		deflateReset(strm);
> +}
> +
> +static void
> +process_zlib_inflate(struct rte_comp_op *op, z_stream *strm) {
> +	int ret, flush;
> +	uint8_t *src, *dst;
> +	uint32_t sl, dl, have;
> +	struct rte_mbuf *mbuf_src = op->m_src;
> +	struct rte_mbuf *mbuf_dst = op->m_dst;
> +
[Lee] same comments apply for inflate as deflate.
Thanks,
Lee.
  

Patch

diff --git a/drivers/compress/meson.build b/drivers/compress/meson.build
index fb136e1..e4d5e5c 100644
--- a/drivers/compress/meson.build
+++ b/drivers/compress/meson.build
@@ -1,7 +1,7 @@ 
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2018 Intel Corporation
 
-drivers = ['isal']
+drivers = ['isal','zlib']
 
 std_deps = ['compressdev'] # compressdev pulls in all other needed deps
 config_flag_fmt = 'RTE_LIBRTE_@0@_PMD'
diff --git a/drivers/compress/zlib/meson.build b/drivers/compress/zlib/meson.build
new file mode 100644
index 0000000..d66de95
--- /dev/null
+++ b/drivers/compress/zlib/meson.build
@@ -0,0 +1,11 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Cavium Networks
+
+dep = dependency('zlib', required: false)
+if not dep.found()
+	build = false
+endif
+deps += 'bus_vdev'
+sources = files('zlib_pmd.c', 'zlib_pmd_ops.c')
+ext_deps += dep
+pkgconfig_extra_libs += '-lz'
diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c
index 3dc71ec..e2681a7 100644
--- a/drivers/compress/zlib/zlib_pmd.c
+++ b/drivers/compress/zlib/zlib_pmd.c
@@ -17,7 +17,239 @@ 
 #include "zlib_pmd_private.h"
 
 static uint8_t compressdev_driver_id;
-int zlib_logtype_driver;
+
+/** compute the next mbuf in list and assign dst buffer and dlen,
+ * set op->status to appropriate flag if we run out of mbuf
+ */
+#define COMPUTE_DST_BUF(mbuf, dst, dlen)		\
+	((mbuf = mbuf->next) ?						\
+		(dst = rte_pktmbuf_mtod(mbuf, uint8_t *)),		\
+		dlen = rte_pktmbuf_data_len(mbuf) :			\
+			!(op->status =					\
+			((op->op_type == RTE_COMP_OP_STATELESS) ?	\
+			RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED :	\
+			RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE)))
+
+static void
+process_zlib_deflate(struct rte_comp_op *op, z_stream *strm)
+{
+	int ret, flush, fin_flush;
+	uint8_t *src, *dst;
+	uint32_t sl, dl, have;
+	struct rte_mbuf *mbuf_src = op->m_src;
+	struct rte_mbuf *mbuf_dst = op->m_dst;
+
+	src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset);
+
+	sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+
+	dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *,
+			op->dst.offset);
+
+	dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+	if (unlikely(!src || !dst || !strm)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+		return;
+	}
+	switch (op->flush_flag) {
+	case RTE_COMP_FLUSH_NONE:
+		fin_flush = Z_NO_FLUSH;
+		break;
+	case RTE_COMP_FLUSH_SYNC:
+		fin_flush = Z_SYNC_FLUSH;
+		break;
+	case RTE_COMP_FLUSH_FULL:
+		fin_flush = Z_FULL_FLUSH;
+		break;
+	case RTE_COMP_FLUSH_FINAL:
+		fin_flush = Z_FINISH;
+		break;
+	default:
+		op->status = RTE_COMP_OP_STATUS_ERROR;
+		goto def_end;
+	}
+	if (op->src.length <= sl) {
+		sl = op->src.length;
+		flush = fin_flush;
+	} else {
+		/* if there're more than one mbufs in input,
+		 * process intermediate with NO_FLUSH
+		 */
+		flush = Z_NO_FLUSH;
+	}
+	/* initialize status to SUCCESS */
+	op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+	do {
+		/* Update z_stream with the inputs provided by application */
+		strm->next_in = src;
+		strm->avail_in = sl;
+
+		do {
+			strm->avail_out = dl;
+			strm->next_out = dst;
+			ret = deflate(strm, flush);
+			if (unlikely(ret == Z_STREAM_ERROR)) {
+				/* error return, do not process further */
+				op->status =  RTE_COMP_OP_STATUS_ERROR;
+				goto def_end;
+			}
+			/* Update op stats */
+			op->produced += dl - strm->avail_out;
+			op->consumed += sl - strm->avail_in;
+		/* Break if Z_STREAM_END is encountered or dst mbuf gets over */
+		} while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) &&
+				COMPUTE_DST_BUF(mbuf_dst, dst, dl));
+
+		/** Compress till the end of compressed blocks provided
+		 * or till Z_FINISH
+		 * Exit if op->status is not SUCCESS.
+		 */
+		if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) ||
+			(ret == Z_STREAM_END) ||
+			op->consumed == op->src.length)
+			goto def_end;
+
+		/** Update last output buffer with respect to availed space */
+		have = dl - strm->avail_out;
+		dst += have;
+		dl = strm->avail_out;
+		/** Update source buffer to next mbuf*/
+		mbuf_src = mbuf_src->next;
+		src = rte_pktmbuf_mtod(mbuf_src, uint8_t *);
+		sl = rte_pktmbuf_data_len(mbuf_src);
+
+		/** Last block to be compressed
+		 * Update flush with value provided by app for last block,
+		 * For stateless flush should be always Z_FINISH
+		 */
+
+		if ((op->src.length - op->consumed) <= sl) {
+			sl = (op->src.length - op->consumed);
+			flush = fin_flush;
+		}
+
+	} while (1);
+def_end:
+	if (op->op_type == RTE_COMP_OP_STATELESS)
+		deflateReset(strm);
+}
+
+static void
+process_zlib_inflate(struct rte_comp_op *op, z_stream *strm)
+{
+	int ret, flush;
+	uint8_t *src, *dst;
+	uint32_t sl, dl, have;
+	struct rte_mbuf *mbuf_src = op->m_src;
+	struct rte_mbuf *mbuf_dst = op->m_dst;
+
+	src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset);
+
+	sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+
+	dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *,
+			op->dst.offset);
+
+	dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+	if (unlikely(!src || !dst || !strm)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+		return;
+	}
+	if (op->src.length <= sl)
+		sl = op->src.length;
+
+	/** Ignoring flush value provided from application for decompression */
+	flush = Z_NO_FLUSH;
+	/* initialize status to SUCCESS */
+	op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+	do {
+		/** Update z_stream with the inputs provided by application */
+		strm->avail_in = sl;
+		strm->next_in = src;
+		do {
+			strm->avail_out = dl;
+			strm->next_out = dst;
+
+			ret = inflate(strm, flush);
+
+			switch (ret) {
+			case Z_NEED_DICT:
+				ret = Z_DATA_ERROR;     /* and fall through */
+			case Z_DATA_ERROR:
+			case Z_MEM_ERROR:
+			case Z_STREAM_ERROR:
+				op->status = RTE_COMP_OP_STATUS_ERROR;
+				goto inf_end;
+			default:
+				/** Update op stats */
+				op->produced += dl - strm->avail_out;
+				op->consumed += sl - strm->avail_in;
+
+			}
+		/* Break if Z_STREAM_END is encountered or dst mbuf gets over */
+		} while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) &&
+				COMPUTE_DST_BUF(mbuf_dst, dst, dl));
+
+		/** Compress till the end of compressed blocks provided
+		 * or till Z_STREAM_END.
+		 * Exit if op->status is not SUCCESS.
+		 */
+		if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) ||
+				(ret == Z_STREAM_END) ||
+				op->consumed == op->src.length) {
+			goto inf_end;
+		}
+		/** Adjust previous output buffer with respect to avail_out */
+		have = dl - strm->avail_out;
+		dst += have;
+		dl = strm->avail_out;
+		/** Read next input buffer to be processed */
+		mbuf_src = mbuf_src->next;
+		src = rte_pktmbuf_mtod(mbuf_src, uint8_t *);
+		sl = rte_pktmbuf_data_len(mbuf_src);
+		if ((op->src.length - op->consumed) < sl)
+			sl = (op->src.length - op->consumed);
+	} while (1);
+inf_end:
+	if (op->op_type == RTE_COMP_OP_STATELESS)
+		inflateReset(strm);
+}
+
+/** Process comp operation for mbuf */
+static inline int
+process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op)
+{
+	struct zlib_stream *stream;
+
+	if (op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
+			op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+		goto comp_err;
+	}
+
+	if (op->op_type == RTE_COMP_OP_STATELESS)
+		stream = &((struct zlib_priv_xform *)op->private_xform)->stream;
+	else if (op->op_type == RTE_COMP_OP_STATEFUL)
+		stream = (struct zlib_stream *)op->stream;
+	else {
+		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+		ZLIB_LOG_ERR("\nInvalid operation type");
+		goto comp_err;
+	}
+	stream->comp(op, &stream->strm);
+comp_err:
+	/* whatever is out of op, put it into completion queue with
+	 * its status
+	 */
+	return rte_ring_enqueue(qp->processed_pkts, (void *)op);
+}
 
 /** Parse comp xform and set private xform/Stream parameters */
 int
@@ -118,6 +350,43 @@  zlib_set_stream_parameters(const struct rte_comp_xform *xform,
 	return 0;
 }
 
+static uint16_t
+zlib_pmd_enqueue_burst(void *queue_pair,
+			struct rte_comp_op **ops, uint16_t nb_ops)
+{
+	struct zlib_qp *qp = queue_pair;
+	int ret, i;
+	int enqd = 0;
+	for (i = 0; i < nb_ops; i++) {
+		ret = process_zlib_op(qp, ops[i]);
+		if (unlikely(ret < 0)) {
+			/* increment count if failed to push to completion
+			 * queue
+			 */
+			qp->qp_stats.enqueue_err_count++;
+		} else {
+			qp->qp_stats.enqueued_count++;
+			enqd++;
+		}
+	}
+	return enqd;
+}
+
+static uint16_t
+zlib_pmd_dequeue_burst(void *queue_pair,
+			struct rte_comp_op **ops, uint16_t nb_ops)
+{
+	struct zlib_qp *qp = queue_pair;
+
+	unsigned int nb_dequeued = 0;
+
+	nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts,
+			(void **)ops, nb_ops, NULL);
+	qp->qp_stats.dequeued_count += nb_dequeued;
+
+	return nb_dequeued;
+}
+
 static int zlib_remove(struct rte_vdev_device *vdev);
 
 static int
@@ -138,6 +407,10 @@  zlib_create(const char *name,
 	dev->driver_id = compressdev_driver_id;
 	dev->dev_ops = rte_zlib_pmd_ops;
 
+	/* register rx/tx burst functions for data path */
+	dev->dequeue_burst = zlib_pmd_dequeue_burst;
+	dev->enqueue_burst = zlib_pmd_enqueue_burst;
+
 	dev->feature_flags = 0;
 	dev->feature_flags |= RTE_COMP_FF_SHAREABLE_PRIV_XFORM |
 				RTE_COMP_FF_NONCOMPRESSED_BLOCKS |