[3/4] eventdev: add eth Tx adapter implementation

Message ID 1530859329-160189-3-git-send-email-nikhil.rao@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Jerin Jacob
Headers
Series [1/4] eventdev: add eth Tx adapter APIs |

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Rao, Nikhil July 6, 2018, 6:42 a.m. UTC
  This patch implements the Tx adapter APIs by invoking the
corresponding eventdev PMD callbacks and also provides
the common rte_service function based implementation when
the eventdev PMD support is absent.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
---
 config/rte_config.h                            |    1 +
 lib/librte_eventdev/rte_event_eth_tx_adapter.c | 1210 ++++++++++++++++++++++++
 config/common_base                             |    2 +-
 lib/librte_eventdev/Makefile                   |    2 +
 lib/librte_eventdev/meson.build                |    6 +-
 lib/librte_eventdev/rte_eventdev_version.map   |   13 +
 6 files changed, 1231 insertions(+), 3 deletions(-)
 create mode 100644 lib/librte_eventdev/rte_event_eth_tx_adapter.c
  

Patch

diff --git a/config/rte_config.h b/config/rte_config.h
index 0ba0ead..f60cc80 100644
--- a/config/rte_config.h
+++ b/config/rte_config.h
@@ -65,6 +65,7 @@ 
 #define RTE_EVENT_MAX_QUEUES_PER_DEV 64
 #define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32
 #define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32
+#define RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE 32
 
 /* rawdev defines */
 #define RTE_RAWDEV_MAX_DEVS 10
