[v2,6/6] dts: add basic UDP test case

Message ID 20230717110709.39220-7-juraj.linkes@pantheon.tech (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series dts: tg abstractions and scapy tg |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation warning apply issues

Commit Message

Juraj Linkeš July 17, 2023, 11:07 a.m. UTC
  The test cases showcases the scapy traffic generator code.

Signed-off-by: Juraj Linkeš <juraj.linkes@pantheon.tech>
---
 dts/conf.yaml                                 |   1 +
 dts/framework/config/conf_yaml_schema.json    |   3 +-
 dts/framework/remote_session/linux_session.py |  20 +-
 dts/framework/remote_session/os_session.py    |  20 +-
 dts/framework/test_suite.py                   | 217 +++++++++++++++++-
 dts/framework/testbed_model/node.py           |  14 +-
 dts/framework/testbed_model/sut_node.py       |   3 +
 dts/tests/TestSuite_os_udp.py                 |  45 ++++
 8 files changed, 315 insertions(+), 8 deletions(-)
 create mode 100644 dts/tests/TestSuite_os_udp.py
  

Patch

diff --git a/dts/conf.yaml b/dts/conf.yaml
index 7f089022ba..ba228c5ab2 100644
--- a/dts/conf.yaml
+++ b/dts/conf.yaml
@@ -13,6 +13,7 @@  executions:
     skip_smoke_tests: false # optional flag that allow you to skip smoke tests
     test_suites:
       - hello_world
+      - os_udp
     system_under_test_node:
       node_name: "SUT 1"
       vdevs: # optional; if removed, vdevs won't be used in the execution
diff --git a/dts/framework/config/conf_yaml_schema.json b/dts/framework/config/conf_yaml_schema.json
index 76df84840a..a2f14f0e52 100644
--- a/dts/framework/config/conf_yaml_schema.json
+++ b/dts/framework/config/conf_yaml_schema.json
@@ -185,7 +185,8 @@ 
     "test_suite": {
       "type": "string",
       "enum": [
-        "hello_world"
+        "hello_world",
+        "os_udp"
       ]
     },
     "test_target": {
diff --git a/dts/framework/remote_session/linux_session.py b/dts/framework/remote_session/linux_session.py
index 284c74795d..94023920a7 100644
--- a/dts/framework/remote_session/linux_session.py
+++ b/dts/framework/remote_session/linux_session.py
@@ -3,7 +3,8 @@ 
 # Copyright(c) 2023 University of New Hampshire
 
 import json
-from typing import TypedDict
+from ipaddress import IPv4Interface, IPv6Interface
+from typing import TypedDict, Union
 
 from typing_extensions import NotRequired
 
@@ -180,3 +181,20 @@  def configure_port_state(self, port: Port, enable: bool) -> None:
         self.send_command(
             f"ip link set dev {port.logical_name} {state}", privileged=True
         )
+
+    def configure_port_ip_address(
+        self,
+        address: Union[IPv4Interface, IPv6Interface],
+        port: Port,
+        delete: bool,
+    ) -> None:
+        command = "del" if delete else "add"
+        self.send_command(
+            f"ip address {command} {address} dev {port.logical_name}",
+            privileged=True,
+            verify=True,
+        )
+
+    def configure_ipv4_forwarding(self, enable: bool) -> None:
+        state = 1 if enable else 0
+        self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)
diff --git a/dts/framework/remote_session/os_session.py b/dts/framework/remote_session/os_session.py
index c17a17a267..ad06c1dcad 100644
--- a/dts/framework/remote_session/os_session.py
+++ b/dts/framework/remote_session/os_session.py
@@ -4,8 +4,9 @@ 
 
 from abc import ABC, abstractmethod
 from collections.abc import Iterable
+from ipaddress import IPv4Interface, IPv6Interface
 from pathlib import PurePath
-from typing import Type, TypeVar
+from typing import Type, TypeVar, Union
 
 from framework.config import Architecture, NodeConfiguration, NodeInfo
 from framework.logger import DTSLOG
@@ -268,3 +269,20 @@  def configure_port_state(self, port: Port, enable: bool) -> None:
         """
         Enable/disable port.
         """
+
+    @abstractmethod
+    def configure_port_ip_address(
+        self,
+        address: Union[IPv4Interface, IPv6Interface],
+        port: Port,
+        delete: bool,
+    ) -> None:
+        """
+        Configure (add or delete) an IP address of the input port.
+        """
+
+    @abstractmethod
+    def configure_ipv4_forwarding(self, enable: bool) -> None:
+        """
+        Enable IPv4 forwarding in the underlying OS.
+        """
diff --git a/dts/framework/test_suite.py b/dts/framework/test_suite.py
index 056460dd05..3b890c0451 100644
--- a/dts/framework/test_suite.py
+++ b/dts/framework/test_suite.py
@@ -9,7 +9,13 @@ 
 import importlib
 import inspect
 import re
+from ipaddress import IPv4Interface, IPv6Interface, ip_interface
 from types import MethodType
+from typing import Union
+
+from scapy.layers.inet import IP  # type: ignore[import]
+from scapy.layers.l2 import Ether  # type: ignore[import]
+from scapy.packet import Packet, Padding  # type: ignore[import]
 
 from .exception import (
     BlockingTestSuiteError,
@@ -21,6 +27,8 @@ 
 from .settings import SETTINGS
 from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
 from .testbed_model import SutNode, TGNode
+from .testbed_model.hw.port import Port, PortLink
+from .utils import get_packet_summaries
 
 
 class TestSuite(object):
@@ -47,6 +55,15 @@  class TestSuite(object):
     _test_cases_to_run: list[str]
     _func: bool
     _result: TestSuiteResult
+    _port_links: list[PortLink]
+    _sut_port_ingress: Port
+    _sut_port_egress: Port
+    _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
+    _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface]
+    _tg_port_ingress: Port
+    _tg_port_egress: Port
+    _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
+    _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface]
 
     def __init__(
         self,
@@ -63,6 +80,31 @@  def __init__(
         self._test_cases_to_run.extend(SETTINGS.test_cases)
         self._func = func
         self._result = build_target_result.add_test_suite(self.__class__.__name__)
+        self._port_links = []
+        self._process_links()
+        self._sut_port_ingress, self._tg_port_egress = (
+            self._port_links[0].sut_port,
+            self._port_links[0].tg_port,
+        )
+        self._sut_port_egress, self._tg_port_ingress = (
+            self._port_links[1].sut_port,
+            self._port_links[1].tg_port,
+        )
+        self._sut_ip_address_ingress = ip_interface("192.168.100.2/24")
+        self._sut_ip_address_egress = ip_interface("192.168.101.2/24")
+        self._tg_ip_address_egress = ip_interface("192.168.100.3/24")
+        self._tg_ip_address_ingress = ip_interface("192.168.101.3/24")
+
+    def _process_links(self) -> None:
+        for sut_port in self.sut_node.ports:
+            for tg_port in self.tg_node.ports:
+                if (sut_port.identifier, sut_port.peer) == (
+                    tg_port.peer,
+                    tg_port.identifier,
+                ):
+                    self._port_links.append(
+                        PortLink(sut_port=sut_port, tg_port=tg_port)
+                    )
 
     def set_up_suite(self) -> None:
         """
@@ -85,14 +127,181 @@  def tear_down_test_case(self) -> None:
         Tear down the previously created test fixtures after each test case.
         """
 
+    def configure_testbed_ipv4(self, restore: bool = False) -> None:
+        delete = True if restore else False
+        enable = False if restore else True
+        self._configure_ipv4_forwarding(enable)
+        self.sut_node.configure_port_ip_address(
+            self._sut_ip_address_egress, self._sut_port_egress, delete
+        )
+        self.sut_node.configure_port_state(self._sut_port_egress, enable)
+        self.sut_node.configure_port_ip_address(
+            self._sut_ip_address_ingress, self._sut_port_ingress, delete
+        )
+        self.sut_node.configure_port_state(self._sut_port_ingress, enable)
+        self.tg_node.configure_port_ip_address(
+            self._tg_ip_address_ingress, self._tg_port_ingress, delete
+        )
+        self.tg_node.configure_port_state(self._tg_port_ingress, enable)
+        self.tg_node.configure_port_ip_address(
+            self._tg_ip_address_egress, self._tg_port_egress, delete
+        )
+        self.tg_node.configure_port_state(self._tg_port_egress, enable)
+
+    def _configure_ipv4_forwarding(self, enable: bool) -> None:
+        self.sut_node.configure_ipv4_forwarding(enable)
+
+    def send_packet_and_capture(
+        self, packet: Packet, duration: float = 1
+    ) -> list[Packet]:
+        """
+        Send a packet through the appropriate interface and
+        receive on the appropriate interface.
+        Modify the packet with l3/l2 addresses corresponding
+        to the testbed and desired traffic.
+        """
+        packet = self._adjust_addresses(packet)
+        return self.tg_node.send_packet_and_capture(
+            packet, self._tg_port_egress, self._tg_port_ingress, duration
+        )
+
+    def get_expected_packet(self, packet: Packet) -> Packet:
+        return self._adjust_addresses(packet, expected=True)
+
+    def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet:
+        """
+        Assumptions:
+            Two links between SUT and TG, one link is TG -> SUT,
+            the other SUT -> TG.
+        """
+        if expected:
+            # The packet enters the TG from SUT
+            # update l2 addresses
+            packet.src = self._sut_port_egress.mac_address
+            packet.dst = self._tg_port_ingress.mac_address
+
+            # The packet is routed from TG egress to TG ingress
+            # update l3 addresses
+            packet.payload.src = self._tg_ip_address_egress.ip.exploded
+            packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
+        else:
+            # The packet leaves TG towards SUT
+            # update l2 addresses
+            packet.src = self._tg_port_egress.mac_address
+            packet.dst = self._sut_port_ingress.mac_address
+
+            # The packet is routed from TG egress to TG ingress
+            # update l3 addresses
+            packet.payload.src = self._tg_ip_address_egress.ip.exploded
+            packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
+
+        return Ether(packet.build())
+
     def verify(self, condition: bool, failure_description: str) -> None:
         if not condition:
+            self._fail_test_case_verify(failure_description)
+
+    def _fail_test_case_verify(self, failure_description: str) -> None:
+        self._logger.debug(
+            "A test case failed, showing the last 10 commands executed on SUT:"
+        )
+        for command_res in self.sut_node.main_session.remote_session.history[-10:]:
+            self._logger.debug(command_res.command)
+        self._logger.debug(
+            "A test case failed, showing the last 10 commands executed on TG:"
+        )
+        for command_res in self.tg_node.main_session.remote_session.history[-10:]:
+            self._logger.debug(command_res.command)
+        raise TestCaseVerifyError(failure_description)
+
+    def verify_packets(
+        self, expected_packet: Packet, received_packets: list[Packet]
+    ) -> None:
+        for received_packet in received_packets:
+            if self._compare_packets(expected_packet, received_packet):
+                break
+        else:
+            self._logger.debug(
+                f"The expected packet {get_packet_summaries(expected_packet)} "
+                f"not found among received {get_packet_summaries(received_packets)}"
+            )
+            self._fail_test_case_verify(
+                "An expected packet not found among received packets."
+            )
+
+    def _compare_packets(
+        self, expected_packet: Packet, received_packet: Packet
+    ) -> bool:
+        self._logger.debug(
+            "Comparing packets: \n"
+            f"{expected_packet.summary()}\n"
+            f"{received_packet.summary()}"
+        )
+
+        l3 = IP in expected_packet.layers()
+        self._logger.debug("Found l3 layer")
+
+        received_payload = received_packet
+        expected_payload = expected_packet
+        while received_payload and expected_payload:
+            self._logger.debug("Comparing payloads:")
+            self._logger.debug(f"Received: {received_payload}")
+            self._logger.debug(f"Expected: {expected_payload}")
+            if received_payload.__class__ == expected_payload.__class__:
+                self._logger.debug("The layers are the same.")
+                if received_payload.__class__ == Ether:
+                    if not self._verify_l2_frame(received_payload, l3):
+                        return False
+                elif received_payload.__class__ == IP:
+                    if not self._verify_l3_packet(received_payload, expected_payload):
+                        return False
+            else:
+                # Different layers => different packets
+                return False
+            received_payload = received_payload.payload
+            expected_payload = expected_payload.payload
+
+        if expected_payload:
             self._logger.debug(
-                "A test case failed, showing the last 10 commands executed on SUT:"
+                f"The expected packet did not contain {expected_payload}."
             )
-            for command_res in self.sut_node.main_session.remote_session.history[-10:]:
-                self._logger.debug(command_res.command)
-            raise TestCaseVerifyError(failure_description)
+            return False
+        if received_payload and received_payload.__class__ != Padding:
+            self._logger.debug(
+                "The received payload had extra layers which were not padding."
+            )
+            return False
+        return True
+
+    def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
+        self._logger.debug("Looking at the Ether layer.")
+        self._logger.debug(
+            f"Comparing received dst mac '{received_packet.dst}' "
+            f"with expected '{self._tg_port_ingress.mac_address}'."
+        )
+        if received_packet.dst != self._tg_port_ingress.mac_address:
+            return False
+
+        expected_src_mac = self._tg_port_egress.mac_address
+        if l3:
+            expected_src_mac = self._sut_port_egress.mac_address
+        self._logger.debug(
+            f"Comparing received src mac '{received_packet.src}' "
+            f"with expected '{expected_src_mac}'."
+        )
+        if received_packet.src != expected_src_mac:
+            return False
+
+        return True
+
+    def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
+        self._logger.debug("Looking at the IP layer.")
+        if (
+            received_packet.src != expected_packet.src
+            or received_packet.dst != expected_packet.dst
+        ):
+            return False
+        return True
 
     def run(self) -> None:
         """
diff --git a/dts/framework/testbed_model/node.py b/dts/framework/testbed_model/node.py
index f70e4d5ce6..b45fea6bbf 100644
--- a/dts/framework/testbed_model/node.py
+++ b/dts/framework/testbed_model/node.py
@@ -7,7 +7,8 @@ 
 A node is a generic host that DTS connects to and manages.
 """
 
-from typing import Any, Callable, Type
+from ipaddress import IPv4Interface, IPv6Interface
+from typing import Any, Callable, Type, Union
 
 from framework.config import (
     BuildTargetConfiguration,
@@ -214,6 +215,17 @@  def configure_port_state(self, port: Port, enable: bool = True) -> None:
         """
         self.main_session.configure_port_state(port, enable)
 
+    def configure_port_ip_address(
+        self,
+        address: Union[IPv4Interface, IPv6Interface],
+        port: Port,
+        delete: bool = False,
+    ) -> None:
+        """
+        Configure the IP address of a port on this node.
+        """
+        self.main_session.configure_port_ip_address(address, port, delete)
+
     def close(self) -> None:
         """
         Close all connections and free other resources.
diff --git a/dts/framework/testbed_model/sut_node.py b/dts/framework/testbed_model/sut_node.py
index f0b017a383..202aebfd06 100644
--- a/dts/framework/testbed_model/sut_node.py
+++ b/dts/framework/testbed_model/sut_node.py
@@ -351,6 +351,9 @@  def run_dpdk_app(
             f"{app_path} {eal_args}", timeout, privileged=True, verify=True
         )
 
+    def configure_ipv4_forwarding(self, enable: bool) -> None:
+        self.main_session.configure_ipv4_forwarding(enable)
+
     def create_interactive_shell(
         self,
         shell_cls: Type[InteractiveShellType],
diff --git a/dts/tests/TestSuite_os_udp.py b/dts/tests/TestSuite_os_udp.py
new file mode 100644
index 0000000000..9b5f39711d
--- /dev/null
+++ b/dts/tests/TestSuite_os_udp.py
@@ -0,0 +1,45 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 PANTHEON.tech s.r.o.
+
+"""
+Configure SUT node to route traffic from if1 to if2.
+Send a packet to the SUT node, verify it comes back on the second port on the TG node.
+"""
+
+from scapy.layers.inet import IP, UDP  # type: ignore[import]
+from scapy.layers.l2 import Ether  # type: ignore[import]
+
+from framework.test_suite import TestSuite
+
+
+class TestOSUdp(TestSuite):
+    def set_up_suite(self) -> None:
+        """
+        Setup:
+            Configure SUT ports and SUT to route traffic from if1 to if2.
+        """
+
+        self.configure_testbed_ipv4()
+
+    def test_os_udp(self) -> None:
+        """
+        Steps:
+            Send a UDP packet.
+        Verify:
+            The packet with proper addresses arrives at the other TG port.
+        """
+
+        packet = Ether() / IP() / UDP()
+
+        received_packets = self.send_packet_and_capture(packet)
+
+        expected_packet = self.get_expected_packet(packet)
+
+        self.verify_packets(expected_packet, received_packets)
+
+    def tear_down_suite(self) -> None:
+        """
+        Teardown:
+            Remove the SUT port configuration configured in setup.
+        """
+        self.configure_testbed_ipv4(restore=True)