[v3,3/6] dts: traffic generator abstractions

Message ID 20230719141303.33284-4-juraj.linkes@pantheon.tech (mailing list archive)
State Accepted, archived
Delegated to: Thomas Monjalon
Headers
Series dts: tg abstractions and scapy tg |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Juraj Linkeš July 19, 2023, 2:13 p.m. UTC
  There are traffic abstractions for all traffic generators and for
traffic generators that can capture (not just count) packets.

There also related abstractions, such as TGNode where the traffic
generators reside and some related code.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 doc/guides/tools/dts.rst                      |  31 ++++
 dts/framework/dts.py                          |  61 ++++----
 dts/framework/remote_session/linux_session.py |  78 ++++++++++
 dts/framework/remote_session/os_session.py    |  15 ++
 dts/framework/test_suite.py                   |   4 +-
 dts/framework/testbed_model/__init__.py       |   1 +
 .../capturing_traffic_generator.py            | 136 ++++++++++++++++++
 dts/framework/testbed_model/hw/port.py        |  60 ++++++++
 dts/framework/testbed_model/node.py           |  20 ++-
 dts/framework/testbed_model/scapy.py          |  74 ++++++++++
 dts/framework/testbed_model/sut_node.py       |   1 +
 dts/framework/testbed_model/tg_node.py        |  99 +++++++++++++
 .../testbed_model/traffic_generator.py        |  72 ++++++++++
 dts/framework/utils.py                        |  13 ++
 14 files changed, 637 insertions(+), 28 deletions(-)
 create mode 100644 dts/framework/testbed_model/capturing_traffic_generator.py
 create mode 100644 dts/framework/testbed_model/hw/port.py
 create mode 100644 dts/framework/testbed_model/scapy.py
 create mode 100644 dts/framework/testbed_model/tg_node.py
 create mode 100644 dts/framework/testbed_model/traffic_generator.py
  

Patch

diff --git a/doc/guides/tools/dts.rst b/doc/guides/tools/dts.rst
index c7b31623e4..2f97d1df6e 100644
--- a/doc/guides/tools/dts.rst
+++ b/doc/guides/tools/dts.rst
@@ -153,6 +153,37 @@  There are two areas that need to be set up on a System Under Test:
 
       sudo usermod -aG sudo <sut_user>
 
+
+Setting up Traffic Generator Node
+---------------------------------
+
+These need to be set up on a Traffic Generator Node:
+
+#. **Traffic generator dependencies**
+
+   The traffic generator running on the traffic generator node must be installed beforehand.
+   For Scapy traffic generator, only a few Python libraries need to be installed:
+
+   .. code-block:: console
+
+      sudo apt install python3-pip
+      sudo pip install --upgrade pip
+      sudo pip install scapy==2.5.0
+
+#. **Hardware dependencies**
+
+   The traffic generators, like DPDK, need a proper driver and firmware.
+   The Scapy traffic generator doesn't have strict requirements - the drivers that come
+   with most OS distributions will be satisfactory.
+
+
+#. **User with administrator privileges**
+
+   Similarly to the System Under Test, traffic generators need administrator privileges
+   to be able to use the devices.
+   Refer to the `System Under Test section <sut_admin_user>` for details.
+
+
 Running DTS
 -----------
 
diff --git a/dts/framework/dts.py b/dts/framework/dts.py
index 1c4a637fbd..f773f0c38d 100644
--- a/dts/framework/dts.py
+++ b/dts/framework/dts.py
@@ -15,7 +15,7 @@ 
 from .logger import DTSLOG, getLogger
 from .test_result import BuildTargetResult, DTSResult, ExecutionResult, Result
 from .test_suite import get_test_suites
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 from .utils import check_dts_python_version
 
 dts_logger: DTSLOG = getLogger("DTSRunner")
@@ -33,29 +33,31 @@  def run_all() -> None:
     # check the python version of the server that run dts
     check_dts_python_version()
 
