[RFC,v1,3/5] dts: traffic generator abstractions

Message ID 20230420093109.594704-4-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series dts: add tg abstractions and scapy |

Checks

Context Check Description
ci/checkpatch success coding style OK

Commit Message

Juraj Linkeš April 20, 2023, 9:31 a.m. UTC
  There are traffic abstractions for all traffic generators and for
traffic generators that can capture (not just count) packets.

There also related abstractions, such as TGNode where the traffic
generators reside and some related code.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 dts/framework/remote_session/os_session.py    |  22 ++-
 dts/framework/remote_session/posix_session.py |   3 +
 .../capturing_traffic_generator.py            | 155 ++++++++++++++++++
 dts/framework/testbed_model/hw/port.py        |  55 +++++++
 dts/framework/testbed_model/node.py           |   4 +-
 dts/framework/testbed_model/sut_node.py       |   5 +-
 dts/framework/testbed_model/tg_node.py        |  62 +++++++
 .../testbed_model/traffic_generator.py        |  59 +++++++
 8 files changed, 360 insertions(+), 5 deletions(-)
 create mode 100644 dts/framework/testbed_model/capturing_traffic_generator.py
 create mode 100644 dts/framework/testbed_model/hw/port.py
 create mode 100644 dts/framework/testbed_model/tg_node.py
 create mode 100644 dts/framework/testbed_model/traffic_generator.py
  

Patch

diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index 4c48ae2567..56d7fef06c 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -10,6 +10,7 @@ 
 from framework.logger import DTSLOG
 from framework.settings import SETTINGS
 from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import PortIdentifier
 from framework.utils import EnvVarsDict, MesonArgs
 
 from .remote import CommandResult, RemoteSession, create_remote_session
