@@ -153,6 +153,37 @@ There are two areas that need to be set up on a System Under Test:
sudo usermod -aG sudo <sut_user>
+
+Setting up Traffic Generator Node
+---------------------------------
+
+These need to be set up on a Traffic Generator Node:
+
+#. **Traffic generator dependencies**
+
+ The traffic generator running on the traffic generator node must be installed beforehand.
+ For Scapy traffic generator, only a few Python libraries need to be installed:
+
+ .. code-block:: console
+
+ sudo apt install python3-pip
+ sudo pip install --upgrade pip
+ sudo pip install scapy==2.5.0
+
+#. **Hardware dependencies**
+
+ The traffic generators, like DPDK, need a proper driver and firmware.
+ The Scapy traffic generator doesn't have strict requirements - the drivers that come
+ with most OS distributions will be satisfactory.
+
+
+#. **User with administrator privileges**
+
+ Similarly to the System Under Test, traffic generators need administrator privileges
+ to be able to use the devices.
+ Refer to the `System Under Test section <sut_admin_user>` for details.
+
+
Running DTS
-----------
@@ -15,7 +15,7 @@
from .logger import DTSLOG, getLogger
from .test_result import BuildTargetResult, DTSResult, ExecutionResult, Result
from .test_suite import get_test_suites
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
from .utils import check_dts_python_version
dts_logger: DTSLOG = getLogger("DTSRunner")
@@ -33,29 +33,31 @@ def run_all() -> None:
# check the python version of the server that run dts
check_dts_python_version()
- nodes: dict[str, SutNode] = {}
+ sut_nodes: dict[str, SutNode] = {}
+ tg_nodes: dict[str, TGNode] = {}
try:
# for all Execution sections
for execution in CONFIGURATION.executions:
- sut_node = None
- if execution.system_under_test_node.name in nodes:
- # a Node with the same name already exists
- sut_node = nodes[execution.system_under_test_node.name]
- else:
- # the SUT has not been initialized yet
- try:
+ sut_node = sut_nodes.get(execution.system_under_test_node.name)
+ tg_node = tg_nodes.get(execution.traffic_generator_node.name)
+
+ try:
+ if not sut_node:
sut_node = SutNode(execution.system_under_test_node)
- result.update_setup(Result.PASS)
- except Exception as e:
- dts_logger.exception(
- f"Connection to node {execution.system_under_test_node} failed."
- )
- result.update_setup(Result.FAIL, e)
- else:
- nodes[sut_node.name] = sut_node
-
- if sut_node:
- _run_execution(sut_node, execution, result)
+ sut_nodes[sut_node.name] = sut_node
+ if not tg_node:
+ tg_node = TGNode(execution.traffic_generator_node)
+ tg_nodes[tg_node.name] = tg_node
+ result.update_setup(Result.PASS)
+ except Exception as e:
+ failed_node = execution.system_under_test_node.name
+ if sut_node:
+ failed_node = execution.traffic_generator_node.name
+ dts_logger.exception(f"Creation of node {failed_node} failed.")
+ result.update_setup(Result.FAIL, e)
+
+ else:
+ _run_execution(sut_node, tg_node, execution, result)
except Exception as e:
dts_logger.exception("An unexpected error has occurred.")
@@ -64,7 +66,7 @@ def run_all() -> None:
finally:
try:
- for node in nodes.values():
+ for node in (sut_nodes | tg_nodes).values():
node.close()
result.update_teardown(Result.PASS)
except Exception as e:
@@ -81,7 +83,10 @@ def run_all() -> None:
def _run_execution(
- sut_node: SutNode, execution: ExecutionConfiguration, result: DTSResult
+ sut_node: SutNode,
+ tg_node: TGNode,
+ execution: ExecutionConfiguration,
+ result: DTSResult,
) -> None:
"""
Run the given execution. This involves running the execution setup as well as
@@ -102,7 +107,9 @@ def _run_execution(
else:
for build_target in execution.build_targets:
- _run_build_target(sut_node, build_target, execution, execution_result)
+ _run_build_target(
+ sut_node, tg_node, build_target, execution, execution_result
+ )
finally:
try:
@@ -115,6 +122,7 @@ def _run_execution(
def _run_build_target(
sut_node: SutNode,
+ tg_node: TGNode,
build_target: BuildTargetConfiguration,
execution: ExecutionConfiguration,
execution_result: ExecutionResult,
@@ -135,7 +143,7 @@ def _run_build_target(
build_target_result.update_setup(Result.FAIL, e)
else:
- _run_all_suites(sut_node, execution, build_target_result)
+ _run_all_suites(sut_node, tg_node, execution, build_target_result)
finally:
try:
@@ -148,6 +156,7 @@ def _run_build_target(
def _run_all_suites(
sut_node: SutNode,
+ tg_node: TGNode,
execution: ExecutionConfiguration,
build_target_result: BuildTargetResult,
) -> None:
@@ -162,7 +171,7 @@ def _run_all_suites(
for test_suite_config in execution.test_suites:
try:
_run_single_suite(
- sut_node, execution, build_target_result, test_suite_config
+ sut_node, tg_node, execution, build_target_result, test_suite_config
)
except BlockingTestSuiteError as e:
dts_logger.exception(
@@ -178,6 +187,7 @@ def _run_all_suites(
def _run_single_suite(
sut_node: SutNode,
+ tg_node: TGNode,
execution: ExecutionConfiguration,
build_target_result: BuildTargetResult,
test_suite_config: TestSuiteConfig,
@@ -206,6 +216,7 @@ def _run_single_suite(
for test_suite_class in test_suite_classes:
test_suite = test_suite_class(
sut_node,
+ tg_node,
test_suite_config.test_cases,
execution.func,
build_target_result,
@@ -2,13 +2,47 @@
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2023 University of New Hampshire
+import json
+from typing import TypedDict
+
+from typing_extensions import NotRequired
+
from framework.exception import RemoteCommandExecutionError
from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
from framework.utils import expand_range
from .posix_session import PosixSession
+class LshwConfigurationOutput(TypedDict):
+ link: str
+
+
+class LshwOutput(TypedDict):
+ """
+ A model of the relevant information from json lshw output, e.g.:
+ {
+ ...
+ "businfo" : "pci@0000:08:00.0",
+ "logicalname" : "enp8s0",
+ "version" : "00",
+ "serial" : "52:54:00:59:e1:ac",
+ ...
+ "configuration" : {
+ ...
+ "link" : "yes",
+ ...
+ },
+ ...
+ """
+
+ businfo: str
+ logicalname: NotRequired[str]
+ serial: NotRequired[str]
+ configuration: LshwConfigurationOutput
+
+
class LinuxSession(PosixSession):
"""
The implementation of non-Posix compliant parts of Linux remote sessions.
@@ -103,3 +137,47 @@ def _configure_huge_pages(
self.send_command(
f"echo {amount} | tee {hugepage_config_path}", privileged=True
)
+
+ def update_ports(self, ports: list[Port]) -> None:
+ self._logger.debug("Gathering port info.")
+ for port in ports:
+ assert (
+ port.node == self.name
+ ), "Attempted to gather port info on the wrong node"
+
+ port_info_list = self._get_lshw_info()
+ for port in ports:
+ for port_info in port_info_list:
+ if f"pci@{port.pci}" == port_info.get("businfo"):
+ self._update_port_attr(
+ port, port_info.get("logicalname"), "logical_name"
+ )
+ self._update_port_attr(port, port_info.get("serial"), "mac_address")
+ port_info_list.remove(port_info)
+ break
+ else:
+ self._logger.warning(f"No port at pci address {port.pci} found.")
+
+ def _get_lshw_info(self) -> list[LshwOutput]:
+ output = self.send_command("lshw -quiet -json -C network", verify=True)
+ return json.loads(output.stdout)
+
+ def _update_port_attr(
+ self, port: Port, attr_value: str | None, attr_name: str
+ ) -> None:
+ if attr_value:
+ setattr(port, attr_name, attr_value)
+ self._logger.debug(
+ f"Found '{attr_name}' of port {port.pci}: '{attr_value}'."
+ )
+ else:
+ self._logger.warning(
+ f"Attempted to get '{attr_name}' of port {port.pci}, "
+ f"but it doesn't exist."
+ )
+
+ def configure_port_state(self, port: Port, enable: bool) -> None:
+ state = "up" if enable else "down"
+ self.send_command(
+ f"ip link set dev {port.logical_name} {state}", privileged=True
+ )
@@ -12,6 +12,7 @@
from framework.remote_session.remote import InteractiveShell
from framework.settings import SETTINGS
from framework.testbed_model import LogicalCore
+from framework.testbed_model.hw.port import Port
from framework.utils import MesonArgs
from .remote import (
@@ -249,3 +250,17 @@ def get_node_info(self) -> NodeInfo:
"""
Collect information about the node
"""
+
+ @abstractmethod
+ def update_ports(self, ports: list[Port]) -> None:
+ """
+ Get additional information about ports:
+ Logical name (e.g. enp7s0) if applicable
+ Mac address
+ """
+
+ @abstractmethod
+ def configure_port_state(self, port: Port, enable: bool) -> None:
+ """
+ Enable/disable port.
+ """
@@ -20,7 +20,7 @@
from .logger import DTSLOG, getLogger
from .settings import SETTINGS
from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
-from .testbed_model import SutNode
+from .testbed_model import SutNode, TGNode
class TestSuite(object):
@@ -51,11 +51,13 @@ class TestSuite(object):
def __init__(
self,
sut_node: SutNode,
+ tg_node: TGNode,
test_cases: list[str],
func: bool,
build_target_result: BuildTargetResult,
):
self.sut_node = sut_node
+ self.tg_node = tg_node
self._logger = getLogger(self.__class__.__name__)
self._test_cases_to_run = test_cases
self._test_cases_to_run.extend(SETTINGS.test_cases)
@@ -20,3 +20,4 @@
)
from .node import Node
from .sut_node import SutNode
+from .tg_node import TGNode
new file mode 100644
@@ -0,0 +1,136 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator that can capture packets.
+
+In functional testing, we need to interrogate received packets to check their validity.
+The module defines the interface common to all traffic generators capable of capturing
+traffic.
+"""
+
+import uuid
+from abc import abstractmethod
+
+import scapy.utils # type: ignore[import]
+from scapy.packet import Packet # type: ignore[import]
+
+from framework.settings import SETTINGS
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+from .traffic_generator import TrafficGenerator
+
+
+def _get_default_capture_name() -> str:
+ """
+ This is the function used for the default implementation of capture names.
+ """
+ return str(uuid.uuid4())
+
+
+class CapturingTrafficGenerator(TrafficGenerator):
+ """Capture packets after sending traffic.
+
+ A mixin interface which enables a packet generator to declare that it can capture
+ packets and return them to the user.
+
+ The methods of capturing traffic generators obey the following workflow:
+ 1. send packets
+ 2. capture packets
+ 3. write the capture to a .pcap file
+ 4. return the received packets
+ """
+
+ @property
+ def is_capturing(self) -> bool:
+ return True
+
+ def send_packet_and_capture(
+ self,
+ packet: Packet,
+ send_port: Port,
+ receive_port: Port,
+ duration: float,
+ capture_name: str = _get_default_capture_name(),
+ ) -> list[Packet]:
+ """Send a packet, return received traffic.
+
+ Send a packet on the send_port and then return all traffic captured
+ on the receive_port for the given duration. Also record the captured traffic
+ in a pcap file.
+
+ Args:
+ packet: The packet to send.
+ send_port: The egress port on the TG node.
+ receive_port: The ingress port in the TG node.
+ duration: Capture traffic for this amount of time after sending the packet.
+ capture_name: The name of the .pcap file where to store the capture.
+
+ Returns:
+ A list of received packets. May be empty if no packets are captured.
+ """
+ return self.send_packets_and_capture(
+ [packet], send_port, receive_port, duration, capture_name
+ )
+
+ def send_packets_and_capture(
+ self,
+ packets: list[Packet],
+ send_port: Port,
+ receive_port: Port,
+ duration: float,
+ capture_name: str = _get_default_capture_name(),
+ ) -> list[Packet]:
+ """Send packets, return received traffic.
+
+ Send packets on the send_port and then return all traffic captured
+ on the receive_port for the given duration. Also record the captured traffic
+ in a pcap file.
+
+ Args:
+ packets: The packets to send.
+ send_port: The egress port on the TG node.
+ receive_port: The ingress port in the TG node.
+ duration: Capture traffic for this amount of time after sending the packets.
+ capture_name: The name of the .pcap file where to store the capture.
+
+ Returns:
+ A list of received packets. May be empty if no packets are captured.
+ """
+ self._logger.debug(get_packet_summaries(packets))
+ self._logger.debug(
+ f"Sending packet on {send_port.logical_name}, "
+ f"receiving on {receive_port.logical_name}."
+ )
+ received_packets = self._send_packets_and_capture(
+ packets,
+ send_port,
+ receive_port,
+ duration,
+ )
+
+ self._logger.debug(
+ f"Received packets: {get_packet_summaries(received_packets)}"
+ )
+ self._write_capture_from_packets(capture_name, received_packets)
+ return received_packets
+
+ @abstractmethod
+ def _send_packets_and_capture(
+ self,
+ packets: list[Packet],
+ send_port: Port,
+ receive_port: Port,
+ duration: float,
+ ) -> list[Packet]:
+ """
+ The extended classes must implement this method which
+ sends packets on send_port and receives packets on the receive_port
+ for the specified duration. It must be able to handle no received packets.
+ """
+
+ def _write_capture_from_packets(self, capture_name: str, packets: list[Packet]):
+ file_name = f"{SETTINGS.output_dir}/{capture_name}.pcap"
+ self._logger.debug(f"Writing packets to {file_name}.")
+ scapy.utils.wrpcap(file_name, packets)
new file mode 100644
@@ -0,0 +1,60 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+from dataclasses import dataclass
+
+from framework.config import PortConfig
+
+
+@dataclass(slots=True, frozen=True)
+class PortIdentifier:
+ node: str
+ pci: str
+
+
+@dataclass(slots=True)
+class Port:
+ """
+ identifier: The PCI address of the port on a node.
+
+ os_driver: The driver used by this port when the OS is controlling it.
+ Example: i40e
+ os_driver_for_dpdk: The driver the device must be bound to for DPDK to use it,
+ Example: vfio-pci.
+
+ Note: os_driver and os_driver_for_dpdk may be the same thing.
+ Example: mlx5_core
+
+ peer: The identifier of a port this port is connected with.
+ """
+
+ identifier: PortIdentifier
+ os_driver: str
+ os_driver_for_dpdk: str
+ peer: PortIdentifier
+ mac_address: str = ""
+ logical_name: str = ""
+
+ def __init__(self, node_name: str, config: PortConfig):
+ self.identifier = PortIdentifier(
+ node=node_name,
+ pci=config.pci,
+ )
+ self.os_driver = config.os_driver
+ self.os_driver_for_dpdk = config.os_driver_for_dpdk
+ self.peer = PortIdentifier(node=config.peer_node, pci=config.peer_pci)
+
+ @property
+ def node(self) -> str:
+ return self.identifier.node
+
+ @property
+ def pci(self) -> str:
+ return self.identifier.pci
+
+
+@dataclass(slots=True, frozen=True)
+class PortLink:
+ sut_port: Port
+ tg_port: Port
@@ -7,6 +7,7 @@
A node is a generic host that DTS connects to and manages.
"""
+from abc import ABC
from typing import Any, Callable, Type
from framework.config import (
@@ -26,9 +27,10 @@
VirtualDevice,
lcore_filter,
)
+from .hw.port import Port
-class Node(object):
+class Node(ABC):
"""
Basic class for node management. This class implements methods that
manage a node, such as information gathering (of CPU/PCI/NIC) and
@@ -39,6 +41,7 @@ class Node(object):
config: NodeConfiguration
name: str
lcores: list[LogicalCore]
+ ports: list[Port]
_logger: DTSLOG
_other_sessions: list[OSSession]
_execution_config: ExecutionConfiguration
@@ -50,6 +53,8 @@ def __init__(self, node_config: NodeConfiguration):
self._logger = getLogger(self.name)
self.main_session = create_session(self.config, self.name, self._logger)
+ self._logger.info(f"Connected to node: {self.name}")
+
self._get_remote_cpus()
# filter the node lcores according to user config
self.lcores = LogicalCoreListFilter(
@@ -58,8 +63,13 @@ def __init__(self, node_config: NodeConfiguration):
self._other_sessions = []
self.virtual_devices = []
+ self._init_ports()
- self._logger.info(f"Created node: {self.name}")
+ def _init_ports(self) -> None:
+ self.ports = [Port(self.name, port_config) for port_config in self.config.ports]
+ self.main_session.update_ports(self.ports)
+ for port in self.ports:
+ self.configure_port_state(port)
def set_up_execution(self, execution_config: ExecutionConfiguration) -> None:
"""
@@ -205,6 +215,12 @@ def _setup_hugepages(self):
self.config.hugepages.amount, self.config.hugepages.force_first_numa
)
+ def configure_port_state(self, port: Port, enable: bool = True) -> None:
+ """
+ Enable/disable port.
+ """
+ self.main_session.configure_port_state(port, enable)
+
def close(self) -> None:
"""
Close all connections and free other resources.
new file mode 100644
@@ -0,0 +1,74 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Scapy traffic generator.
+
+Traffic generator used for functional testing, implemented using the Scapy library.
+The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
+
+The XML-RPC server runs in an interactive remote SSH session running Python console,
+where we start the server. The communication with the server is facilitated with
+a local server proxy.
+"""
+
+from scapy.packet import Packet # type: ignore[import]
+
+from framework.config import OS, ScapyTrafficGeneratorConfig
+from framework.logger import getLogger
+
+from .capturing_traffic_generator import (
+ CapturingTrafficGenerator,
+ _get_default_capture_name,
+)
+from .hw.port import Port
+from .tg_node import TGNode
+
+
+class ScapyTrafficGenerator(CapturingTrafficGenerator):
+ """Provides access to scapy functions via an RPC interface.
+
+ The traffic generator first starts an XML-RPC on the remote TG node.
+ Then it populates the server with functions which use the Scapy library
+ to send/receive traffic.
+
+ Any packets sent to the remote server are first converted to bytes.
+ They are received as xmlrpc.client.Binary objects on the server side.
+ When the server sends the packets back, they are also received as
+ xmlrpc.client.Binary object on the client side, are converted back to Scapy
+ packets and only then returned from the methods.
+
+ Arguments:
+ tg_node: The node where the traffic generator resides.
+ config: The user configuration of the traffic generator.
+ """
+
+ _config: ScapyTrafficGeneratorConfig
+ _tg_node: TGNode
+
+ def __init__(self, tg_node: TGNode, config: ScapyTrafficGeneratorConfig):
+ self._config = config
+ self._tg_node = tg_node
+ self._logger = getLogger(
+ f"{self._tg_node.name} {self._config.traffic_generator_type}"
+ )
+
+ assert (
+ self._tg_node.config.os == OS.linux
+ ), "Linux is the only supported OS for scapy traffic generation"
+
+ def _send_packets(self, packets: list[Packet], port: Port) -> None:
+ raise NotImplementedError()
+
+ def _send_packets_and_capture(
+ self,
+ packets: list[Packet],
+ send_port: Port,
+ receive_port: Port,
+ duration: float,
+ capture_name: str = _get_default_capture_name(),
+ ) -> list[Packet]:
+ raise NotImplementedError()
+
+ def close(self):
+ pass
@@ -105,6 +105,7 @@ def __init__(self, node_config: SutNodeConfiguration):
self._dpdk_version = None
self._node_info = None
self._compiler_version = None
+ self._logger.info(f"Created node: {self.name}")
@property
def _remote_dpdk_dir(self) -> PurePath:
new file mode 100644
@@ -0,0 +1,99 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2010-2014 Intel Corporation
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""Traffic generator node.
+
+This is the node where the traffic generator resides.
+The distinction between a node and a traffic generator is as follows:
+A node is a host that DTS connects to. It could be a baremetal server,
+a VM or a container.
+A traffic generator is software running on the node.
+A traffic generator node is a node running a traffic generator.
+A node can be a traffic generator node as well as system under test node.
+"""
+
+from scapy.packet import Packet # type: ignore[import]
+
+from framework.config import (
+ ScapyTrafficGeneratorConfig,
+ TGNodeConfiguration,
+ TrafficGeneratorType,
+)
+from framework.exception import ConfigurationError
+
+from .capturing_traffic_generator import CapturingTrafficGenerator
+from .hw.port import Port
+from .node import Node
+
+
+class TGNode(Node):
+ """Manage connections to a node with a traffic generator.
+
+ Apart from basic node management capabilities, the Traffic Generator node has
+ specialized methods for handling the traffic generator running on it.
+
+ Arguments:
+ node_config: The user configuration of the traffic generator node.
+
+ Attributes:
+ traffic_generator: The traffic generator running on the node.
+ """
+
+ traffic_generator: CapturingTrafficGenerator
+
+ def __init__(self, node_config: TGNodeConfiguration):
+ super(TGNode, self).__init__(node_config)
+ self.traffic_generator = create_traffic_generator(
+ self, node_config.traffic_generator
+ )
+ self._logger.info(f"Created node: {self.name}")
+
+ def send_packet_and_capture(
+ self,
+ packet: Packet,
+ send_port: Port,
+ receive_port: Port,
+ duration: float = 1,
+ ) -> list[Packet]:
+ """Send a packet, return received traffic.
+
+ Send a packet on the send_port and then return all traffic captured
+ on the receive_port for the given duration. Also record the captured traffic
+ in a pcap file.
+
+ Args:
+ packet: The packet to send.
+ send_port: The egress port on the TG node.
+ receive_port: The ingress port in the TG node.
+ duration: Capture traffic for this amount of time after sending the packet.
+
+ Returns:
+ A list of received packets. May be empty if no packets are captured.
+ """
+ return self.traffic_generator.send_packet_and_capture(
+ packet, send_port, receive_port, duration
+ )
+
+ def close(self) -> None:
+ """Free all resources used by the node"""
+ self.traffic_generator.close()
+ super(TGNode, self).close()
+
+
+def create_traffic_generator(
+ tg_node: TGNode, traffic_generator_config: ScapyTrafficGeneratorConfig
+) -> CapturingTrafficGenerator:
+ """A factory function for creating traffic generator object from user config."""
+
+ from .scapy import ScapyTrafficGenerator
+
+ match traffic_generator_config.traffic_generator_type:
+ case TrafficGeneratorType.SCAPY:
+ return ScapyTrafficGenerator(tg_node, traffic_generator_config)
+ case _:
+ raise ConfigurationError(
+ "Unknown traffic generator: "
+ f"{traffic_generator_config.traffic_generator_type}"
+ )
new file mode 100644
@@ -0,0 +1,72 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2022 University of New Hampshire
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""The base traffic generator.
+
+These traffic generators can't capture received traffic,
+only count the number of received packets.
+"""
+
+from abc import ABC, abstractmethod
+
+from scapy.packet import Packet # type: ignore[import]
+
+from framework.logger import DTSLOG
+from framework.utils import get_packet_summaries
+
+from .hw.port import Port
+
+
+class TrafficGenerator(ABC):
+ """The base traffic generator.
+
+ Defines the few basic methods that each traffic generator must implement.
+ """
+
+ _logger: DTSLOG
+
+ def send_packet(self, packet: Packet, port: Port) -> None:
+ """Send a packet and block until it is fully sent.
+
+ What fully sent means is defined by the traffic generator.
+
+ Args:
+ packet: The packet to send.
+ port: The egress port on the TG node.
+ """
+ self.send_packets([packet], port)
+
+ def send_packets(self, packets: list[Packet], port: Port) -> None:
+ """Send packets and block until they are fully sent.
+
+ What fully sent means is defined by the traffic generator.
+
+ Args:
+ packets: The packets to send.
+ port: The egress port on the TG node.
+ """
+ self._logger.info(f"Sending packet{'s' if len(packets) > 1 else ''}.")
+ self._logger.debug(get_packet_summaries(packets))
+ self._send_packets(packets, port)
+
+ @abstractmethod
+ def _send_packets(self, packets: list[Packet], port: Port) -> None:
+ """
+ The extended classes must implement this method which
+ sends packets on send_port. The method should block until all packets
+ are fully sent.
+ """
+
+ @property
+ def is_capturing(self) -> bool:
+ """Whether this traffic generator can capture traffic.
+
+ Returns:
+ True if the traffic generator can capture traffic, False otherwise.
+ """
+ return False
+
+ @abstractmethod
+ def close(self) -> None:
+ """Free all resources used by the traffic generator."""
@@ -4,6 +4,7 @@
# Copyright(c) 2022-2023 University of New Hampshire
import atexit
+import json
import os
import subprocess
import sys
@@ -11,6 +12,8 @@
from pathlib import Path
from subprocess import SubprocessError
+from scapy.packet import Packet # type: ignore[import]
+
from .exception import ConfigurationError
@@ -64,6 +67,16 @@ def expand_range(range_str: str) -> list[int]:
return expanded_range
+def get_packet_summaries(packets: list[Packet]):
+ if len(packets) == 1:
+ packet_summaries = packets[0].summary()
+ else:
+ packet_summaries = json.dumps(
+ list(map(lambda pkt: pkt.summary(), packets)), indent=4
+ )
+ return f"Packet contents: \n{packet_summaries}"
+
+
def RED(text: str) -> str:
return f"\u001B[31;1m{str(text)}\u001B[0m"