-    nodes: dict[str, SutNode] = {}
+    sut_nodes: dict[str, SutNode] = {}
+    tg_nodes: dict[str, TGNode] = {}
     try:
         # for all Execution sections
         for execution in CONFIGURATION.executions:
-            sut_node = None
-            if execution.system_under_test_node.name in nodes:
-                # a Node with the same name already exists
-                sut_node = nodes[execution.system_under_test_node.name]
-            else:
-                # the SUT has not been initialized yet
-                try:
+            sut_node = sut_nodes.get(execution.system_under_test_node.name)
+            tg_node = tg_nodes.get(execution.traffic_generator_node.name)
+
+            try:
+                if not sut_node:
                     sut_node = SutNode(execution.system_under_test_node)
-                    result.update_setup(Result.PASS)
-                except Exception as e:
-                    dts_logger.exception(
-                        f"Connection to node {execution.system_under_test_node} failed."
-                    )
-                    result.update_setup(Result.FAIL, e)
-                else:
-                    nodes[sut_node.name] = sut_node
-
-            if sut_node:
-                _run_execution(sut_node, execution, result)
+                    sut_nodes[sut_node.name] = sut_node
+                if not tg_node:
+                    tg_node = TGNode(execution.traffic_generator_node)
+                    tg_nodes[tg_node.name] = tg_node
+                result.update_setup(Result.PASS)
+            except Exception as e:
+                failed_node = execution.system_under_test_node.name
+                if sut_node:
+                    failed_node = execution.traffic_generator_node.name
+                dts_logger.exception(f"Creation of node {failed_node} failed.")
+                result.update_setup(Result.FAIL, e)
+
+            else:
+                _run_execution(sut_node, tg_node, execution, result)
 
     except Exception as e:
         dts_logger.exception("An unexpected error has occurred.")
@@ -64,7 +66,7 @@  def run_all() -> None:
 
     finally:
         try:
-            for node in nodes.values():
+            for node in (sut_nodes | tg_nodes).values():
                 node.close()
             result.update_teardown(Result.PASS)
         except Exception as e:
