[RFC,v1,4/5] dts: add trex traffic generator to dts framework

Message ID 20250423194011.1447679-5-npratte@iol.unh.edu (mailing list archive)
State Superseded
Delegated to: Paul Szczepanek
Headers
Series Add TREX Traffic Generator to DTS Framework |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Nicholas Pratte April 23, 2025, 7:40 p.m. UTC
Implement the TREX traffic generator for use in the DTS framework. The
provided implementation leverages TREX's stateless API automation
library, via use of a Python shell. As such, version control of TREX may
be needed. The DTS context has been modified to include a performance
traffic generator in addition to a functional traffic generator.

Bugzilla ID: 1697
Signed-off-by: Nicholas Pratte <npratte@iol.unh.edu>
---
 dts/framework/config/test_run.py              |  20 +-
 dts/framework/context.py                      |  11 +-
 dts/framework/test_run.py                     |  28 +-
 dts/framework/test_suite.py                   |   6 +-
 .../traffic_generator/__init__.py             |  22 +-
 .../testbed_model/traffic_generator/trex.py   | 287 ++++++++++++++++++
 6 files changed, 356 insertions(+), 18 deletions(-)
 create mode 100644 dts/framework/testbed_model/traffic_generator/trex.py
  

Comments

Patrick Robb May 15, 2025, 7:25 p.m. UTC | #1
On Wed, Apr 23, 2025 at 3:40 PM Nicholas Pratte <npratte@iol.unh.edu> wrote:

> Implement the TREX traffic generator for use in the DTS framework. The
> provided implementation leverages TREX's stateless API automation
> library, via use of a Python shell. As such, version control of TREX may
> be needed. The DTS context has been modified to include a performance
> traffic generator in addition to a functional traffic generator.
>

I think the statement is that only certain versions are confirmed to work
off of your implementation? I think usage of the term version control is
confusing since we are just talking about using particular versions of
TREX, and not using a "version control" tool like git. This is more about
listing approved versions of the dependency for the TG.

>
>
>          dpdk_build_env = DPDKBuildEnvironment(config.dpdk.build, sut_node)
>          dpdk_runtime_env = DPDKRuntimeEnvironment(config.dpdk, sut_node,
> dpdk_build_env)
> -        traffic_generator =
> create_traffic_generator(config.traffic_generator, tg_node)
> +        # There is definitely a better way to do this.
> +        tg_dpdk_runtime_env = None
> +        if (
> +            config.perf_traffic_generator.type ==
> TrafficGeneratorType.TREX
> +            or config.func_traffic_generator.type ==
> TrafficGeneratorType.TREX
> +        ):
>

We have this from the testrun config

perf: false # disable performance testing
func: true # enable functional testing

So, use if testrunconfig.perf == true?