@@ -37,6 +38,7 @@  def __init__(
         self.name = name
         self._logger = logger
         self.remote_session = create_remote_session(node_config, name, logger)
+        self._disable_terminal_colors()
 
     def close(self, force: bool = False) -> None:
         """
@@ -53,7 +55,7 @@  def is_alive(self) -> bool:
     def send_command(
         self,
         command: str,
-        timeout: float,
+        timeout: float = SETTINGS.timeout,
         verify: bool = False,
         env: EnvVarsDict | None = None,
     ) -> CommandResult:
@@ -64,6 +66,12 @@  def send_command(
         """
         return self.remote_session.send_command(command, timeout, verify, env)
 
+    @abstractmethod
+    def _disable_terminal_colors(self) -> None:
+        """
+        Disable the colors in the ssh session.
+        """
+
     @abstractmethod
     def guess_dpdk_remote_dir(self, remote_dir) -> PurePath:
         """
@@ -173,3 +181,15 @@  def setup_hugepages(self, hugepage_amount: int, force_first_numa: bool) -> None:
         if needed and mount the hugepages if needed.
         If force_first_numa is True, configure hugepages just on the first socket.
         """
+
+    @abstractmethod
+    def get_logical_name_of_port(self, id: PortIdentifier) -> str | None:
+        """
+        Gets the logical name (eno1, ens5, etc) of a port by the port's identifier.
+        """
+
+    @abstractmethod
+    def check_link_is_up(self, id: PortIdentifier) -> bool:
+        """
+        Check that the link is up.
+        """
diff --git a/dts/framework/remote_session/posix_session.py b/dts/framework/remote_session/posix_session.py
index d38062e8d6..288fbabf1e 100644
--- a/dts/framework/remote_session/posix_session.py
+++ b/dts/framework/remote_session/posix_session.py
@@ -219,3 +219,6 @@  def _remove_dpdk_runtime_dirs(
 
     def get_dpdk_file_prefix(self, dpdk_prefix) -> str:
         return ""
+
+    def _disable_terminal_colors(self) -> None:
+        self.remote_session.send_command("export TERM=xterm-mono")
diff --git a/dts/framework/testbed_model/capturing_traffic_generator.py b/dts/framework/testbed_model/capturing_traffic_generator.py
new file mode 100644
index 0000000000..7beeb139c1
--- /dev/null
+++ b/dts/framework/testbed_model/capturing_traffic_generator.py
@@ -0,0 +1,155 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+import itertools
+import uuid
+from abc import abstractmethod
+
+import scapy.utils
+from scapy.packet import Packet
+
+from framework.testbed_model.hw.port import PortIdentifier
+from framework.settings import SETTINGS
+
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+    """
+    This is the function used for the default implementation of capture names.
+    """
+    return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+    """
+    A mixin interface which enables a packet generator to declare that it can capture
+    packets and return them to the user.
+
+    All packet functions added by this class should write out the captured packets
+    to a pcap file in output, allowing for easier analysis of failed tests.
+    """
+
+    def is_capturing(self) -> bool:
+        return True
+
+    @abstractmethod
+    def send_packet_and_capture(
+        self,
+        send_port_id: PortIdentifier,
+        packet: Packet,
+        receive_port_id: PortIdentifier,
+        duration_s: int,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """
+        Send a packet on the send port and then capture all traffic on receive port
+        for the given duration.
+
+        Captures packets and adds them to output/<capture_name>.pcap.
+
+        This function must handle no packets even being received.
+        """
+        raise NotImplementedError()
+
+    def send_packets_and_capture(
+        self,
+        send_port_id: PortIdentifier,
+        packets: list[Packet],
+        receive_port_id: PortIdentifier,
+        duration_s: int,
+        capture_name: str = _get_default_capture_name(),
+    ) -> list[Packet]:
+        """
+        Send a group of packets on the send port and then capture all traffic on the
+        receive port for the given duration.
+
+        This function must handle no packets even being received.
+        """
+        self.logger.info(
+            f"Incremental captures will be created at output/{capture_name}-<n>.pcap"
+        )
+        received_packets: list[list[Packet]] = []
+        for i, packet in enumerate(packets):
+            received_packets.append(
+                self.send_packet_and_capture(
+                    send_port_id,
+                    packet,
+                    receive_port_id,
+                    duration_s,
+                    capture_name=f"{capture_name}-{i}",
+                )
+            )
+
+        flattened_received_packets = list(
+            itertools.chain.from_iterable(received_packets)
+        )
+        scapy.utils.wrpcap(f"output/{capture_name}.pcap", flattened_received_packets)
+        return flattened_received_packets
+
+    def send_packet_and_expect_packet(
+        self,
+        send_port_id: PortIdentifier,
+        packet: Packet,
+        receive_port_id: PortIdentifier,
+        expected_packet: Packet,
+        timeout: int = SETTINGS.timeout,
+        capture_name: str = _get_default_capture_name(),
+    ) -> None:
+        """
+        Sends the provided packet, capturing received packets. Then asserts that the
+        only 1 packet was received, and that the packet that was received is equal to
+        the expected packet.
+        """
+        packets: list[Packet] = self.send_packet_and_capture(
+            send_port_id, packet, receive_port_id, timeout
+        )
+
+        assert len(packets) != 0, "Expected a packet, but none were captured"
+        assert len(packets) == 1, (
+            "More packets than expected were received, "
+            f"capture written to output/{capture_name}.pcap"
+        )
+        assert packets[0] == expected_packet, (
+            f"Received packet differed from expected packet, capture written to "
+            f"output/{capture_name}.pcap"
+        )
+
+    def send_packets_and_expect_packets(
+        self,
+        send_port_id: PortIdentifier,
+        packets: list[Packet],
+        receive_port_id: PortIdentifier,
+        expected_packets: list[Packet],
+        timeout: int = SETTINGS.timeout,
+        capture_name: str = _get_default_capture_name(),
+    ) -> None:
+        """
+        Sends the provided packets, capturing received packets. Then asserts that the
+        correct number of packets was received, and that the packets that were received
+        are equal to the expected packet. This equality is done by comparing packets
+        at the same index.
+        """
+        packets: list[Packet] = self.send_packets_and_capture(
+            send_port_id, packets, receive_port_id, timeout
+        )
+
+        if len(expected_packets) > 0:
+            assert len(packets) != 0, "Expected packets, but none were captured"
+
+        assert len(packets) == len(expected_packets), (
+            "A different number of packets than expected were received, "
+            f"capture written to output/{capture_name}.pcap or split across "
+            f"output/{capture_name}-<n>.pcap"
+        )
+        for i, expected_packet in enumerate(expected_packets):
+            assert packets[i] == expected_packet, (
+                f"Received packet {i} differed from expected packet, capture written "
+                f"to output/{capture_name}.pcap or output/{capture_name}-{i}.pcap"
+            )
+
+    def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+        file_name = f"output/{capture_name}.pcap"
+        self.logger.debug(f"Writing packets to {file_name}")
+        scapy.utils.wrpcap(file_name, packets)
diff --git a/dts/framework/testbed_model/hw/port.py b/dts/framework/testbed_model/hw/port.py
new file mode 100644
index 0000000000..ebaad563f8
--- /dev/null
+++ b/dts/framework/testbed_model/hw/port.py
@@ -0,0 +1,55 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+    node: str
+    pci: str
+
+
+@dataclass(slots=True, frozen=True)
+class Port:
+    """
+    identifier: The information that uniquely identifies this port.
+    pci: The PCI address of the port.
+
+    os_driver: The driver normally used by this port (ex: i40e)
+    dpdk_os_driver: The driver that the os should bind this device to for DPDK to use it. (ex: vfio-pci)
+
+    Note: os_driver and dpdk_os_driver may be the same thing, see mlx5_core
+
+    peer: The identifier for whatever this port is plugged into.
+    """
+
+    id: int
+    identifier: PortIdentifier
+    os_driver: str
+    dpdk_os_driver: str
+    peer: PortIdentifier
+
+    @property
+    def node(self) -> str:
+        return self.identifier.node
+
+    @property
+    def pci(self) -> str:
+        return self.identifier.pci
+
+    @staticmethod
+    def from_config(node_name: str, config: PortConfig) -> "Port":
+        return Port(
+            id=config.id,
+            identifier=PortIdentifier(
+                node=node_name,
+                pci=config.pci,
+            ),
+            os_driver=config.os_driver,
+            dpdk_os_driver=config.dpdk_os_driver,
+            peer=PortIdentifier(node=config.peer_node, pci=config.peer_pci),
+        )
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index d48fafe65d..5d2d1a0cf6 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -47,6 +47,8 @@  def __init__(self, node_config: NodeConfiguration):
         self._logger = getLogger(self.name)
         self.main_session = create_session(self.config, self.name, self._logger)
 
+        self._logger.info(f"Connected to node: {self.name}")
+
         self._get_remote_cpus()
         # filter the node lcores according to user config
         self.lcores = LogicalCoreListFilter(
@@ -55,8 +57,6 @@  def __init__(self, node_config: NodeConfiguration):
 
         self._other_sessions = []
 
-        self._logger.info(f"Created node: {self.name}")
-
     def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
         """
         Perform the execution setup that will be done for each execution
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index 2b2b50d982..ec9180a98b 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -7,7 +7,7 @@ 
 import time
 from pathlib import PurePath
 
-from framework.config import BuildTargetConfiguration, NodeConfiguration
+from framework.config import BuildTargetConfiguration, SUTConfiguration
 from framework.remote_session import CommandResult, OSSession
 from framework.settings import SETTINGS
 from framework.utils import EnvVarsDict, MesonArgs
@@ -34,7 +34,7 @@  class SutNode(Node):
     _app_compile_timeout: float
     _dpdk_kill_session: OSSession | None
 
-    def __init__(self, node_config: NodeConfiguration):
+    def __init__(self, node_config: SUTConfiguration):
         super(SutNode, self).__init__(node_config)
         self._dpdk_prefix_list = []
         self._build_target_config = None
@@ -47,6 +47,7 @@  def __init__(self, node_config: NodeConfiguration):
         self._dpdk_timestamp = (
             f"{str(os.getpid())}_{time.strftime('%Y%m%d%H%M%S', time.localtime())}"
         )
+        self._logger.info(f"Created node: {self.name}")
 
     @property
     def _remote_dpdk_dir(self) -> PurePath:
diff --git a/dts/framework/testbed_model/tg_node.py b/dts/framework/testbed_model/tg_node.py
new file mode 100644
index 0000000000..fdb7329020
--- /dev/null
+++ b/dts/framework/testbed_model/tg_node.py
@@ -0,0 +1,62 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+
+from framework.config import TGConfiguration
+from framework.testbed_model.hw.port import Port, PortIdentifier
+from framework.testbed_model.traffic_generator import TrafficGenerator
+
+from .node import Node
+
+
+class TGNode(Node):
+    """
+    A class for managing connections to the Traffic Generator node and managing
+    traffic generators residing within.
+    """
+
+    ports: list[Port]
+    traffic_generator: TrafficGenerator
+
+    def __init__(self, node_config: TGConfiguration):
+        super(TGNode, self).__init__(node_config)
+        self.ports = [
+            Port.from_config(self.name, port_config)
+            for port_config in node_config.ports
+        ]
+        self.traffic_generator = TrafficGenerator.from_config(
+            self, node_config.traffic_generator
+        )
+        self._logger.info(f"Created node: {self.name}")
+
+    def get_ports_with_loop_topology(
+        self, peer_node: "Node"
+    ) -> tuple[PortIdentifier, PortIdentifier] | None:
+        for port1 in self.ports:
+            if port1.peer.node == peer_node.name:
+                for port2 in self.ports:
+                    if port2.peer.node == peer_node.name:
+                        return (port1.identifier, port2.identifier)
+        self._logger.warning(
+            f"Attempted to find loop topology between {self.name} and {peer_node.name}, but none could be found"
+        )
+        return None
+
+    def verify(self) -> None:
+        for port in self.ports:
+            self.traffic_generator.assert_port_is_connected(port.identifier)
+        port = self.ports[0]
+        # Check that the traffic generator is working by sending a packet.
+        # send_packet should throw an error if something goes wrong.
+        self.traffic_generator.send_packet(
+            port.identifier, Ether() / IP() / UDP() / Raw(b"Hello World")
+        )
+
+    def close(self) -> None:
+        self.traffic_generator.close()
+        super(TGNode, self).close()
diff --git a/dts/framework/testbed_model/traffic_generator.py b/dts/framework/testbed_model/traffic_generator.py
new file mode 100644
index 0000000000..ea8f361e8f
--- /dev/null
+++ b/dts/framework/testbed_model/traffic_generator.py
@@ -0,0 +1,59 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+#
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet
+
+from framework.config import TrafficGeneratorConfig, TrafficGeneratorType
+from framework.logger import DTSLOG
+from framework.testbed_model.hw.port import PortIdentifier
+
+
+class TrafficGenerator(ABC):
+    logger: DTSLOG
+
+    @abstractmethod
+    def send_packet(self, port: PortIdentifier, packet: Packet) -> None:
+        """
+        Sends a packet and blocks until it is fully sent.
+
+        What fully sent means is defined by the traffic generator.
+        """
+        raise NotImplementedError()
+
+    def send_packets(self, port: PortIdentifier, packets: list[Packet]) -> None:
+        """
+        Sends a list of packets and blocks until they are fully sent.
+
+        What "fully sent" means is defined by the traffic generator.
+        """
+        # default implementation, this should be overridden if there is a better
+        # way to do this on a specific packet generator
+        for packet in packets:
+            self.send_packet(port, packet)
+
+    def is_capturing(self) -> bool:
+        """
+        Whether this traffic generator can capture traffic
+        """
+        return False
+
+    @staticmethod
+    def from_config(
+        node: "Node", traffic_generator_config: TrafficGeneratorConfig
+    ) -> "TrafficGenerator":
+        from .scapy import ScapyTrafficGenerator
+
+        match traffic_generator_config.traffic_generator_type:
+            case TrafficGeneratorType.SCAPY:
+                return ScapyTrafficGenerator(node, node.ports)
+
+    @abstractmethod
+    def close(self):
+        pass
+
+    @abstractmethod
+    def assert_port_is_connected(self, id: PortIdentifier) -> None:
+        pass