@@ -81,7 +83,10 @@  def run_all() -> None:
 
 
 def _run_execution(
-    sut_node: SutNode, execution: ExecutionConfiguration, result: DTSResult
+    sut_node: SutNode,
+    tg_node: TGNode,
+    execution: ExecutionConfiguration,
+    result: DTSResult,
 ) -> None:
     """
     Run the given execution. This involves running the execution setup as well as
@@ -102,7 +107,9 @@  def _run_execution(
 
     else:
         for build_target in execution.build_targets:
-            _run_build_target(sut_node, build_target, execution, execution_result)
+            _run_build_target(
+                sut_node, tg_node, build_target, execution, execution_result
+            )
 
     finally:
         try:
@@ -115,6 +122,7 @@  def _run_execution(
 
 def _run_build_target(
     sut_node: SutNode,
+    tg_node: TGNode,
     build_target: BuildTargetConfiguration,
     execution: ExecutionConfiguration,
     execution_result: ExecutionResult,
@@ -135,7 +143,7 @@  def _run_build_target(
         build_target_result.update_setup(Result.FAIL, e)
 
     else:
-        _run_all_suites(sut_node, execution, build_target_result)
+        _run_all_suites(sut_node, tg_node, execution, build_target_result)
 
     finally:
         try:
@@ -148,6 +156,7 @@  def _run_build_target(
 
 def _run_all_suites(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
 ) -> None:
@@ -162,7 +171,7 @@  def _run_all_suites(
     for test_suite_config in execution.test_suites:
         try:
             _run_single_suite(
-                sut_node, execution, build_target_result, test_suite_config
+                sut_node, tg_node, execution, build_target_result, test_suite_config
             )
         except BlockingTestSuiteError as e:
             dts_logger.exception(
@@ -178,6 +187,7 @@  def _run_all_suites(
 
 def _run_single_suite(
     sut_node: SutNode,
+    tg_node: TGNode,
     execution: ExecutionConfiguration,
     build_target_result: BuildTargetResult,
     test_suite_config: TestSuiteConfig,
@@ -206,6 +216,7 @@  def _run_single_suite(
         for test_suite_class in test_suite_classes:
             test_suite = test_suite_class(
                 sut_node,
+                tg_node,
                 test_suite_config.test_cases,
                 execution.func,
                 build_target_result,
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index f64aa8efb0..decce4039c 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -2,13 +2,47 @@ 
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
 # Copyright(c) 2023 University of New Hampshire
 
+import json
+from typing import TypedDict
+
+from typing_extensions import NotRequired
+
 from framework.exception import RemoteCommandExecutionError
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import expand_range
 
 from .posix_session import PosixSession
 
 
+class LshwConfigurationOutput(TypedDict):
+    link: str
+
+
+class LshwOutput(TypedDict):
+    """
+    A model of the relevant information from json lshw output, e.g.:
+    {
+    ...
+    "businfo" : "pci@0000:08:00.0",
+    "logicalname" : "enp8s0",
+    "version" : "00",
+    "serial" : "52:54:00:59:e1:ac",
+    ...
+    "configuration" : {
+      ...
+      "link" : "yes",
+      ...
+    },
+    ...
+    """
+
+    businfo: str
+    logicalname: NotRequired[str]
+    serial: NotRequired[str]
+    configuration: LshwConfigurationOutput
+
+
 class LinuxSession(PosixSession):
     """
     The implementation of non-Posix compliant parts of Linux remote sessions.
@@ -103,3 +137,47 @@  def _configure_huge_pages(
         self.send_command(
             f"echo {amount} | tee {hugepage_config_path}", privileged=True
         )
+
+    def update_ports(self, ports: list[Port]) -> None:
+        self._logger.debug("Gathering port info.")
+        for port in ports:
+            assert (
+                port.node == self.name
+            ), "Attempted to gather port info on the wrong node"
+
+        port_info_list = self._get_lshw_info()
+        for port in ports:
+            for port_info in port_info_list:
+                if f"pci@{port.pci}" == port_info.get("businfo"):
+                    self._update_port_attr(
+                        port, port_info.get("logicalname"), "logical_name"
+                    )
+                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
+                    port_info_list.remove(port_info)
+                    break
+            else:
+                self._logger.warning(f"No port at pci address {port.pci} found.")
+
+    def _get_lshw_info(self) -> list[LshwOutput]:
+        output = self.send_command("lshw -quiet -json -C network", verify=True)
+        return json.loads(output.stdout)
+
+    def _update_port_attr(
+        self, port: Port, attr_value: str | None, attr_name: str
+    ) -> None:
+        if attr_value:
+            setattr(port, attr_name, attr_value)
+            self._logger.debug(
+                f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
+            )
+        else:
+            self._logger.warning(
+                f"Attempted to get '{attr_name}' of port {port.pci}, "
+                f"but it doesn't exist."
+            )
+
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        state = "up" if enable else "down"
+        self.send_command(
+            f"ip link set dev {port.logical_name} {state}", privileged=True
+        )
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index f543ce3acc..ab4bfbfe4c 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -12,6 +12,7 @@ 
 from framework.remote_session.remote import InteractiveShell
 from framework.settings import SETTINGS
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
 from framework.utils import MesonArgs
 
 from .remote import (
@@ -249,3 +250,17 @@  def get_node_info(self) -> NodeInfo:
         """
         Collect information about the node
         """
+
+    @abstractmethod
+    def update_ports(self, ports: list[Port]) -> None:
+        """
+        Get additional information about ports:
+            Logical name (e.g. enp7s0) if applicable
+            Mac address
+        """
+
+    @abstractmethod
+    def configure_port_state(self, port: Port, enable: bool) -> None:
+        """
+        Enable/disable port.
+        """
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index de94c9332d..056460dd05 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -20,7 +20,7 @@ 
 from .logger import DTSLOG, getLogger
 from .settings import SETTINGS
 from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
 
 
 class TestSuite(object):
@@ -51,11 +51,13 @@  class TestSuite(object):
     def __init__(
         self,
         sut_node: SutNode,
+        tg_node: TGNode,
         test_cases: list[str],
         func: bool,
         build_target_result: BuildTargetResult,
     ):
         self.sut_node = sut_node
+        self.tg_node = tg_node
         self._logger = getLogger(self.__class__.__name__)
         self._test_cases_to_run = test_cases
         self._test_cases_to_run.extend(SETTINGS.test_cases)
diff --git a/dts/framework/testbed_model/__init__.py b/dts/framework/testbed_model/__init__.py
index f54a947051..5cbb859e47 100644
--- a/dts/framework/testbed_model/__init__.py
+++ b/dts/framework/testbed_model/__init__.py
@@ -20,3 +20,4 @@ 
 )
 from .node import Node
 from .sut_node import SutNode
+from .tg_node import TGNode
diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py b/dts/framework/testbed_model/capturing_traffic_generator.py
new file mode 100644
index 0000000000..ab98987f8e
--- /dev/null
+++ b/dts/framework/testbed_model/capturing_traffic_generator.py
@@ -0,0 +1,136 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator that can capture packets.
+
+In functional testing, we need to interrogate received packets to check their validity.
+The module defines the interface common to all traffic generators capable of capturing
+traffic.
+"""
+
+import uuid
+from abc import abstractmethod
+
+import scapy.utils  # type: ignore[import]
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.settings import SETTINGS
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+    """
+    This is the function used for the default implementation of capture names.
+    """
+    return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+    """Capture packets after sending traffic.
+
+    A mixin interface which enables a packet generator to declare that it can capture
+    packets and return them to the user.
+
+    The methods of capturing traffic generators obey the following workflow:
+        1. send packets
+        2. capture packets
+        3. write the capture to a .pcap file
+        4. return the received packets
+    """
+
+    @property
+    def is_capturing(self) -> bool:
+        return True
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+            capture_name: The name of the .pcap file where to store the capture.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        return self.send_packets_and_capture(
+            [packet], send_port, receive_port, duration, capture_name
+        )
+
+    def send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """Send packets, return received traffic.
+
+        Send packets on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packets: The packets to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packets.
+            capture_name: The name of the .pcap file where to store the capture.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        self._logger.debug(get_packet_summaries(packets))
+        self._logger.debug(
+            f"Sending packet on {send_port.logical_name}, "
+            f"receiving on {receive_port.logical_name}."
+        )
+        received_packets = self._send_packets_and_capture(
+            packets,
+            send_port,
+            receive_port,
+            duration,
+        )
+
+        self._logger.debug(
+            f"Received packets: {get_packet_summaries(received_packets)}"
+        )
+        self._write_capture_from_packets(capture_name, received_packets)
+        return received_packets
+
+    @abstractmethod
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+    ) -> list[Packet]:
+        """
+        The extended classes must implement this method which
+        sends packets on send_port and receives packets on the receive_port
+        for the specified duration. It must be able to handle no received packets.
+        """
+
+    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+        file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
+        self._logger.debug(f"Writing packets to {file_name}.")
+        scapy.utils.wrpcap(file_name, packets)
diff --git a/dts/framework/testbed_model/hw/port.py b/dts/framework/testbed_model/hw/port.py
new file mode 100644
index 0000000000..680c29bfe3
--- /dev/null
+++ b/dts/framework/testbed_model/hw/port.py
@@ -0,0 +1,60 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+    node: str
+    pci: str
+
+
+@dataclass(slots=True)
+class Port:
+    """
+    identifier: The PCI address of the port on a node.
+
+    os_driver: The driver used by this port when the OS is controlling it.
+        Example: i40e
+    os_driver_for_dpdk: The driver the device must be bound to for DPDK to use it,
+        Example: vfio-pci.
+
+    Note: os_driver and os_driver_for_dpdk may be the same thing.
+        Example: mlx5_core
+
+    peer: The identifier of a port this port is connected with.
+    """
+
+    identifier: PortIdentifier
+    os_driver: str
+    os_driver_for_dpdk: str
+    peer: PortIdentifier
+    mac_address: str = ""
+    logical_name: str = ""
+
+    def __init__(self, node_name: str, config: PortConfig):
+        self.identifier = PortIdentifier(
+            node=node_name,
+            pci=config.pci,
+        )
+        self.os_driver = config.os_driver
+        self.os_driver_for_dpdk = config.os_driver_for_dpdk
+        self.peer = PortIdentifier(node=config.peer_node, pci=config.peer_pci)
+
+    @property
+    def node(self) -> str:
+        return self.identifier.node
+
+    @property
+    def pci(self) -> str:
+        return self.identifier.pci
+
+
+@dataclass(slots=True, frozen=True)
+class PortLink:
+    sut_port: Port
+    tg_port: Port
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index d237b3f75b..c666dfbf4e 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -7,6 +7,7 @@ 
 A node is a generic host that DTS connects to and manages.
 """
 
+from abc import ABC
 from typing import Any, Callable, Type
 
 from framework.config import (
@@ -26,9 +27,10 @@ 
     VirtualDevice,
     lcore_filter,
 )
+from .hw.port import Port
 
 
-class Node(object):
+class Node(ABC):
     """
     Basic class for node management. This class implements methods that
     manage a node, such as information gathering (of CPU/PCI/NIC) and
@@ -39,6 +41,7 @@  class Node(object):
     config: NodeConfiguration
     name: str
     lcores: list[LogicalCore]
+    ports: list[Port]
     _logger: DTSLOG
     _other_sessions: list[OSSession]
     _execution_config: ExecutionConfiguration
@@ -50,6 +53,8 @@  def __init__(self, node_config: NodeConfiguration):
         self._logger = getLogger(self.name)
         self.main_session = create_session(self.config, self.name, self._logger)
 
+        self._logger.info(f"Connected to node: {self.name}")
+
         self._get_remote_cpus()
         # filter the node lcores according to user config
         self.lcores = LogicalCoreListFilter(
@@ -58,8 +63,13 @@  def __init__(self, node_config: NodeConfiguration):
 
         self._other_sessions = []
         self.virtual_devices = []
+        self._init_ports()
 
-        self._logger.info(f"Created node: {self.name}")
+    def _init_ports(self) -> None:
+        self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
+        self.main_session.update_ports(self.ports)
+        for port in self.ports:
+            self.configure_port_state(port)
 
     def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
         """
@@ -205,6 +215,12 @@  def _setup_hugepages(self):
                 self.config.hugepages.amount, self.config.hugepages.force_first_numa
             )
 
+    def configure_port_state(self, port: Port, enable: bool = True) -> None:
+        """
+        Enable/disable port.
+        """
+        self.main_session.configure_port_state(port, enable)
+
     def close(self) -> None:
         """
         Close all connections and free other resources.
diff --git a/dts/framework/testbed_model/scapy.py b/dts/framework/testbed_model/scapy.py
new file mode 100644
index 0000000000..1a23dc9fa3
--- /dev/null
+++ b/dts/framework/testbed_model/scapy.py
@@ -0,0 +1,74 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Scapy traffic generator.
+
+Traffic generator used for functional testing, implemented using the Scapy library.
+The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+
+The XML-RPC server runs in an interactive remote SSH session running Python console,
+where we start the server. The communication with the server is facilitated with
+a local server proxy.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import OS, ScapyTrafficGeneratorConfig
+from framework.logger import getLogger
+
+from .capturing_traffic_generator import (
+    CapturingTrafficGenerator,
+    _get_default_capture_name,
+)
+from .hw.port import Port
+from .tg_node import TGNode
+
+
+class ScapyTrafficGenerator(CapturingTrafficGenerator):
+    """Provides access to scapy functions via an RPC interface.
+
+    The traffic generator first starts an XML-RPC on the remote TG node.
+    Then it populates the server with functions which use the Scapy library
+    to send/receive traffic.
+
+    Any packets sent to the remote server are first converted to bytes.
+    They are received as xmlrpc.client.Binary objects on the server side.
+    When the server sends the packets back, they are also received as
+    xmlrpc.client.Binary object on the client side, are converted back to Scapy
+    packets and only then returned from the methods.
+
+    Arguments:
+        tg_node: The node where the traffic generator resides.
+        config: The user configuration of the traffic generator.
+    """
+
+    _config: ScapyTrafficGeneratorConfig
+    _tg_node: TGNode
+
+    def __init__(self, tg_node: TGNode, config: ScapyTrafficGeneratorConfig):
+        self._config = config
+        self._tg_node = tg_node
+        self._logger = getLogger(
+            f"{self._tg_node.name} {self._config.traffic_generator_type}"
+        )
+
+        assert (
+            self._tg_node.config.os == OS.linux
+        ), "Linux is the only supported OS for scapy traffic generation"
+
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        raise NotImplementedError()
+
+    def _send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        receive_port: Port,
+        duration: float,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        raise NotImplementedError()
+
+    def close(self):
+        pass
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index ad3bffd9d3..f0b017a383 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -105,6 +105,7 @@  def __init__(self, node_config: SutNodeConfiguration):
         self._dpdk_version = None
         self._node_info = None
         self._compiler_version = None
+        self._logger.info(f"Created node: {self.name}")
 
     @property
     def _remote_dpdk_dir(self) -> PurePath:
diff --git a/dts/framework/testbed_model/tg_node.py b/dts/framework/testbed_model/tg_node.py
new file mode 100644
index 0000000000..27025cfa31
--- /dev/null
+++ b/dts/framework/testbed_model/tg_node.py
@@ -0,0 +1,99 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator node.
+
+This is the node where the traffic generator resides.
+The distinction between a node and a traffic generator is as follows:
+A node is a host that DTS connects to. It could be a baremetal server,
+a VM or a container.
+A traffic generator is software running on the node.
+A traffic generator node is a node running a traffic generator.
+A node can be a traffic generator node as well as system under test node.
+"""
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.config import (
+    ScapyTrafficGeneratorConfig,
+    TGNodeConfiguration,
+    TrafficGeneratorType,
+)
+from framework.exception import ConfigurationError
+
+from .capturing_traffic_generator import CapturingTrafficGenerator
+from .hw.port import Port
+from .node import Node
+
+
+class TGNode(Node):
+    """Manage connections to a node with a traffic generator.
+
+    Apart from basic node management capabilities, the Traffic Generator node has
+    specialized methods for handling the traffic generator running on it.
+
+    Arguments:
+        node_config: The user configuration of the traffic generator node.
+
+    Attributes:
+        traffic_generator: The traffic generator running on the node.
+    """
+
+    traffic_generator: CapturingTrafficGenerator
+
+    def __init__(self, node_config: TGNodeConfiguration):
+        super(TGNode, self).__init__(node_config)
+        self.traffic_generator = create_traffic_generator(
+            self, node_config.traffic_generator
+        )
+        self._logger.info(f"Created node: {self.name}")
+
+    def send_packet_and_capture(
+        self,
+        packet: Packet,
+        send_port: Port,
+        receive_port: Port,
+        duration: float = 1,
+    ) -> list[Packet]:
+        """Send a packet, return received traffic.
+
+        Send a packet on the send_port and then return all traffic captured
+        on the receive_port for the given duration. Also record the captured traffic
+        in a pcap file.
+
+        Args:
+            packet: The packet to send.
+            send_port: The egress port on the TG node.
+            receive_port: The ingress port in the TG node.
+            duration: Capture traffic for this amount of time after sending the packet.
+
+        Returns:
+             A list of received packets. May be empty if no packets are captured.
+        """
+        return self.traffic_generator.send_packet_and_capture(
+            packet, send_port, receive_port, duration
+        )
+
+    def close(self) -> None:
+        """Free all resources used by the node"""
+        self.traffic_generator.close()
+        super(TGNode, self).close()
+
+
+def create_traffic_generator(
+    tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
+) -> CapturingTrafficGenerator:
+    """A factory function for creating traffic generator object from user config."""
+
+    from .scapy import ScapyTrafficGenerator
+
+    match traffic_generator_config.traffic_generator_type:
+        case TrafficGeneratorType.SCAPY:
+            return ScapyTrafficGenerator(tg_node, traffic_generator_config)
+        case _:
+            raise ConfigurationError(
+                "Unknown traffic generator: "
+                f"{traffic_generator_config.traffic_generator_type}"
+            )
diff --git a/dts/framework/testbed_model/traffic_generator.py b/dts/framework/testbed_model/traffic_generator.py
new file mode 100644
index 0000000000..28c35d3ce4
--- /dev/null
+++ b/dts/framework/testbed_model/traffic_generator.py
@@ -0,0 +1,72 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""The base traffic generator.
+
+These traffic generators can't capture received traffic,
+only count the number of received packets.
+"""
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.logger import DTSLOG
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+
+
+class TrafficGenerator(ABC):
+    """The base traffic generator.
+
+    Defines the few basic methods that each traffic generator must implement.
+    """
+
+    _logger: DTSLOG
+
+    def send_packet(self, packet: Packet, port: Port) -> None:
+        """Send a packet and block until it is fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packet: The packet to send.
+            port: The egress port on the TG node.
+        """
+        self.send_packets([packet], port)
+
+    def send_packets(self, packets: list[Packet], port: Port) -> None:
+        """Send packets and block until they are fully sent.
+
+        What fully sent means is defined by the traffic generator.
+
+        Args:
+            packets: The packets to send.
+            port: The egress port on the TG node.
+        """
+        self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
+        self._logger.debug(get_packet_summaries(packets))
+        self._send_packets(packets, port)
+
+    @abstractmethod
+    def _send_packets(self, packets: list[Packet], port: Port) -> None:
+        """
+        The extended classes must implement this method which
+        sends packets on send_port. The method should block until all packets
+        are fully sent.
+        """
+
+    @property
+    def is_capturing(self) -> bool:
+        """Whether this traffic generator can capture traffic.
+
+        Returns:
+            True if the traffic generator can capture traffic, False otherwise.
+        """
+        return False
+
+    @abstractmethod
+    def close(self) -> None:
+        """Free all resources used by the traffic generator."""
diff --git a/dts/framework/utils.py b/dts/framework/utils.py
index 60abe46edf..d27c2c5b5f 100644
--- a/dts/framework/utils.py
+++ b/dts/framework/utils.py
@@ -4,6 +4,7 @@ 
 # Copyright(c) 2022-2023 University of New Hampshire
 
 import atexit
+import json
 import os
 import subprocess
 import sys
@@ -11,6 +12,8 @@ 
 from pathlib import Path
 from subprocess import SubprocessError
 
+from scapy.packet import Packet  # type: ignore[import]
+
 from .exception import ConfigurationError
 
 
@@ -64,6 +67,16 @@  def expand_range(range_str: str) -> list[int]:
     return expanded_range
 
 
+def get_packet_summaries(packets: list[Packet]):
+    if len(packets) == 1:
+        packet_summaries = packets[0].summary()
+    else:
+        packet_summaries = json.dumps(
+            list(map(lambda pkt: pkt.summary(), packets)), indent=4
+        )
+    return f"Packet contents: \n{packet_summaries}"
+
+
 def RED(text: str) -> str:
     return f"\u001B[31;1m{str(text)}\u001B[0m"