diff --git a/lib/librte_eventdev/rte_event_eth_tx_adapter.c b/lib/librte_eventdev/rte_event_eth_tx_adapter.c
new file mode 100644
index 0000000..b802a13
--- /dev/null
+++ b/lib/librte_eventdev/rte_event_eth_tx_adapter.c
@@ -0,0 +1,1210 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Intel Corporation.
+ */
+#include <rte_spinlock.h>
+#include <rte_service_component.h>
+#include <rte_ethdev.h>
+
+#include "rte_eventdev_pmd.h"
+#include "rte_event_eth_tx_adapter.h"
+
+#define TXA_BATCH_SIZE		32
+#define TXA_SERVICE_NAME_LEN	32
+#define TXA_MEM_NAME_LEN	32
+#define TXA_FLUSH_THRESHOLD	1024
+#define TXA_RETRY_CNT		100
+#define TXA_MAX_NB_TX		128
+
+enum txa_pmd_type {
+	/* No PMD in use */
+	TXA_PMD_NONE = 0,
+	/* Event dev PMD */
+	TXA_PMD_EVENTDEV,
+	/* Service PMD */
+	TXA_PMD_SERVICE,
+};
+
+/* Tx retry callback structure */
+struct txa_retry {
+	/* Ethernet port id */
+	uint16_t port_id;
+	/* Tx queue */
+	uint16_t tx_queue;
+	/* Adapter ID */
+	uint8_t id;
+};
+
+/* Per queue structure */
+struct txa_service_queue_info {
+	/* Queue has been added */
+	uint8_t added;
+	/* Retry callback argument */
+	struct txa_retry txa_retry;
+	/* Tx buffer */
+	struct rte_eth_dev_tx_buffer *tx_buf;
+};
+
+/* PMD private structure */
+struct txa_service_data {
+	/* Event port ID */
+	uint8_t port_id;
+	/* Per adapter EAL service */
+	uint32_t service_id;
+	/* Adapter started */
+	int started;
+	/* Lock to serialize config updates with service function */
+	rte_spinlock_t tx_lock;
+	/* stats */
+	struct rte_event_eth_tx_adapter_stats stats;
+	/* Loop count to flush Tx buffers */
+	int loop_cnt;
+};
+
+struct txa_dev_ops {
+	event_tx_adapter_enqueue enqueue;
+	eventdev_eth_tx_adapter_queue_add_t queue_add;
+	eventdev_eth_tx_adapter_queue_del_t queue_del;
+	eventdev_eth_tx_adapter_stats_get_t stats_get;
+	eventdev_eth_tx_adapter_stats_reset_t stats_reset;
+	eventdev_eth_tx_adapter_init_t init;
+	eventdev_eth_tx_adapter_free_t free;
+	eventdev_eth_tx_adapter_start_t start;
+	eventdev_eth_tx_adapter_stop_t stop;
+	eventdev_eth_tx_adapter_event_port_get event_port_get;
+};
+
+/* Library private structure */
+struct txa_internal {
+	/* Tx adapter PMD type */
+	enum txa_pmd_type pmd_type;
+	/* Conf arg must be freed */
+	uint8_t conf_free;
+	/* Original dev ops from event device */
+	struct txa_dev_ops dev_ops;
+};
+
+#define txa_evdev(t) (&rte_eventdevs[(t)->eventdev_id])
+
+#define txa_internal(t) (txa_internals[(t)->id])
+
+#define txa_caps_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_caps_get
+
+#define txa_enqueue(t) txa_evdev(t)->txa_enqueue
+
+#define txa_event_port_get(t) \
+			txa_evdev(t)->dev_ops->eth_tx_adapter_event_port_get
+
+#define txa_pmd_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
+
+#define txa_pmd_init_func(t) txa_evdev(t)->dev_ops->eth_tx_adapter_init
+
+#define txa_pmd_none(t) (txa_internal(t)->pmd_type == TXA_PMD_NONE)
+
+#define txa_pmd_service(t) (txa_internal(t)->pmd_type == TXA_PMD_SERVICE)
+
+#define txa_queue_add(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_add
+
+#define txa_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
+
+#define txa_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
+
+#define txa_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
+
+#define txa_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
+
+#define txa_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
+
+struct rte_event_eth_tx_adapters rte_event_eth_tx_adapters;
+static struct txa_internal **txa_internals;
+
+static inline struct txa_service_queue_info *
+txa_service_queue(struct rte_event_eth_tx_adapter *txa, uint16_t port_id,
+		uint16_t tx_queue_id)
+{
+	struct txa_service_queue_info *tqi;
+
+	tqi = txa->txa_ethdev[port_id].queues;
+
+	return tqi != NULL ? tqi + tx_queue_id : NULL;
+}
+
+static inline int
+txa_valid_id(uint8_t id)
+{
+	return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
+}
+
+#define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
+do { \
+	if (!txa_valid_id(id)) { \
+		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
+		return retval; \
+	} \
+} while (0)
+
+/* Private definition of tx queue identifier within mbuf */
+struct txa_mbuf_txq_id {
+	uint32_t resvd1;
+	uint16_t resvd2;
+	uint16_t txq_id;
+};
+
+#define TXA_QID_READ(m)						\
+({								\
+	const struct txa_mbuf_txq_id *txa_qid;			\
+	txa_qid = (struct txa_mbuf_txq_id *)(&(m)->hash);	\
+	txa_qid->txq_id;					\
+})
+
+#define TXA_QID_WRITE(m, qid)					\
+({								\
+	struct txa_mbuf_txq_id *txa_qid;			\
+	txa_qid = (struct txa_mbuf_txq_id *)(&(m)->hash);	\
+	txa_qid->txq_id	= qid;					\
+})
+
+static int
+txa_service_get_service_id(struct rte_event_eth_tx_adapter *txa,
+			uint32_t *service_id);
+static int
+txa_service_event_port_get(struct rte_event_eth_tx_adapter *txa, uint8_t *port);
+
+static uint16_t
+txa_service_enqueue(void *adapter,
+		const struct rte_eventdev *dev, void *port,
+		struct rte_event ev[], uint16_t nb_events);
+
+static int
+txa_service_pmd_init(struct rte_event_eth_tx_adapter *txa);
+
+static int
+txa_service_pmd_free(struct rte_event_eth_tx_adapter *txa);
+
+static int
+txa_service_queue_add(struct rte_event_eth_tx_adapter *txa,
+		const struct rte_eth_dev *dev,
+		int32_t tx_queue_id);
+static int
+txa_service_queue_del(struct rte_event_eth_tx_adapter *txa,
+		const struct rte_eth_dev *dev,
+		int32_t tx_queue_id);
+
+static int
+txa_service_start(struct rte_event_eth_tx_adapter *txa);
+
+static int
+txa_service_stats_get(struct rte_event_eth_tx_adapter *txa,
+		struct rte_event_eth_tx_adapter_stats *stats);
+
+static int
+txa_service_stats_reset(struct rte_event_eth_tx_adapter *txa);
+
+static int
+txa_service_stop(struct rte_event_eth_tx_adapter *txa);
+
+static struct rte_event_eth_tx_adapter **
+txa_adapter_init(void)
+{
+	const char *name = "rte_event_eth_tx_adapter_array";
+	const struct rte_memzone *mz;
+	unsigned int sz;
+
+	sz = sizeof(void *) *
+	    RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
+	sz = RTE_ALIGN(2 * sz, RTE_CACHE_LINE_SIZE);
+
+	mz = rte_memzone_lookup(name);
+	if (mz == NULL) {
+		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
+						 RTE_CACHE_LINE_SIZE);
+		if (mz == NULL) {
+			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
+					PRId32, rte_errno);
+			return NULL;
+		}
+	}
+
+	return  mz->addr;
+}
+
+static inline struct rte_event_eth_tx_adapter *
+txa_id_to_adapter(uint8_t id)
+{
+	struct rte_event_eth_tx_adapter **p;
+
+	p = rte_event_eth_tx_adapters.data;
+	if (!p) {
+		int n = RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
+		p = rte_event_eth_tx_adapters.data = txa_adapter_init();
+		txa_internals = (struct txa_internal **)(p + n);
+	}
+	return p ? p[id] : NULL;
+}
+
+static void
+txa_save_ops(struct rte_event_eth_tx_adapter *txa)
+{
+	struct txa_dev_ops  *ops;
+
+	ops = &txa_internal(txa)->dev_ops;
+
+	ops->enqueue = txa_enqueue(txa);
+	ops->queue_add = txa_queue_add(txa);
+	ops->queue_del = txa_queue_del(txa);
+	ops->stats_get = txa_stats_get(txa);
+	ops->stats_reset = txa_stats_reset(txa);
+	ops->init = txa_pmd_init_func(txa);
+	ops->free = txa_pmd_free(txa);
+	ops->start = txa_start(txa);
+	ops->stop = txa_stop(txa);
+	ops->event_port_get = txa_event_port_get(txa);
+}
+
+static void
+txa_restore_ops(struct rte_event_eth_tx_adapter *txa)
+{
+	struct txa_dev_ops  *ops;
+
+	ops = &txa_internal(txa)->dev_ops;
+
+	txa_enqueue(txa) = ops->enqueue;
+	txa_queue_add(txa) = ops->queue_add;
+	txa_queue_del(txa) = ops->queue_del;
+	txa_stats_get(txa) = ops->stats_get;
+	txa_stats_reset(txa) = ops->stats_reset;
+	txa_pmd_init_func(txa) = ops->init;
+	txa_pmd_free(txa) = ops->free;
+	txa_start(txa) = ops->start;
+	txa_stop(txa) = ops->stop;
+	txa_event_port_get(txa) = ops->event_port_get;
+}
+
+static int
+txa_default_conf_cb(uint8_t id, uint8_t dev_id,
+		struct rte_event_eth_tx_adapter_conf *conf, void *arg)
+{
+	int ret;
+	struct rte_eventdev *dev;
+	struct rte_event_port_conf *pc;
+	struct rte_event_eth_tx_adapter *txa;
+	struct rte_event_dev_config dev_conf;
+	int started;
+	uint8_t port_id;
+
+	pc = arg;
+	txa = txa_id_to_adapter(id);
+	dev = txa_evdev(txa);
+	dev_conf = dev->data->dev_conf;
+
+	started = dev->data->dev_started;
+	if (started)
+		rte_event_dev_stop(dev_id);
+
+	port_id = dev_conf.nb_event_ports;
+	dev_conf.nb_event_ports += 1;
+
+	ret = rte_event_dev_configure(dev_id, &dev_conf);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
+						dev_id);
+		if (started) {
+			if (rte_event_dev_start(dev_id))
+				return -EIO;
+		}
+		return ret;
+	}
+
+	ret = rte_event_port_setup(dev_id, port_id, pc);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
+					port_id);
+		if (started) {
+			if (rte_event_dev_start(dev_id))
+				return -EIO;
+		}
+		return ret;
+	}
+
+	conf->event_port_id = port_id;
+	conf->max_nb_tx = TXA_MAX_NB_TX;
+	if (started)
+		ret = rte_event_dev_start(dev_id);
+	return ret;
+}
+
+static int
+txa_ethdev_ok(struct rte_event_eth_tx_adapter *txa, uint16_t eth_dev_id)
+{
+	return eth_dev_id < txa->dev_count;
+}
+
+static int
+txa_service_queue_array_alloc(struct rte_event_eth_tx_adapter *txa,
+			uint16_t port_id)
+
+{
+	struct txa_service_queue_info *tqi;
+	uint16_t nb_queue;
+
+	if (txa->txa_ethdev[port_id].queues)
+		return 0;
+
+	nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
+	tqi = rte_zmalloc_socket(txa->mem_name,
+				nb_queue *
+				sizeof(struct txa_service_queue_info), 0,
+				txa->socket_id);
+	if (tqi == NULL)
+		return -ENOMEM;
+	txa->txa_ethdev[port_id].queues = tqi;
+	return 0;
+}
+
+static void
+txa_service_queue_array_free(struct rte_event_eth_tx_adapter *txa,
+			uint16_t port_id)
+{
+	struct rte_event_eth_tx_adapter_ethdev *txa_ethdev;
+	struct txa_service_queue_info *tqi;
+
+	txa_ethdev = &txa->txa_ethdev[port_id];
+	if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
+		return;
+
+	tqi = txa_ethdev->queues;
+	txa_ethdev->queues = NULL;
+	rte_free(tqi);
+}
+
+static int
+txa_cap_int_port(struct rte_event_eth_tx_adapter *txa)
+{
+	uint32_t caps = 0;
+
+	if (txa_caps_get(txa))
+		(txa_caps_get(txa))(txa_evdev(txa), &caps);
+	return !!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT);
+}
+
+static int
+txa_init(struct rte_event_eth_tx_adapter *txa)
+{
+	int ret;
+	int txa_service;
+	uint16_t i;
+	struct rte_event_eth_tx_adapter_ethdev *txa_ethdev;
+
+	if (txa->txa_ethdev)
+		return 0;
+
+	txa_save_ops(txa);
+	txa_service = 0;
+
+	txa_ethdev = rte_zmalloc_socket(txa->mem_name,
+					txa->dev_count *
+					sizeof(*txa_ethdev), 0,
+					txa->socket_id);
+	if (txa_ethdev == NULL) {
+		RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
+		return -ENOMEM;
+	}
+
+	RTE_ETH_FOREACH_DEV(i) {
+		if (i == txa->dev_count)
+			break;
+		txa_ethdev[i].dev = &rte_eth_devices[i];
+	}
+
+	if (!txa_cap_int_port(txa)) {
+		txa_pmd_init_func(txa) = txa_service_pmd_init;
+		txa_pmd_free(txa) = txa_service_pmd_free;
+		txa_queue_add(txa) = txa_service_queue_add;
+		txa_queue_del(txa) = txa_service_queue_del;
+		txa_enqueue(txa) = txa_service_enqueue;
+		txa_start(txa) = txa_service_start;
+		txa_stop(txa) = txa_service_stop;
+		txa_stats_get(txa) = txa_service_stats_get;
+		txa_stats_reset(txa) = txa_service_stats_reset;
+		txa_event_port_get(txa) = txa_service_event_port_get;
+		txa_service = 1;
+	}
+
+	ret = (txa_pmd_init_func(txa)) ?
+			txa_pmd_init_func(txa)(txa)
+			: 0;
+
+	txa_internal(txa)->pmd_type = TXA_PMD_NONE;
+	if (ret == 0) {
+		txa_internal(txa)->pmd_type = txa_service ?
+					TXA_PMD_SERVICE :
+					TXA_PMD_EVENTDEV;
+		txa->txa_ethdev = txa_ethdev;
+	} else {
+		rte_free(txa_ethdev);
+	}
+
+	return ret;
+}
+
+static void
+txa_service_tx(struct rte_event_eth_tx_adapter *txa, struct rte_event *ev,
+	uint32_t n)
+{
+	uint32_t i;
+	uint16_t nb_tx;
+	struct txa_service_data *data;
+	struct rte_event_eth_tx_adapter_ethdev *tdi;
+	struct rte_event_eth_tx_adapter_stats *stats;
+
+	tdi = txa->txa_ethdev;
+	data = txa->dev_private;
+	stats = &data->stats;
+
+	nb_tx = 0;
+	for (i = 0; i < n; i++) {
+		struct rte_mbuf *m;
+		uint16_t port;
+		uint16_t queue;
+		struct txa_service_queue_info *tqi;
+
+		m = ev[i].mbuf;
+		port = m->port;
+		queue = TXA_QID_READ(m);
+
+		tqi = txa_service_queue(txa, port, queue);
+		if (unlikely(tdi == NULL ||
+			tdi[port].nb_queues == 0 || !tqi->added)) {
+			rte_pktmbuf_free(m);
+			continue;
+		}
+
+		nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
+	}
+
+	stats->tx_packets += nb_tx;
+}
+
+static int32_t
+txa_service_func(void *args)
+{
+	struct rte_event_eth_tx_adapter *txa = args;
+	uint8_t dev_id;
+	uint8_t port;
+	uint16_t n;
+	uint32_t nb_tx, max_nb_tx;
+	struct rte_event ev[TXA_BATCH_SIZE];
+	struct txa_service_data *data;
+
+	dev_id = txa->eventdev_id;
+	max_nb_tx = txa->max_nb_tx;
+	data = txa->dev_private;
+	port = data->port_id;
+
+	if (!rte_spinlock_trylock(&txa->tx_lock))
+		return 0;
+
+	for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
+
+		n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
+		if (!n)
+			break;
+		txa_service_tx(txa, ev, n);
+	}
+
+	if ((data->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
+
+		struct rte_event_eth_tx_adapter_ethdev *tdi;
+		struct txa_service_queue_info *tqi;
+		struct rte_eth_dev *dev;
+		uint16_t i;
+
+		tdi = txa->txa_ethdev;
+		nb_tx = 0;
+
+		RTE_ETH_FOREACH_DEV(i) {
+			uint16_t q;
+
+			if (i == txa->dev_count)
+				break;
+
+			dev = tdi[i].dev;
+			if (tdi[i].nb_queues == 0)
+				continue;
+			for (q = 0; q < dev->data->nb_tx_queues; q++) {
+
+				tqi = txa_service_queue(txa, i, q);
+				if (!tqi->added)
+					continue;
+
+				nb_tx += rte_eth_tx_buffer_flush(i, q,
+							tqi->tx_buf);
+			}
+		}
+
+		data->stats.tx_packets += nb_tx;
+	}
+	rte_spinlock_unlock(&txa->tx_lock);
+	return 0;
+}
+
+static int
+txa_service_init(struct rte_event_eth_tx_adapter *txa)
+{
+	int ret;
+	struct rte_service_spec service;
+	struct rte_event_eth_tx_adapter_conf conf;
+	struct txa_service_data *data;
+
+	data = txa->dev_private;
+
+	memset(&service, 0, sizeof(service));
+	snprintf(service.name, TXA_SERVICE_NAME_LEN,
+		"rte_event_eth_txa_%d", txa->id);
+	service.socket_id = txa->socket_id;
+	service.callback = txa_service_func;
+	service.callback_userdata = txa;
+	/* Service function handles locking for queue add/del updates */
+	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
+	ret = rte_service_component_register(&service, &data->service_id);
+	if (ret) {
+		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
+			service.name, ret);
+		return ret;
+	}
+
+	ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
+	if (ret) {
+		rte_service_component_unregister(data->service_id);
+		return ret;
+	}
+
+	data->port_id = conf.event_port_id;
+	txa->max_nb_tx = conf.max_nb_tx;
+	return 0;
+}
+
+static int
+txa_service_pmd_free(struct rte_event_eth_tx_adapter *txa)
+{
+	struct txa_service_data *data;
+
+	data = txa->dev_private;
+
+	if (txa->nb_queues != 0)
+		return 0;
+
+	if (txa_pmd_service(txa)) {
+		rte_service_component_runstate_set(data->service_id, 0);
+		while (rte_service_may_be_active(data->service_id))
+			rte_pause();
+		rte_service_component_unregister(data->service_id);
+	}
+
+	rte_free(txa->dev_private);
+	txa->dev_private = NULL;
+
+	return 0;
+}
+
+static int
+txa_service_pmd_init(struct rte_event_eth_tx_adapter *txa)
+{
+	int ret;
+	struct txa_service_data *data;
+
+	data = rte_zmalloc_socket(txa->mem_name,
+				sizeof(*data), 0,
+				txa->socket_id);
+	if (!data) {
+		RTE_EDEV_LOG_ERR("Failed to alloc PMD private data");
+		return -ENOMEM;
+	}
+
+	txa->dev_private = data;
+	ret = txa_service_init(txa);
+	if (ret) {
+		rte_free(data);
+		txa->dev_private = NULL;
+		return ret;
+	}
+
+	return 0;
+}
+
+static int
+txa_service_ctrl(struct rte_event_eth_tx_adapter *txa, int start)
+{
+	int ret;
+	struct txa_service_data *data = txa->dev_private;
+
+	ret = rte_service_runstate_set(data->service_id, start);
+	if (ret == 0 && !start) {
+		while (rte_service_may_be_active(data->service_id))
+			rte_pause();
+	}
+	return ret;
+}
+
+static int
+txa_service_start(struct rte_event_eth_tx_adapter *txa)
+{
+	return txa_service_ctrl(txa, 1);
+}
+
+static int
+txa_service_stop(struct rte_event_eth_tx_adapter *txa)
+{
+	return txa_service_ctrl(txa, 0);
+}
+
+static int
+txa_service_event_port_get(struct rte_event_eth_tx_adapter *txa, uint8_t *port)
+{
+	struct txa_service_data *data = txa->dev_private;
+
+	*port = data->port_id;
+	return 0;
+}
+
+static void
+txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+			void *userdata)
+{
+	struct txa_retry *tr;
+	struct txa_service_data *data;
+	struct rte_event_eth_tx_adapter_stats *stats;
+	uint16_t sent = 0;
+	unsigned int retry = 0;
+	uint16_t i, n;
+
+	tr = (struct txa_retry *)(uintptr_t)userdata;
+	data = txa_id_to_adapter(tr->id)->dev_private;
+	stats = &data->stats;
+
+	do {
+		n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
+			       &pkts[sent], unsent - sent);
+
+		sent += n;
+	} while (sent != unsent && retry++ < TXA_RETRY_CNT);
+
+	for (i = sent; i < unsent; i++)
+		rte_pktmbuf_free(pkts[i]);
+
+	stats->tx_retry += retry;
+	stats->tx_packets += sent;
+	stats->tx_dropped += unsent - sent;
+}
+
+static struct rte_eth_dev_tx_buffer *
+txa_service_tx_buf_alloc(struct rte_event_eth_tx_adapter *txa,
+			const struct rte_eth_dev *dev)
+{
+	struct rte_eth_dev_tx_buffer *tb;
+	uint16_t port_id;
+
+	port_id = dev->data->port_id;
+	tb = rte_zmalloc_socket(txa->mem_name,
+				RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
+				0,
+				rte_eth_dev_socket_id(port_id));
+	if (tb == NULL)
+		RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
+	return tb;
+}
+
+static int
+txa_service_queue_del(struct rte_event_eth_tx_adapter *txa,
+		const struct rte_eth_dev *dev,
+		int32_t tx_queue_id)
+{
+	struct txa_service_queue_info *tqi;
+	struct rte_eth_dev_tx_buffer *tb;
+	uint16_t port_id;
+
+	if (tx_queue_id == -1) {
+		uint16_t i;
+		int ret;
+
+		for (i = 0; i < dev->data->nb_tx_queues; i++) {
+			ret = txa_service_queue_del(txa, dev, i);
+			if (ret != 0)
+				break;
+		}
+		return ret;
+	}
+
+	port_id = dev->data->port_id;
+	tqi = txa_service_queue(txa, port_id, tx_queue_id);
+	if (!tqi || !tqi->added)
+		return 0;
+
+	tb = tqi->tx_buf;
+
+	tqi->added = 0;
+	tqi->tx_buf = NULL;
+	rte_free(tb);
+	txa->nb_queues--;
+	txa->txa_ethdev[port_id].nb_queues--;
+
+	txa_service_queue_array_free(txa, port_id);
+	return 0;
+}
+
+static int
+txa_service_queue_added(struct rte_event_eth_tx_adapter *txa,
+			const struct rte_eth_dev *dev,
+			uint16_t tx_queue_id)
+{
+	struct txa_service_queue_info *tqi;
+
+	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
+	return tqi && tqi->added;
+}
+
+static int
+txa_service_queue_add(struct rte_event_eth_tx_adapter *txa,
+		const struct rte_eth_dev *dev,
+		int32_t tx_queue_id)
+{
+	struct txa_service_data *data = txa->dev_private;
+	struct rte_event_eth_tx_adapter_ethdev *tdi;
+	struct txa_service_queue_info *tqi;
+	struct rte_eth_dev_tx_buffer *tb;
+	struct txa_retry *txa_retry;
+	int ret;
+
+	ret = txa_service_queue_array_alloc(txa, dev->data->port_id);
+	if (ret)
+		return ret;
+	tdi = &txa->txa_ethdev[dev->data->port_id];
+	if (tx_queue_id == -1) {
+		int nb_queues = dev->data->nb_tx_queues - tdi->nb_queues;
+		uint16_t i, j;
+		uint16_t *qdone;
+
+		qdone = rte_zmalloc(txa->mem_name,
+				nb_queues * sizeof(*qdone), 0);
+		j = 0;
+		for (i = 0; i < nb_queues; i++) {
+			if (txa_service_queue_added(txa, dev, i))
+				continue;
+			ret = txa_service_queue_add(txa, dev, i);
+			if (ret == 0)
+				qdone[j++] = i;
+			else
+				break;
+		}
+
+		if (i != nb_queues) {
+			for (i = 0; i < j; i++)
+				txa_service_queue_del(txa, dev, qdone[i]);
+		}
+		rte_free(qdone);
+		return ret;
+	}
+
+	if (txa_service_queue_added(txa, dev, tx_queue_id))
+		return 0;
+
+	tb = txa_service_tx_buf_alloc(txa, dev);
+	if (tb == NULL)
+		return -ENOMEM;
+
+	tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
+
+	txa_retry = &tqi->txa_retry;
+	txa_retry->id = txa->id;
+	txa_retry->port_id = dev->data->port_id;
+	txa_retry->tx_queue = tx_queue_id;
+
+	rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
+	rte_eth_tx_buffer_set_err_callback(tb,
+		txa_service_buffer_retry, txa_retry);
+
+	tqi->tx_buf = tb;
+	tqi->added = 1;
+	rte_service_component_runstate_set(data->service_id, 1);
+	tdi->nb_queues++;
+	txa->nb_queues++;
+	return 0;
+}
+
+static uint16_t
+txa_service_enqueue(void *adapter,
+		const struct rte_eventdev *dev, void *port,
+		struct rte_event ev[], uint16_t nb_events)
+{
+	RTE_SET_USED(adapter);
+	RTE_SET_USED(dev);
+	RTE_SET_USED(port);
+	RTE_SET_USED(ev);
+	RTE_SET_USED(nb_events);
+
+	RTE_EDEV_LOG_ERR("Service adapter does not support enqueue callback");
+	rte_errno = ENOTSUP;
+	return 0;
+}
+
+static int
+txa_service_stats_get(struct rte_event_eth_tx_adapter *txa,
+		struct rte_event_eth_tx_adapter_stats *stats)
+{
+	struct txa_service_data *data;
+
+	data = txa->dev_private;
+	*stats = data->stats;
+	return 0;
+}
+
+static int
+txa_service_get_service_id(struct rte_event_eth_tx_adapter *txa,
+			uint32_t *service_id)
+{
+	struct txa_service_data *data;
+
+	data = txa->dev_private;
+	if (data == NULL)
+		return -ESRCH;
+
+	*service_id = data->service_id;
+	return 0;
+}
+
+static int
+txa_service_stats_reset(struct rte_event_eth_tx_adapter *txa)
+{
+	struct txa_service_data *data;
+
+	data = txa->dev_private;
+
+	memset(&data->stats, 0, sizeof(data->stats));
+	return 0;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
+				struct rte_event_port_conf *port_conf)
+{
+	struct rte_event_port_conf *cb_conf;
+	struct rte_event_eth_tx_adapter *txa;
+	int ret;
+
+	if (port_conf == NULL)
+		return -EINVAL;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa != NULL) {
+		RTE_EDEV_LOG_ERR("Eth Tx adapter exists id = %" PRIu8, id);
+		return -EEXIST;
+	}
+
+	cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
+	if (cb_conf == NULL)
+		return -ENOMEM;
+	*cb_conf = *port_conf;
+	ret = rte_event_eth_tx_adapter_create_ext(id, dev_id,
+						txa_default_conf_cb,
+						cb_conf);
+	if (ret) {
+		rte_free(cb_conf);
+		return ret;
+	}
+
+	txa = txa_id_to_adapter(id);
+	txa_internal(txa)->conf_free = 1;
+	return ret;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
+				rte_event_eth_tx_adapter_conf_cb conf_cb,
+				void *conf_arg)
+{
+	struct rte_event_eth_tx_adapter *txa;
+	struct txa_internal *internal;
+	int socket_id;
+	char mem_name[TXA_SERVICE_NAME_LEN];
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
+	if (conf_cb == NULL)
+		return -EINVAL;
+
+	txa = txa_id_to_adapter(id);
+	if (txa != NULL) {
+		RTE_EDEV_LOG_ERR("Eth Tx adapter exists id = %" PRIu8, id);
+		return -EEXIST;
+	}
+
+	socket_id = rte_event_dev_socket_id(dev_id);
+	snprintf(mem_name, TXA_MEM_NAME_LEN,
+		"rte_event_eth_txa_%d",
+		id);
+
+	txa = rte_zmalloc_socket(mem_name,
+				sizeof(*txa),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (txa == NULL) {
+		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
+		return -ENOMEM;
+	}
+
+	internal = rte_zmalloc_socket(mem_name,
+				sizeof(*internal),
+				RTE_CACHE_LINE_SIZE, socket_id);
+	if (internal == NULL) {
+		RTE_EDEV_LOG_ERR("failed to get mem for tx adapter internal"
+			" data");
+		rte_free(txa);
+		return -ENOMEM;
+	}
+
+	txa->id = id;
+	txa->eventdev_id = dev_id;
+	txa->socket_id = socket_id;
+	strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
+	txa->conf_cb = conf_cb;
+	txa->conf_arg = conf_arg;
+	rte_spinlock_init(&txa->tx_lock);
+	rte_event_eth_tx_adapters.data[id] = txa;
+	txa_internals[id] = internal;
+	return 0;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	return txa_event_port_get(txa) ?
+			txa_event_port_get(txa)(txa, event_port_id) :
+			-ENOTSUP;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_free(uint8_t id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+	struct txa_internal *internal;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (txa->nb_queues) {
+		RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
+				txa->nb_queues);
+		return -EBUSY;
+	}
+
+	internal = txa_internal(txa);
+	txa_restore_ops(txa);
+	if (internal->conf_free)
+		rte_free(txa->conf_arg);
+	rte_free(txa);
+	rte_free(internal);
+	rte_event_eth_tx_adapters.data[id] = NULL;
+	txa_internals[id] = NULL;
+	return 0;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_queue_add(uint8_t id,
+				uint16_t eth_dev_id,
+				int32_t queue)
+{
+	struct rte_event_eth_tx_adapter *txa;
+	int ret;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (txa->dev_count == 0)
+		txa->dev_count = rte_eth_dev_count_total();
+
+	if (txa->dev_count == 0)
+		return -EINVAL;
+
+	if (!txa_ethdev_ok(txa, eth_dev_id)) {
+		RTE_EDEV_LOG_ERR("Hot plugged device is unsupported eth port %"
+				PRIu16, eth_dev_id);
+		return -ENOTSUP;
+	}
+
+	if (queue != -1 && (uint16_t)queue >=
+			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
+		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
+				(uint16_t)queue);
+		return -EINVAL;
+	}
+
+	ret = txa_init(txa);
+	if (ret)
+		return ret;
+
+	rte_spinlock_lock(&txa->tx_lock);
+	ret =  txa_queue_add(txa) ?
+			txa_queue_add(txa)(txa,
+				&rte_eth_devices[eth_dev_id],
+				queue)
+			: 0;
+
+	if (txa->nb_queues == 0) {
+		txa_pmd_free(txa)(txa);
+		txa_internal(txa)->pmd_type = TXA_PMD_NONE;
+	}
+
+	rte_spinlock_unlock(&txa->tx_lock);
+	return ret;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_queue_del(uint8_t id,
+					uint16_t eth_dev_id,
+					int32_t queue)
+{
+	struct rte_event_eth_tx_adapter *txa;
+	int ret;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (queue != -1 && (uint16_t)queue >=
+			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
+		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
+				(uint16_t)queue);
+		return -EINVAL;
+	}
+
+	if (txa_pmd_none(txa))
+		return 0;
+
+	rte_spinlock_lock(&txa->tx_lock);
+
+	ret =  txa_queue_del(txa) ?
+			txa_queue_del(txa)(txa,
+					&rte_eth_devices[eth_dev_id],
+					queue)
+					: 0;
+
+	if (ret != 0)
+		return ret;
+
+	if (txa->nb_queues == 0) {
+		txa_pmd_free(txa)(txa);
+		txa_internal(txa)->pmd_type = TXA_PMD_NONE;
+	}
+
+	rte_spinlock_unlock(&txa->tx_lock);
+	return 0;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL || service_id == NULL)
+		return -EINVAL;
+
+	return txa_pmd_service(txa) ?
+				txa_service_get_service_id(txa, service_id) :
+				-ESRCH;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_start(uint8_t id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (txa_pmd_none(txa))
+		return 0;
+
+	return (txa_start(txa)) ? txa_start(txa)(txa) : 0;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_stats_get(uint8_t id,
+				struct rte_event_eth_tx_adapter_stats *stats)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL || stats == NULL)
+		return -EINVAL;
+
+	if (txa_pmd_none(txa)) {
+		memset(stats, 0, sizeof(*stats));
+		return 0;
+	}
+
+	return txa_stats_get(txa) ?
+		txa_stats_get(txa)(txa, stats) :
+		-ENOTSUP;
+}
+
+int __rte_experimental
+rte_event_eth_tx_adapter_stats_reset(uint8_t id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (txa_pmd_none(txa))
+		return 0;
+
+	return txa_stats_reset(txa) ?
+		txa_stats_reset(txa)(txa) : -ENOTSUP;
+}
+
+int rte_event_eth_tx_adapter_stop(uint8_t id)
+{
+	struct rte_event_eth_tx_adapter *txa;
+
+	RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+	txa = txa_id_to_adapter(id);
+	if (txa == NULL)
+		return -EINVAL;
+
+	if (txa_pmd_none(txa))
+		return 0;
+
+	return (txa_stop(txa)) ? txa_stop(txa)(txa) : 0;
+}
+
+void
+rte_event_eth_tx_adapter_txq_set(struct rte_mbuf *pkt, uint16_t txq_id)
+{
+	TXA_QID_WRITE(pkt, txq_id);
+}
diff --git a/config/common_base b/config/common_base
index 721e59b..ea5b06f 100644
--- a/config/common_base
+++ b/config/common_base
@@ -593,7 +593,7 @@  CONFIG_RTE_EVENT_MAX_DEVS=16
 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64
 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32
 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32
