[V2,2/5] tests/kernelpf_iavf: split the test suite of ice_qinq

Message ID 20230111103858.970228-3-songx.jiale@intel.com (mailing list archive)
State Accepted
Headers
Series split the ice_qinq suite |

Commit Message

Jiale, SongX Jan. 11, 2023, 10:38 a.m. UTC
  split iavf cases in ice_qinq to kernelpf_iavf.

Signed-off-by: Song Jiale <songx.jiale@intel.com>
---
 tests/TestSuite_kernelpf_iavf.py | 361 ++++++++++++++++++++++++++++++-
 1 file changed, 360 insertions(+), 1 deletion(-)
  

Patch

diff --git a/tests/TestSuite_kernelpf_iavf.py b/tests/TestSuite_kernelpf_iavf.py
index d8a3f55c..9089b511 100644
--- a/tests/TestSuite_kernelpf_iavf.py
+++ b/tests/TestSuite_kernelpf_iavf.py
@@ -17,7 +17,7 @@  import framework.utils as utils
 from framework.packet import Packet
 from framework.pmd_output import PmdOutput
 from framework.settings import DPDK_DCFMODE_SETTING, load_global_setting
-from framework.test_case import TestCase
+from framework.test_case import TestCase, check_supported_nic
 from framework.utils import RED
 from framework.virt_common import VM
 
@@ -30,6 +30,12 @@  ETHER_JUMBO_FRAME_MTU = 9000
 class TestKernelpfIavf(TestCase):
 
     supported_vf_driver = ["pci-stub", "vfio-pci"]
+    ice_nic = [
+        "ICE_25G-E810C_SFP",
+        "ICE_100G-E810C_QSFP",
+        "ICE_25G-E810_XXV_SFP",
+        "ICE_25G-E823C_QSFP",
+    ]
 
     def set_up_all(self):
         self.dut_ports = self.dut.get_ports(self.nic)
@@ -835,6 +841,359 @@  class TestKernelpfIavf(TestCase):
         packets = len(re.findall("received 1 packets", out))
         self.verify(packets == 10, "Not receive expected packet")
 