>
> +
> +@dataclass
> +class TrexPerformanceStats(PerformanceTrafficStats):
> +    """Data structure to store performance statistics for a given test
> run.
> +
> +    Attributes:
> +        packet: The packet that was sent in the test run.
> +        frame_size: The total length of the frame. (L2 downward)
> +        tx_expected_bps: The expected bits per second on a given NIC.
> +        tx_expected_cps: ...
>

"The expected connections per second" ? I think you just missed this one.


> +
> +        Attributes:
> +            ports: Related ports utilized in TG instance.
> +        """
> +        super().setup(ports)
> +        # Start TREX server process.
> +        try:
> +            self._logger.info("Starting TREX server process: sending 45
> second sleep.")
> +            privileged_command = self._os_session._get_privileged_command(
> +                f"""
> +                    cd /opt/v3.03/; {self._tg_config.remote_path}/t-rex-64
> +                     --cfg {self._tg_config.config} -i
> +                """
>

I know this was just a work in progress for RFC with some hardcoding, but
you already have tg_config.remote_path, right? So, the leading hardcoded cd
does not do anything, and even if you had to cd there you could use the
tg_config.remote_path?

Reviewed-by: Patrick Robb <probb@iol.unh.edu>
  
Nicholas Pratte May 16, 2025, 7:45 p.m. UTC | #2
>> Implement the TREX traffic generator for use in the DTS framework. The
>> provided implementation leverages TREX's stateless API automation
>> library, via use of a Python shell. As such, version control of TREX may
>> be needed. The DTS context has been modified to include a performance
>> traffic generator in addition to a functional traffic generator.
>
>
> I think the statement is that only certain versions are confirmed to work off of your implementation? I think usage of the term version control is confusing since we are just talking about using particular versions of TREX, and not using a "version control" tool like git. This is more about listing approved versions of the dependency for the TG.

I can fix that!
>>
>>
>>
>>          dpdk_build_env = DPDKBuildEnvironment(config.dpdk.build, sut_node)
>>          dpdk_runtime_env = DPDKRuntimeEnvironment(config.dpdk, sut_node, dpdk_build_env)
>> -        traffic_generator = create_traffic_generator(config.traffic_generator, tg_node)
>> +        # There is definitely a better way to do this.
>> +        tg_dpdk_runtime_env = None
>> +        if (
>> +            config.perf_traffic_generator.type == TrafficGeneratorType.TREX
>> +            or config.func_traffic_generator.type == TrafficGeneratorType.TREX
>> +        ):
>
>
> We have this from the testrun config
>
> perf: false # disable performance testing
> func: true # enable functional testing
>
> So, use if testrunconfig.perf == true?

I see what you're saying, I can make that change for the time being.
My reason for explicitly checking traffic generators types was that
any future implementations may not need a DPDK runtime environment.

>>
>>
>> +
>> +@dataclass
>> +class TrexPerformanceStats(PerformanceTrafficStats):
>> +    """Data structure to store performance statistics for a given test run.
>> +
>> +    Attributes:
>> +        packet: The packet that was sent in the test run.
>> +        frame_size: The total length of the frame. (L2 downward)
>> +        tx_expected_bps: The expected bits per second on a given NIC.
>> +        tx_expected_cps: ...
>
>
> "The expected connections per second" ? I think you just missed this one.

This was ultimately removed from the final version, incidentally. But
the way this was implemented does allow us to check this data.

>
>>
>> +
>> +        Attributes:
>> +            ports: Related ports utilized in TG instance.
>> +        """
>> +        super().setup(ports)
>> +        # Start TREX server process.
>> +        try:
>> +            self._logger.info("Starting TREX server process: sending 45 second sleep.")
>> +            privileged_command = self._os_session._get_privileged_command(
>> +                f"""
>> +                    cd /opt/v3.03/; {self._tg_config.remote_path}/t-rex-64
>> +                     --cfg {self._tg_config.config} -i
>> +                """
>
>
> I know this was just a work in progress for RFC with some hardcoding, but you already have tg_config.remote_path, right? So, the leading hardcoded cd does not do anything, and even if you had to cd there you could use the tg_config.remote_path?

Yes, that was hard coded as a temporary measure. The cd is needed to
ensure that the TREX executable actually runs properly. I must have
just forgot to switch that back as I was working with haste.
  

Patch

diff --git a/dts/framework/config/test_run.py b/dts/framework/config/test_run.py
index 06fe28143c..5cdd391b49 100644
--- a/dts/framework/config/test_run.py
+++ b/dts/framework/config/test_run.py
@@ -393,6 +393,8 @@  class TrafficGeneratorType(str, Enum):
 
     #:
     SCAPY = "SCAPY"
+    #:
+    TREX = "TREX"
 
 
 class TrafficGeneratorConfig(FrozenModel):
@@ -401,6 +403,8 @@  class TrafficGeneratorConfig(FrozenModel):
     #: The traffic generator type the child class is required to define to be distinguished among
     #: others.
     type: TrafficGeneratorType
+    remote_path: PurePath
+    config: PurePath
 
 
 class ScapyTrafficGeneratorConfig(TrafficGeneratorConfig):
@@ -409,8 +413,16 @@  class ScapyTrafficGeneratorConfig(TrafficGeneratorConfig):
     type: Literal[TrafficGeneratorType.SCAPY]
 
 
+class TrexTrafficGeneratorConfig(TrafficGeneratorConfig):
+    """TREX traffic generator specific configuration."""
+
+    type: Literal[TrafficGeneratorType.TREX]
+
+
 #: A union type discriminating traffic generators by the `type` field.
-TrafficGeneratorConfigTypes = Annotated[ScapyTrafficGeneratorConfig, Field(discriminator="type")]
+TrafficGeneratorConfigTypes = Annotated[
+    TrexTrafficGeneratorConfig, ScapyTrafficGeneratorConfig, Field(discriminator="type")
+]
 
 #: Comma-separated list of logical cores to use. An empty string or ```any``` means use all lcores.
 LogicalCores = Annotated[
@@ -458,8 +470,10 @@  class TestRunConfiguration(FrozenModel):
 
     #: The DPDK configuration used to test.
     dpdk: DPDKConfiguration
-    #: The traffic generator configuration used to test.
-    traffic_generator: TrafficGeneratorConfigTypes
+    #: The traffic generator configuration used for functional tests.
+    func_traffic_generator: TrafficGeneratorConfig
+    #: The traffic generator configuration used for performance tests.
+    perf_traffic_generator: TrafficGeneratorConfig
     #: Whether to run performance tests.
     perf: bool
     #: Whether to run functional tests.
diff --git a/dts/framework/context.py b/dts/framework/context.py
index ddd7ed4d36..0a83e2b269 100644
--- a/dts/framework/context.py
+++ b/dts/framework/context.py
@@ -15,7 +15,12 @@ 
 
 if TYPE_CHECKING:
     from framework.remote_session.dpdk import DPDKBuildEnvironment, DPDKRuntimeEnvironment
-    from framework.testbed_model.traffic_generator.traffic_generator import TrafficGenerator
+    from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
+        CapturingTrafficGenerator,
+    )
+    from framework.testbed_model.traffic_generator.performance_traffic_generator import (
+        PerformanceTrafficGenerator,
+    )
 
 P = ParamSpec("P")
 
@@ -68,7 +73,9 @@  class Context:
     topology: Topology
     dpdk_build: "DPDKBuildEnvironment"
     dpdk: "DPDKRuntimeEnvironment"
-    tg: "TrafficGenerator"
+    tg_dpdk: "DPDKRuntimeEnvironment" | None
+    func_tg: "CapturingTrafficGenerator"
+    perf_tg: "PerformanceTrafficGenerator"
     local: LocalContext = field(default_factory=LocalContext)
 
 
diff --git a/dts/framework/test_run.py b/dts/framework/test_run.py
index f9cfe5e908..81dc553b00 100644
--- a/dts/framework/test_run.py
+++ b/dts/framework/test_run.py
@@ -107,7 +107,7 @@ 
 from types import MethodType
 from typing import ClassVar, Protocol, Union
 
-from framework.config.test_run import TestRunConfiguration
+from framework.config.test_run import TestRunConfiguration, TrafficGeneratorType
 from framework.context import Context, init_ctx
 from framework.exception import (
     InternalError,
@@ -204,10 +204,25 @@  def __init__(
 
         dpdk_build_env = DPDKBuildEnvironment(config.dpdk.build, sut_node)
         dpdk_runtime_env = DPDKRuntimeEnvironment(config.dpdk, sut_node, dpdk_build_env)
-        traffic_generator = create_traffic_generator(config.traffic_generator, tg_node)
+        # There is definitely a better way to do this.
+        tg_dpdk_runtime_env = None
+        if (
+            config.perf_traffic_generator.type == TrafficGeneratorType.TREX
+            or config.func_traffic_generator.type == TrafficGeneratorType.TREX
+        ):
+            tg_dpdk_runtime_env = DPDKRuntimeEnvironment(config.dpdk, tg_node, None)
+        func_traffic_generator = create_traffic_generator(config.func_traffic_generator, tg_node)
+        perf_traffic_generator = create_traffic_generator(config.perf_traffic_generator, tg_node)
 
         self.ctx = Context(
-            sut_node, tg_node, topology, dpdk_build_env, dpdk_runtime_env, traffic_generator
+            sut_node,
+            tg_node,
+            topology,
+            dpdk_build_env,
+            dpdk_runtime_env,
+            tg_dpdk_runtime_env,
+            func_traffic_generator,
+            perf_traffic_generator,
         )
         self.result = result
         self.selected_tests = list(self.config.filter_tests(tests_config))
@@ -345,7 +360,9 @@  def next(self) -> State | None:
         test_run.ctx.sut_node.setup()
         test_run.ctx.tg_node.setup()
         test_run.ctx.dpdk.setup(test_run.ctx.topology.sut_ports)
-        test_run.ctx.tg.setup(test_run.ctx.topology.tg_ports)
+        test_run.ctx.tg_dpdk.setup(test_run.ctx.topology.tg_ports)
+        test_run.ctx.func_tg.setup(test_run.ctx.topology.tg_ports)
+        test_run.ctx.perf_tg.setup(test_run.ctx.topology.tg_ports)
 
         self.result.ports = test_run.ctx.topology.sut_ports + test_run.ctx.topology.tg_ports
         self.result.sut_info = test_run.ctx.sut_node.node_info
@@ -430,7 +447,8 @@  def description(self) -> str:
 
     def next(self) -> State | None:
         """Next state."""
-        self.test_run.ctx.tg.teardown(self.test_run.ctx.topology.tg_ports)
+        self.test_run.ctx.func_tg.teardown(self.test_run.ctx.topology.tg_ports)
+        self.test_run.ctx.perf_tg.teardown(self.test_run.ctx.topology.tg_ports)
         self.test_run.ctx.dpdk.teardown(self.test_run.ctx.topology.sut_ports)
         self.test_run.ctx.tg_node.teardown()
         self.test_run.ctx.sut_node.teardown()
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index e07c327b77..507df508cb 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -254,11 +254,11 @@  def send_packets_and_capture(
             A list of received packets.
         """
         assert isinstance(
-            self._ctx.tg, CapturingTrafficGenerator
+            self._ctx.func_tg, CapturingTrafficGenerator
         ), "Cannot capture with a non-capturing traffic generator"
         # TODO: implement @requires for types of traffic generator
         packets = self._adjust_addresses(packets)
-        return self._ctx.tg.send_packets_and_capture(
+        return self._ctx.func_tg.send_packets_and_capture(
             packets,
             self._ctx.topology.tg_port_egress,
             self._ctx.topology.tg_port_ingress,
@@ -276,7 +276,7 @@  def send_packets(
             packets: Packets to send.
         """
         packets = self._adjust_addresses(packets)
-        self._ctx.tg.send_packets(packets, self._ctx.topology.tg_port_egress)
+        self._ctx.func_tg.send_packets(packets, self._ctx.topology.tg_port_egress)
 
     def get_expected_packets(
         self,
diff --git a/dts/framework/testbed_model/traffic_generator/__init__.py b/dts/framework/testbed_model/traffic_generator/__init__.py
index 2a259a6e6c..53125995cd 100644
--- a/dts/framework/testbed_model/traffic_generator/__init__.py
+++ b/dts/framework/testbed_model/traffic_generator/__init__.py
@@ -14,17 +14,27 @@ 
 and a capturing traffic generator is required.
 """
 
-from framework.config.test_run import ScapyTrafficGeneratorConfig, TrafficGeneratorConfig
+from framework.config.test_run import (
+    ScapyTrafficGeneratorConfig as ScapyTrafficGeneratorConfig,
+)
+from framework.config.test_run import (
+    TrafficGeneratorConfig,
+    TrafficGeneratorType,
+)
+from framework.config.test_run import (
+    TrexTrafficGeneratorConfig as TrexTrafficGeneratorConfig,
+)
 from framework.exception import ConfigurationError
 from framework.testbed_model.node import Node
 
-from .capturing_traffic_generator import CapturingTrafficGenerator
 from .scapy import ScapyTrafficGenerator
+from .traffic_generator import TrafficGenerator
+from .trex import TrexTrafficGenerator
 
 
 def create_traffic_generator(
     traffic_generator_config: TrafficGeneratorConfig, node: Node
-) -> CapturingTrafficGenerator:
+) -> TrafficGenerator:
     """The factory function for creating traffic generator objects from the test run configuration.
 
     Args:
@@ -37,8 +47,10 @@  def create_traffic_generator(
     Raises:
         ConfigurationError: If an unknown traffic generator has been setup.
     """
-    match traffic_generator_config:
-        case ScapyTrafficGeneratorConfig():
+    match traffic_generator_config.type:
+        case TrafficGeneratorType.SCAPY:
             return ScapyTrafficGenerator(node, traffic_generator_config, privileged=True)
+        case TrafficGeneratorType.TREX:
+            return TrexTrafficGenerator(node, traffic_generator_config, privileged=True)
         case _:
             raise ConfigurationError(f"Unknown traffic generator: {traffic_generator_config.type}")
diff --git a/dts/framework/testbed_model/traffic_generator/trex.py b/dts/framework/testbed_model/traffic_generator/trex.py
new file mode 100644
index 0000000000..0053174ede
--- /dev/null
+++ b/dts/framework/testbed_model/traffic_generator/trex.py
@@ -0,0 +1,287 @@ 
+"""Implementation for TREX performance traffic generator."""
+
+import time
+from dataclasses import dataclass
+from enum import Flag, auto
+from typing import Callable, ClassVar
+
+from invoke.runners import Promise
+from scapy.packet import Packet
+
+from framework.config.node import NodeConfiguration
+from framework.config.test_run import TrafficGeneratorConfig
+from framework.exception import SSHTimeoutError
+from framework.remote_session.python_shell import PythonShell
+from framework.remote_session.ssh_session import SSHSession
+from framework.testbed_model.linux_session import LinuxSession
+from framework.testbed_model.node import Node, create_session
+from framework.testbed_model.traffic_generator.performance_traffic_generator import (
+    PerformanceTrafficGenerator,
+    PerformanceTrafficStats,
+)
+
+
+@dataclass
+class TrexPerPortStats:
+    """Performance statistics on a per port basis.
+
+    Attributes:
+        opackets: Number of packets sent.
+        obytes: Number of egress bytes sent.
+        tx_bps: Maximum bits per second transmitted.
+        tx_pps: Number of transmitted packets sent.
+    """
+
+    opackets: float
+    obytes: float
+    tx_bps: float
+    tx_pps: float
+
+
+@dataclass
+class TrexPerformanceStats(PerformanceTrafficStats):
+    """Data structure to store performance statistics for a given test run.
+
+    Attributes:
+        packet: The packet that was sent in the test run.
+        frame_size: The total length of the frame. (L2 downward)
+        tx_expected_bps: The expected bits per second on a given NIC.
+        tx_expected_cps: ...
+        tx_expected_pps: The expected packets per second of a given NIC.
+        tx_pps: The recorded maximum packets per second of the tested NIC.
+        tx_cps: The recorded maximum cps of the tested NIC
+        tx_bps: The recorded maximum bits per second of the tested NIC.
+        obytes: Total bytes output during test run.
+        port_stats: A list of :class:`TrexPerPortStats` provided by TREX.
+    """
+
+    packet: Packet
+    frame_size: int
+
+    tx_expected_bps: float
+    tx_expected_cps: float
+    tx_expected_pps: float
+
+    tx_pps: float
+    tx_cps: float
+    tx_bps: float
+
+    obytes: float
+
+    port_stats: list[TrexPerPortStats] | None
+
+
+class TrexStatelessTXModes(Flag):
+    """Flags indicating TREX instance's current trasmission mode."""
+
+    CONTINUOUS = auto()
+    SINGLE_BURST = auto()
+    MULTI_BURST = auto()
+
+
+class TrexTrafficGenerator(PythonShell, PerformanceTrafficGenerator):
+    """TREX traffic generator.
+
+    This implementation leverages the stateless API library provided in the TREX installation.
+
+    Attributes:
+        stl_client_name: The name of the stateless client used in the stateless API.
+        packet_stream_name: The name of the stateless packet stream used in the stateless API.
+        timeout_duration: Internal timeout for connection to the TREX server.
+    """
+
+    _os_session: LinuxSession
+    _server_remote_session: SSHSession
+    _trex_server_process: Promise
+
+    _tg_config: TrafficGeneratorConfig
+    _node_config: NodeConfiguration
+
+    _python_indentation: ClassVar[str] = " " * 4
+
+    stl_client_name: ClassVar[str] = "client"
+    packet_stream_name: ClassVar[str] = "stream"
+
+    _streaming_mode: TrexStatelessTXModes = TrexStatelessTXModes.CONTINUOUS
+
+    timeout_duration: int
+
+    def __init__(
+        self, tg_node: Node, config: TrafficGeneratorConfig, timeout_duration: int = 5, **kwargs
+    ) -> None:
+        """Initialize the TREX server.
+
+        Initializes needed OS sessions for the creation of the TREX server process.
+
+        Attributes:
+            tg_node: TG node the TREX instance is operating on.
+            config: Traffic generator config provided for TREX instance.
+            timeout_duration: Internal timeout for connection to the TREX server.
+        """
+        super().__init__(node=tg_node, config=config, tg_node=tg_node, **kwargs)
+        self._node_config = tg_node.config
+        self._tg_config = config
+        self.timeout_duration = timeout_duration
+
+        # Create TREX server session.
+        self._tg_node._other_sessions.append(
+            create_session(self._tg_node.config, "TREX Server.", self._logger)
+        )
+        self._os_session = self._tg_node._other_sessions[0]
+        self._server_remote_session = self._os_session.remote_session
+
+    def setup(self, ports):
+        """Initialize and start a TREX server process.
+
+        Binds TG ports to vfio-pci and starts the trex process.
+
+        Attributes:
+            ports: Related ports utilized in TG instance.
+        """
+        super().setup(ports)
+        # Start TREX server process.
+        try:
+            self._logger.info("Starting TREX server process: sending 45 second sleep.")
+            privileged_command = self._os_session._get_privileged_command(
+                f"""
+                    cd /opt/v3.03/; {self._tg_config.remote_path}/t-rex-64
+                     --cfg {self._tg_config.config} -i
+                """
+            )
+            self._server_remote_session = self._server_remote_session._send_async_command(
+                privileged_command, timeout=None, env=None
+            )
+            time.sleep(45)
+        except SSHTimeoutError as e:
+            self._logger.exception("Failed to start TREX server process.", e)
+
+        # Start Python shell.
+        self.start_application()
+        self.send_command("import os")
+        # Parent directory: /opt/v3.03/automation/trex_control_plane/interactive
+        self.send_command(
+            f"os.chdir('{self._tg_config.remote_path}/automation/trex_control_plane/interactive')"
+        )
+
+        # Import stateless API components.
+        imports = [
+            "import trex",
+            "import trex.stl",
+            "import trex.stl.trex_stl_client",
+            "import trex.stl.trex_stl_streams",
+            "import trex.stl.trex_stl_packet_builder_scapy",
+            "from scapy.layers.l2 import Ether",
+            "from scapy.layers.inet import IP",
+            "from scapy.packet import Raw",
+        ]
+        self.send_command("\n".join(imports))
+
+        stateless_client = [
+            f"{self.stl_client_name} = trex.stl.trex_stl_client.STLClient(",
+            f"username='{self._node_config.user}',",
+            "server='127.0.0.1',",
+            f"sync_timeout={self.timeout_duration}",
+            ")",
+        ]
+        self.send_command(f"\n{self._python_indentation}".join(stateless_client))
+        self.send_command(f"{self.stl_client_name}.connect()")
+
+    def teardown(self, ports):
+        """Teardown the TREX server and stateless implementation.
+
+        close the TREX server process, and stop the Python shell.
+
+        Attributes:
+            ports: Associated ports used by the TREX instance.
+        """
+        super().teardown(ports)
+        self.send_command(f"{self.stl_client_name}.disconnect()")
+        self.close()
+        self._trex_server_process.join()
+
+    def _calculate_traffic_stats(
+        self, packet: Packet, duration: float, callback: Callable[[Packet, float], str]
+    ) -> PerformanceTrafficStats:
+        """Calculate the traffic statistics, using provided TG output.
+
+        Takes in the statistics output provided by the stateless API implementation, and collects
+        them into a performance statistics data structure.
+
+        Attributes:
+            packet: The packet being used for the performance test.
+            duration: The duration of the test.
+            callback: The callback function used to generate the traffic.
+        """
+        # Convert to a dictionary.
+        stats_output = eval(callback(packet, duration))
+        return TrexPerformanceStats(
+            len(packet),
+            packet,
+            stats_output.get("tx_expected_bps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("tx_expected_cps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("tx_expected_pps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("tx_pps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("tx_cps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("tx_bps", "ERROR - DATA NOT FOUND"),
+            stats_output.get("obytes", "ERROR - DATA NOT FOUND"),
+            None,
+        )
+
+    def set_streaming_mode(self, streaming_mode: TrexStatelessTXModes) -> None:
+        """Set the streaming mode of the TREX instance."""
+        # Streaming modes are mutually exclusive.
+        self._streaming_mode = self._streaming_mode & streaming_mode
+
+    def _generate_traffic(self, packet: Packet, duration: float) -> str:
+        """Generate traffic using provided packet.
+
+        Uses the provided packet to generate traffic for the provided duration.
+
+        Attributes:
+            packet: The packet being used for the performance test.
+            duration: The duration of the test being performed.
+
+        Returns:
+            a string output of statistics provided by the traffic generator.
+        """
+        """Implementation for :method:`generate_traffic_and_stats`."""
+        streaming_mode = ""
+        if self._streaming_mode == TrexStatelessTXModes.CONTINUOUS:
+            streaming_mode = "STLTXCont"
+        elif self._streaming_mode == TrexStatelessTXModes.SINGLE_BURST:
+            streaming_mode = "STLTXSingleBurst"
+        elif self._streaming_mode == TrexStatelessTXModes.MULTI_BURST:
+            streaming_mode = "STLTXMultiBurst"
+
+        packet_stream = [
+            f"{self.packet_stream_name} = trex.stl.trex_stl_streams.STLStream(",
+            f"name='Test_{len(packet)}_bytes',",
+            f"packet=trex.stl.trex_stl_packet_builder_scapy.STLPktBuilder(pkt={packet.command()}),",
+            f"mode=trex.stl.trex_stl_streams.{streaming_mode}(),",
+            ")",
+        ]
+        self.send_command("\n".join(packet_stream))
+
+        # Prepare TREX console for next performance test.
+        procedure = [
+            f"{self.stl_client_name}.connect()",
+            f"{self.stl_client_name}.reset(ports = [0, 1])",
+            f"{self.stl_client_name}.add_streams({self.packet_stream_name}, ports=[0, 1])",
+            f"{self.stl_client_name}.clear_stats()",
+            ")",
+        ]
+        self.send_command("\n".join(procedure))
+
+        start_test = [
+            f"{self.stl_client_name}.start(ports=[0, 1], duration={duration})",
+            f"{self.stl_client_name}.wait_on_traffic(ports=[0, 1])",
+        ]
+        self.send_command("\n".join(start_test))
+        import time
+
+        time.sleep(duration + 1)
+
+        # Gather statistics output for parsing.
+        return self.send_command(
+            f"{self.stl_client_name}.get_stats(ports=[0, 1])", skip_first_line=True
+        )