-
+CONFIG_RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE=32
 #
 # Compile PMD for skeleton event device
 #
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index b3e2546..0c077f6 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -23,6 +23,7 @@  SRCS-y += rte_event_ring.c
 SRCS-y += rte_event_eth_rx_adapter.c
 SRCS-y += rte_event_timer_adapter.c
 SRCS-y += rte_event_crypto_adapter.c
+SRCS-y += rte_event_eth_tx_adapter.c
 
 # export include files
 SYMLINK-y-include += rte_eventdev.h
@@ -34,6 +35,7 @@  SYMLINK-y-include += rte_event_eth_rx_adapter.h
 SYMLINK-y-include += rte_event_timer_adapter.h
 SYMLINK-y-include += rte_event_timer_adapter_pmd.h
 SYMLINK-y-include += rte_event_crypto_adapter.h
+SYMLINK-y-include += rte_event_eth_tx_adapter.h
 
 # versioning export map
 EXPORT_MAP := rte_eventdev_version.map
diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build
index bd138bd..d885743 100644
--- a/lib/librte_eventdev/meson.build
+++ b/lib/librte_eventdev/meson.build
@@ -7,7 +7,8 @@  sources = files('rte_eventdev.c',
 		'rte_event_ring.c',
 		'rte_event_eth_rx_adapter.c',
 		'rte_event_timer_adapter.c',
-		'rte_event_crypto_adapter.c')
+		'rte_event_crypto_adapter.c',
+		'rte_event_eth_tx_adapter.c')
 headers = files('rte_eventdev.h',
 		'rte_eventdev_pmd.h',
 		'rte_eventdev_pmd_pci.h',
@@ -16,5 +17,6 @@  headers = files('rte_eventdev.h',
 		'rte_event_eth_rx_adapter.h',
 		'rte_event_timer_adapter.h',
 		'rte_event_timer_adapter_pmd.h',
-		'rte_event_crypto_adapter.h')
+		'rte_event_crypto_adapter.h'
+		'rte_event_eth_tx_adapter.h')
 deps += ['ring', 'ethdev', 'hash', 'mempool', 'mbuf', 'timer', 'cryptodev']
diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map
index c3f18d6..8284c7c 100644
--- a/lib/librte_eventdev/rte_eventdev_version.map
+++ b/lib/librte_eventdev/rte_eventdev_version.map
@@ -109,4 +109,17 @@  EXPERIMENTAL {
 	rte_event_crypto_adapter_stats_get;
 	rte_event_crypto_adapter_stats_reset;
 	rte_event_crypto_adapter_stop;
+	rte_event_eth_tx_adapter_caps_get;
+	rte_event_eth_tx_adapter_create;
+	rte_event_eth_tx_adapter_create_ext;
+	rte_event_eth_tx_adapter_event_port_get;
+	rte_event_eth_tx_adapter_free;
+	rte_event_eth_tx_adapter_queue_add;
+	rte_event_eth_tx_adapter_queue_del;
+	rte_event_eth_tx_adapter_service_id_get;
+	rte_event_eth_tx_adapter_start;
+	rte_event_eth_tx_adapter_stats_get;
+	rte_event_eth_tx_adapter_stats_reset;
+	rte_event_eth_tx_adapter_stop;
+	rte_event_eth_tx_adapter_txq_set;
 };