+    @check_supported_nic(ice_nic)
+    def test_iavf_dual_vlan_filter(self):
+        """
+        Test case: IAVF DUAL VLAN filtering
+        """
+        self.skip_case(not self.dcf_mode, "the case not support dcf mode")
+        pkt_list1 = [
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=1,type=0x8100)/Dot1Q(vlan=2,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=1,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+        ]
+        pkt_list2 = [
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=11,type=0x8100)/Dot1Q(vlan=2,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=11,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+        ]
+        self.launch_testpmd(dcf_flag=self.dcf_mode)
+        if self.dcf_mode:
+            self.vm_testpmd.execute_cmd("set promisc all off")
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("start")
+        self.vm_testpmd.execute_cmd("vlan set filter on 0")
+        self.check_vlan_offload(vlan_type="filter", stats="on")
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list1)
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 0, "Failed error received vlan packet!")
+
+        self.vm_testpmd.execute_cmd("rx_vlan add 1 0")
+        self.start_tcpdump(self.tester_intf)
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list1)
+        tcpdump_out = self.get_tcpdump_package()
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 2, "Failed error received vlan packet!")
+        tester_pkt = re.findall("vlan \d+", tcpdump_out)
+        self.verify(len(tester_pkt) == 6, "Failed pass received vlan packet!")
+
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list2)
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 0, "Failed error received vlan packet!")
+
+        self.vm_testpmd.execute_cmd("rx_vlan rm 1 0")
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list1)
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 0, "Failed error received vlan packet!")
+
+    @check_supported_nic(ice_nic)
+    def test_iavf_dual_vlan_strip(self):
+        """
+        Test case: IAVF DUAL VLAN header stripping
+        """
+        self.skip_case(not self.dcf_mode, "the case not support dcf mode")
+        pkt_list = [
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=1,type=0x8100)/Dot1Q(vlan=2,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=1,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+        ]
+        self.launch_testpmd(dcf_flag=self.dcf_mode)
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("start")
+        self.vm_testpmd.execute_cmd("vlan set filter on 0")
+        self.vm_testpmd.execute_cmd("rx_vlan add 1 0")
+        self.check_vlan_offload(vlan_type="filter", stats="on")
+        self.vm_testpmd.execute_cmd("vlan set strip on 0")
+        self.check_vlan_offload(vlan_type="strip", stats="on")
+
+        self.start_tcpdump(self.tester_intf)
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list)
+        tcpdump_out = self.get_tcpdump_package()
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 2, "Failed error received vlan packet!")
+        tester_pkt = re.findall("vlan \d+", tcpdump_out)
+        self.verify(len(tester_pkt) == 4, "Failed pass received vlan packet!")
+
+        self.vm_testpmd.execute_cmd("vlan set strip off 0")
+        self.check_vlan_offload(vlan_type="strip", stats="off")
+        self.start_tcpdump(self.tester_intf)
+        out = self.send_pkts_getouput(self.tester_intf, pkt_list)
+        tcpdump_out = self.get_tcpdump_package()
+        receive_pkt = re.findall("dst=%s" % self.vf_mac, out)
+        self.verify(len(receive_pkt) == 2, "Failed error received vlan packet!")
+        tester_pkt = re.findall("vlan \d+", tcpdump_out)
+        self.verify(len(tester_pkt) == 6, "Failed pass received vlan packet!")
+
+    def send_packet_check_vlan_inter(
+        self, pkts, out_vlan, port_id=3, vlan_header=None, iner_vlan=None
+    ):
+        for pkt in pkts:
+            pkt_index = pkts.index(pkt)
+            self.start_tcpdump(self.tester_intf)
+            out = self.send_pkts_getouput(self.tester_intf, pkt)
+            self.check_packets(out, port_id)
+            p = "vlan (\d+)"
+            tcpdump_out = self.get_tcpdump_package()
+            vlan_list = re.findall(p, tcpdump_out)
+            if vlan_header:
+                header = re.findall(vlan_header, tcpdump_out)
+            if pkt_index == 0:
+                if out_vlan == 1:
+                    self.verify(
+                        len(vlan_list) == 1,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                elif out_vlan == 0:
+                    self.verify(
+                        len(vlan_list) == 0,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                else:
+                    self.verify(
+                        int(vlan_list[0]) == out_vlan,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                if iner_vlan:
+                    self.verify(
+                        int(vlan_list[1]) == iner_vlan,
+                        "received packet outer vlan not is %s" % iner_vlan,
+                    )
+            else:
+                if out_vlan == 1:
+                    self.verify(
+                        len(vlan_list) == 3 and int(vlan_list[1]) == out_vlan,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                elif out_vlan == 0:
+                    self.verify(
+                        len(vlan_list) == 2,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                else:
+                    self.verify(
+                        int(vlan_list[1]) == out_vlan,
+                        "received packet outer vlan not is %s" % out_vlan,
+                    )
+                if iner_vlan:
+                    self.verify(
+                        int(vlan_list[2]) == iner_vlan,
+                        "received packet outer vlan not is %s" % iner_vlan,
+                    )
+            if vlan_header == "0x8100":
+                self.verify(
+                    vlan_header in tcpdump_out,
+                    "vlan header not matched, expect: %s." % vlan_header,
+                )
+            elif vlan_header is None:
+                pass
+            else:
+                self.verify(
+                    len(header) == 1,
+                    "vlan header not matched, expect: %s." % vlan_header,
+                )
+
+    def check_packets(self, out, port_id, pkt_num=1, check_stats=True):
+        p = "port (\d+)/queue.*"
+        result_list = re.findall(p, out)
+        self.verify(
+            len(result_list) == pkt_num,
+            "received %s packets, expected to %s packets" % (result_list, pkt_num),
+        )
+        for res in result_list:
+            if check_stats:
+                self.verify(
+                    int(res) == port_id,
+                    "port {} did not received the packets".format(port_id),
+                )
+            else:
+                self.verify(
+                    int(res) != port_id,
+                    "port {} should not received a packets".format(port_id),
+                )
+
+    @check_supported_nic(ice_nic)
+    def test_iavf_dual_vlan_insert(self):
+        """
+        Test case: IAVF DUAL VLAN header insertion
+        """
+        out_vlan = 1
+        pkt_list = [
+            'Ether(dst="%s",type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=11,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac,
+        ]
+        self.launch_testpmd(dcf_flag=self.dcf_mode)
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("vlan set filter on 0")
+        self.vm_testpmd.execute_cmd("rx_vlan add 11 0")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("tx_vlan set 0 1")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_packet_check_vlan_inter(pkt_list, out_vlan, port_id=0)
+
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("tx_vlan reset 0")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_packet_check_vlan_inter(pkt_list, out_vlan=0, port_id=0)
+
+    def check_vlan_offload(self, vlan_type, stats):
+        p = "VLAN offload.*\n.*?%s (\w+)" % vlan_type
+        out = self.vm_testpmd.execute_cmd("show port info 0")
+        vlan_stats = re.search(p, out).group(1)
+        self.verify(vlan_stats == stats, "VLAN stats mismatch")
+
+    def send_pkts_getouput(self, tport_inface, pkts, count=1):
+        self.pkt = Packet()
+        self.pkt.update_pkt(pkts)
+        self.pkt.send_pkt(crb=self.tester, tx_port=tport_inface, count=count)
+        time.sleep(1)
+        out = self.vm_testpmd.get_output()
+        port_stats = self.vm_testpmd.execute_cmd("show port stats all")
+        self.vm_testpmd.execute_cmd("clear port stats all")
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("start")
+        return out + port_stats
+
+    def send_pkt_check_vlan_and_crc(
+        self, tport_inface, pkt, pkt_len=None, vlan_strip=False, crc_strip=False
+    ):
+        if pkt_len:
+            self.start_tcpdump(tport_inface)
+        out = self.send_pkts_getouput(tport_inface, pkt)
+        pkt_length = re.search("length=(\d+)", out).group(1)
+        rx_bytes = re.search("RX-bytes:\s+(\d+)", out).group(1)
+        if crc_strip:
+            self.verify(rx_bytes == pkt_length, "CRC strip on failed")
+        else:
+            self.verify(int(rx_bytes) == int(pkt_length) + 4, "CRC strip off failed")
+        if pkt_len:
+            tcpdump_out = self.get_tcpdump_package()
+            vlan_list = re.findall("vlan\s+\d+", tcpdump_out)
+            if not vlan_strip:
+                self.verify(pkt_length == pkt_len, "vlan strip off failed")
+                self.verify(len(vlan_list) == 4, "Failed pass received vlan packet")
+            elif vlan_strip:
+                self.verify(
+                    int(pkt_length) + 4 == int(pkt_len), "vlan strip off failed"
+                )
+                self.verify(
+                    len(vlan_list) == 3 and vlan_list[0] != vlan_list[-1],
+                    "Failed error received vlan packet",
+                )
+
+    @check_supported_nic(ice_nic)
+    def test_enable_disable_iavf_CRC_strip(self):
+        """
+        Test case: Enable/disable AVF CRC stripping
+        """
+        self.skip_case(not self.dcf_mode, "the case not support this dcf mode")
+        param = "--disable-crc-strip"
+        pkt = (
+            'Ether(dst="%s",type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac
+        )
+
+        self.launch_testpmd(param=param, dcf_flag=self.dcf_mode)
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt)
+
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("port config 0 rx_offload keep_crc off")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt, crc_strip=True)
+
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("port config 0 rx_offload keep_crc on")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt)
+
+        self.vm_testpmd.execute_cmd("quit", "#")
+        self.launch_testpmd(dcf_flag=self.dcf_mode)
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("start")
+        self.start_tcpdump(self.tester_intf)
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt, crc_strip=True)
+
+    @check_supported_nic(ice_nic)
+    def test_CRC_strip_iavf_vlan_strip_coexists(self):
+        """
+        Test case: IAVF CRC strip and Vlan strip co-exists
+        """
+        self.skip_case(not self.dcf_mode, "the case not support dcf mode")
+        pkt = (
+            'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=1,type=0x8100)/Dot1Q(vlan=2,type=0x0800)/IP(src="196.222.232.221")/("X"*480)'
+            % self.vf_mac
+        )
+        self.launch_testpmd(dcf_flag=self.dcf_mode)
+        self.vm_testpmd.execute_cmd("set fwd mac")
+        self.vm_testpmd.execute_cmd("set verbose 1")
+        self.vm_testpmd.execute_cmd("start")
+        self.check_vlan_offload(vlan_type="strip", stats="off")
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("vlan set strip off 0")
+        self.check_vlan_offload(vlan_type="strip", stats="off")
+        self.vm_testpmd.execute_cmd("vlan set filter on 0")
+        self.vm_testpmd.execute_cmd("rx_vlan add 1 0")
+        self.vm_testpmd.execute_cmd("start")
+
+        self.start_tcpdump(self.tester_intf)
+        out = self.send_pkts_getouput(self.tester_intf, pkt)
+        tcpdump_out = self.get_tcpdump_package()
+        pkt_len = re.search("length=(\d+)", out).group(1)
+        vlan_list = re.findall("vlan\s+\d+", tcpdump_out)
+        self.verify(len(vlan_list) == 4, "vlan strip off failed")
+        rx_bytes = re.search("RX-bytes:\s+(\d+)", out).group(1)
+        tx_bytes = re.search("TX-bytes:\s+(\d+)", out).group(1)
+        self.verify(rx_bytes == tx_bytes == pkt_len, "CRC strip on failed")
+
+        self.vm_testpmd.execute_cmd("vlan set strip on 0")
+        self.check_vlan_offload(vlan_type="strip", stats="on")
+        self.send_pkt_check_vlan_and_crc(
+            self.tester_intf, pkt=pkt, pkt_len=pkt_len, vlan_strip=True
+        )
+
+        self.vm_testpmd.execute_cmd("vlan set strip off 0")
+        self.check_vlan_offload(vlan_type="strip", stats="off")
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("port config 0 rx_offload keep_crc on")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt=pkt, pkt_len=pkt_len)
+
+        out = self.vm_testpmd.execute_cmd("vlan set strip on 0")
+        p = "iavf_config_vlan_strip_v2(): fail to execute command VIRTCHNL_OP_ENABLE_VLAN_STRIPPING_V2"
+        self.verify(p in out, "set vlan strip on successfully")
+        self.send_pkt_check_vlan_and_crc(self.tester_intf, pkt=pkt, pkt_len=pkt_len)
+
+        self.vm_testpmd.execute_cmd("vlan set strip off 0")
+        self.check_vlan_offload(vlan_type="strip", stats="off")
+        self.vm_testpmd.execute_cmd("stop")
+        self.vm_testpmd.execute_cmd("port stop 0")
+        self.vm_testpmd.execute_cmd("port config 0 rx_offload keep_crc off")
+        self.vm_testpmd.execute_cmd("port start 0")
+        self.vm_testpmd.execute_cmd("start")
+        self.send_pkt_check_vlan_and_crc(
+            self.tester_intf, pkt=pkt, pkt_len=pkt_len, crc_strip=True
+        )
+
     def scapy_send_packet(self, mac, testinterface, vlan_flags=False, count=1):
         """
         Send a packet to port