new file mode 100644
@@ -0,0 +1,977 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+#
+
+import re
+import time
+import traceback
+
+from framework.packet import Packet
+from framework.pmd_output import PmdOutput
+from framework.virt_common import VM
+
+supported_vf_driver = ["pci-stub", "vfio-pci"]
+
+
+class FuncTestBase(object):
+ def __init__(self, test_case, tester_tx_interface, tester_rx_interface):
+ self.test_case = test_case
+ self.verify = self.test_case.verify
+ self.logger = test_case.logger
+ self.dut = self.test_case.dut
+ self.tester = self.test_case.tester
+
+ self.vm_info = []
+ self.pmd_session = None
+ self.tester_tx_interface = tester_tx_interface
+ self.tester_rx_interface = tester_rx_interface
+
+ self.pkt = Packet()
+ self.vf_driver = self.test_case.get_suite_cfg()["vf_driver"] or "pci-stub"
+ self.vm_specified_driver = self.get_vm_specified_driver()
+
+ def check_port_num_for_test(self, port_num: int):
+ """
+ check the port num for test requirement
+ """
+ dut_ports = self.dut.get_ports(self.test_case.nic)
+ self.verify(len(dut_ports) >= port_num, "Unsupported port num for test require")
+
+ def get_vm_specified_driver(self):
+ """
+ check the vf support driver and return vm driver
+ :return: vm_specified_driver
+ """
+ self.verify(self.vf_driver in supported_vf_driver, "Unsupported vf driver")
+ if self.vf_driver == "pci-stub":
+ vm_specified_driver = "pci-assign"
+ else:
+ vm_specified_driver = "vfio-pci"
+ return vm_specified_driver
+
+ def create_vf(self, pf_port, vfs_num: int, driver="default"):
+ """
+ create vfs, support multi pfs to create multi vfs
+ :param pf_port: pf port or port list
+ :param vfs_num: create vf num
+ :param driver: set vf driver on dpdk
+ :return: vf net device object
+ """
+ sriov_vfs_obj = []
+ try:
+ self.test_case.bind_nic_driver(
+ self.dut.get_ports(self.test_case.nic), driver=self.test_case.kdriver
+ )
+ except Exception as e:
+ self.logger.info(traceback.format_exc(e))
+ if hasattr(self.test_case, "pf_config"):
+ self.test_case.pf_config()
+ if isinstance(pf_port, int):
+ pf_port = [pf_port]
+ for _port in pf_port:
+ self.dut.generate_sriov_vfs_by_port(_port, vfs_num, driver=driver)
+ sriov_vfs_obj.append(self.dut.ports_info[_port]["vfs_port"])
+ self.dut.send_expect(
+ "ifconfig %s up" % self.dut.ports_info[_port]["intf"], "#"
+ )
+ res = self.dut.is_interface_up(self.dut.ports_info[_port]["intf"])
+ self.verify(
+ res, "%s link status is down" % self.dut.ports_info[_port]["intf"]
+ )
+ if hasattr(self.test_case, "vf_config"):
+ self.test_case.vf_config()
+ for vf_port in sriov_vfs_obj:
+ for _vf in vf_port:
+ _vf.bind_driver(self.vf_driver)
+ return sriov_vfs_obj
+
+ def destroy_vf(self, pf_port="all"):
+ """
+ destroy vfs
+ :param pf_port: select the pf port to destroy vfs.
+ the default will destroy all vfs.
+ """
+ pf_port = pf_port if isinstance(pf_port, list) else [pf_port]
+ if "all" in pf_port:
+ # destory all vfs
+ self.dut.destroy_all_sriov_vfs()
+ else:
+ for port in pf_port:
+ # destroy the vf on the specified port
+ self.dut.destroy_sriov_vfs_by_port(port)
+
+ def setup_vm_env(self, vm_name: str, sriov_pci: list):
+ """
+ passtrough the vfs into vm, start vm.
+ :param vm_name: select the vm to use
+ :param sriov_pci: pci list
+ :returns: vm object and vm dut session
+ """
+ try:
+ vm_obj = VM(self.dut, vm_name, self.test_case.suite_name)
+ for _pci in sriov_pci:
+ vm_obj.set_vm_device(
+ driver=self.vm_specified_driver, **{"opt_host": _pci}
+ )
+ vm_dut = vm_obj.start()
+ if vm_dut is None:
+ raise Exception("Set up VM ENV failed!")
+ self.vm_info.append((vm_obj, vm_dut))
+ return vm_obj, vm_dut
+ except Exception as e:
+ self.destroy_vm_env(vm_obj_name=vm_name)
+ raise Exception(e)
+
+ def destroy_vm_env(self, vm_obj_name=""):
+ """
+ destroy the specified vm name or all vms
+ """
+ try:
+ for vm_obj, vm_session in self.vm_info:
+ if vm_obj_name == "":
+ # destoty all vm
+ vm_session.kill_all()
+ vm_obj.stop()
+ self.vm_info = []
+ elif vm_obj.vm_name == vm_obj_name:
+ # destroy the vm with the specified name
+ vm_session.kill_all()
+ vm_obj.stop()
+ self.vm_info.remove((vm_obj, vm_session))
+ else:
+ self.logger.warning("VM %s not found!!!" % vm_obj_name)
+ except Exception as e:
+ self.dut.virt_exit()
+ time.sleep(3)
+ raise Exception(e)
+
+ def init_pmd_session(self, dut_obj):
+ """
+ init PMD session
+ """
+ self.pmd_session = PmdOutput(dut_obj)
+
+ def launch_testpmd(self, **kwargs):
+ """
+ launch testpmd with testpmd session
+ the default session is self.pmd_session
+ :return: testpmd output
+ """
+ pmd_session = kwargs.get("testpmd_obj") or self.pmd_session
+ out = pmd_session.start_testpmd(**kwargs)
+ return out
+
+ def execute_pmd_cmd(self, cmd, **kwargs):
+ """
+ execute multiple testpmd commands, return string output
+ :param cmd: testpmd cmd, support str and list
+ :return: testpmd output
+ """
+ pmd_session = kwargs.get("pmd_session") or self.pmd_session
+ _cmds = [cmd] if isinstance(cmd, str) else cmd
+ output = ""
+ for _cmd in _cmds:
+ output += pmd_session.execute_cmd(_cmd)
+ return output
+
+ def execute_host_cmd(self, cmd, **kwargs):
+ """
+ execute multiple host commands
+ :param cmd: host commands, support str and list
+ :return: host output
+ """
+ dut_obj = kwargs.get("dut_obj") or self.dut
+ _cmd = [cmd, "# ", 20] if isinstance(cmd, (str)) else cmd
+ return dut_obj.send_expect(*_cmd)
+
+ def build_dpdk_apps(self, app_path):
+ """
+ build dpdk apps and check the build status
+ """
+ out = self.dut.build_dpdk_apps(app_path)
+ self.verify("Error" not in out, "Compilation error")
+ self.verify("No such" not in out, "Compilation error")
+
+ def vf_test_preset_env_vm(self, pf_port, vfs_num, vm_name, driver="default"):
+ """
+ create vfs and setup vm env
+ """
+ if not isinstance(pf_port, list):
+ pf_port = [pf_port]
+ sriov_vfs_obj = self.create_vf(pf_port, vfs_num, driver=driver)
+ vf_list = []
+ for pf_id in pf_port:
+ vf_list += [
+ sriov_vfs_obj[pf_id][i].pci for i in range(len(sriov_vfs_obj[pf_id]))
+ ]
+ vm_obj, vm_dut = self.setup_vm_env(vm_name=vm_name, sriov_pci=vf_list)
+ return vm_obj, vm_dut
+
+ def get_vf_mac_through_pf(self, pf_intf):
+ """
+ use ip link show pf to get vf mac list
+ """
+ out = self.dut.send_expect(
+ "ip link show {}".format(pf_intf), "# ", alt_session=True
+ )
+ vf_mac_pattern = r"vf\s+\d+\s+.*\s+link\/ether\s+(\S+)\s+.*"
+ match = re.findall(vf_mac_pattern, out)
+ return match
+
+ @staticmethod
+ def generate_using_packets(pkt_type=None, pkt_str=None, **kwargs):
+ """
+ generate using pkts:
+ 1.select the protocol type and generate the pkts through packet module
+ 2.select customized pkt string
+ :return: pkt object
+ """
+ pkt = Packet()
+ dst_mac = kwargs.get("dst_mac")
+ vlan_id = kwargs.get("vlan_id")
+ if pkt_type:
+ pkt = Packet(pkt_type=pkt_type)
+ pkt.config_layer("ether", {"dst": dst_mac})
+ if vlan_id is not None:
+ pkt.config_layer("vlan", {"vlan": vlan_id})
+ elif pkt_str:
+ pkt.update_pkt(pkt_str)
+ else:
+ raise Exception("wrong pkt value")
+ return pkt
+
+ @staticmethod
+ def get_received_pkt_num(output, port_id=0):
+ """
+ use the testpmd output to get the receive pkts num for port
+ """
+ pkt_pattern = (
+ "port\s%d/queue\s\d+:\sreceived\s(\d+)\spackets.+?\n.*length=\d{2,}\s"
+ % port_id
+ )
+ received_data = re.findall(pkt_pattern, output)
+ received_pkts = sum(map(int, [i[0] for i in received_data]))
+ return received_pkts
+
+ @staticmethod
+ def get_hash_and_queues(out, port_id=0):
+ """
+ use testpmd output to get hash values and queues
+ """
+ hash_pattern = re.compile(
+ "port\s%s/queue\s\d+:\sreceived\s\d+\spackets.+?\n.*RSS\shash=(\w+)\s-\sRSS\squeue=(\w+)"
+ % port_id
+ )
+ hash_infos = hash_pattern.findall(out)
+ if len(hash_infos) == 0:
+ queue_pattern = re.compile("Receive\squeue=(\w+)")
+ queues = queue_pattern.findall(out)
+ return [], queues
+ hashes = [hash_info[0].strip() for hash_info in hash_infos]
+ queues = [hash_info[1].strip() for hash_info in hash_infos]
+ return hashes, queues
+
+ @staticmethod
+ def get_pkts_vlan_layer(pkt_obj: Packet, vlan_layer):
+ """
+ get pkts vlan layers
+ :param pkt_obj: pkt object
+ :param vlan_layer: vlan, prio etc...
+ :return: vlan id list, if pkt object consists of multiple vlan pkts
+ """
+ vlans = []
+ vlan_layers_list = []
+ for i in range(len(pkt_obj)):
+ vlan_dict = {}
+ try:
+ outer_vlan = pkt_obj.strip_element_layer3(vlan_layer, p_index=i)
+ vlan_dict["outer"] = outer_vlan
+ except Exception:
+ pass
+ try:
+ inner_vlan = pkt_obj.strip_element_layer4(vlan_layer, p_index=i)
+ vlan_dict["inner"] = inner_vlan
+ except Exception:
+ pass
+ vlans.append(vlan_dict)
+ for _vlan in vlans:
+ vlan_layers_list += list(_vlan.values())
+ return vlans, vlan_layers_list
+
+ def get_pmd_port_infomation(self, port_id=0, **kwargs):
+ """
+ use 'show ports info 0' to get port link status and port speed
+ """
+ pmd_session = kwargs.get("pmd_session") or self.pmd_session
+ link_status = pmd_session.get_port_link_status(port_id)
+ link_speed = pmd_session.get_port_link_speed(port_id)
+ return link_status, link_speed
+
+ def convert_driver_version_value(self, check_version):
+ """
+ convert the driver version to int list
+ take the first three values in the list for comparison and limit intree driver
+ for example:
+ 6.0.7-060007-generic: [6, 0, 7-060007-generic]
+ 1.11.0_rc59: [1, 11, 0]
+ 1.11.11: [1, 11, 11]
+ """
+ try:
+ value_list = list(map(int, re.split(r"[.|_]", check_version)[:3]))
+ except ValueError as e:
+ self.logger.warning(e)
+ # the intree-driver has character, so set the return value is null list as the lowest driver version
+ return []
+ return value_list
+
+ @staticmethod
+ def get_pmd_rece_pkt_len(output):
+ """
+ get the pkt length in testpmd output
+ """
+ pkt_length = re.findall("length=(\d+)", output)
+ return pkt_length
+
+ def get_xstats_table(self, port_id_list):
+ """
+ use 'show port xstats' to get xstats info dict
+ """
+ xstats_data = dict()
+ if not isinstance(port_id_list, list):
+ port_id_list = [port_id_list]
+ for port_id in port_id_list:
+ out = self.execute_pmd_cmd("show port xstats %s" % port_id)
+ tmp_data = dict()
+ matches = re.findall(r"(\w+):\s+(\d+)", out)
+ for match in matches:
+ key = match[0]
+ value = int(match[1])
+ tmp_data[key] = value
+ xstats_data[port_id] = tmp_data
+ return xstats_data
+
+ @staticmethod
+ def generate_random_packets(
+ dstmac=None,
+ pktnum=100,
+ random_type=None,
+ ip_increase=True,
+ random_payload=False,
+ options=None,
+ ):
+ """
+ generate the random packets,
+ """
+ pkt = Packet()
+ pkt.generate_random_pkts(
+ dstmac=dstmac,
+ pktnum=pktnum,
+ random_type=random_type,
+ ip_increase=ip_increase,
+ random_payload=random_payload,
+ options=options,
+ )
+ return pkt
+
+ def start_tcpdump_output_pcap_file(self, port_inface, count=0, filters=None):
+ """
+ start tcpdump to capture the pkts
+ :param port_inface: port interface name
+ :param count: limit capture the pkts num
+ :param filters:
+ add filter: [{"layer": "ether", "config": {"src": "xxxx"}}]
+ :return:
+ """
+ index = self.tester.tcpdump_sniff_packets(
+ port_inface, count=count, filters=filters
+ )
+ return index
+
+ def stop_tcpdump_and_get_pkts(self, pcap_file):
+ """
+ stop tcpdump and parse the pcap file
+ """
+ pkts = self.tester.load_tcpdump_sniff_packets(pcap_file)
+ return pkts
+
+ def set_pmd_fwd_mode(self, fwd_mode="mac", pmd_session=None):
+ """
+ set testpmd fwd
+ """
+ pmd_session = pmd_session or self.pmd_session
+ self.execute_pmd_cmd(
+ ["set fwd %s" % fwd_mode, "set verbose 1", "start"], pmd_session=pmd_session
+ )
+
+ def send_pkts(
+ self,
+ pkt_list: list,
+ tester_tx_interface=None,
+ packet_count=1,
+ packet_interval=0.01,
+ ):
+ """
+ send pkts with packet obj
+ """
+ tester_tx_interface = (
+ self.tester_tx_interface
+ if tester_tx_interface is None
+ else tester_tx_interface
+ )
+ for _pkt in pkt_list:
+ _pkt.send_pkt(
+ crb=self.tester,
+ tx_port=tester_tx_interface,
+ count=packet_count,
+ interval=packet_interval,
+ )
+
+ def execute_fwd_check_process(
+ self,
+ packets,
+ pmd_commands=None,
+ rx_port=0,
+ tx_port=0,
+ packet_interval=0.01,
+ packet_count=1,
+ tcpdump_filter=None,
+ tester_tx_interface=None,
+ tester_rx_interface=None,
+ ):
+ """
+ pkt fwd flow: tcpdump ---> send pkt ---> testpmd output/tcpdump capture pkts
+ :return:
+ 1.capture the tcpdump pkts
+ 2.pmd output
+ 3.use pmd output to get the pkts stats
+ """
+ if isinstance(packets, list):
+ pkt_list = packets
+ else:
+ pkt_list = [packets]
+ tester_rx_interface = (
+ self.tester_rx_interface
+ if tester_rx_interface is None
+ else tester_rx_interface
+ )
+ inst = self.start_tcpdump_output_pcap_file(
+ port_inface=tester_rx_interface,
+ count=len(packets) * packet_count,
+ filters=tcpdump_filter,
+ )
+ time.sleep(3)
+ if pmd_commands:
+ self.execute_pmd_cmd(pmd_commands)
+ self.execute_pmd_cmd("clear port stats all")
+ self.send_pkts(
+ pkt_list=pkt_list,
+ packet_count=packet_count,
+ packet_interval=packet_interval,
+ tester_tx_interface=tester_tx_interface,
+ )
+ time.sleep(packet_interval * len(pkt_list) * packet_count)
+ packets_captured = self.stop_tcpdump_and_get_pkts(inst)
+ self.logger.info("capture the pkt: {}".format(str(list(packets_captured))))
+ pmdout = self.pmd_session.get_output()
+ tx_stats = self.pmd_session.get_pmd_stats(tx_port)
+ rx_stats = self.pmd_session.get_pmd_stats(rx_port)
+ stats = {tx_port: tx_stats, rx_port: rx_stats}
+
+ return packets_captured, pmdout, stats
+
+
+class RxTxBaseTest(FuncTestBase):
+ def basic_rx_check(
+ self, packets_num, packet_dst_mac=None, pmd_commands=None, rx_port=0, tx_port=0
+ ):
+ """
+ set fwd rxonly and check the rece pkts num
+ """
+ self.set_pmd_fwd_mode(fwd_mode="rxonly")
+ random_pkt = self.generate_random_packets(
+ dstmac=packet_dst_mac, pktnum=packets_num
+ )
+ _, pmdout, stats = self.execute_fwd_check_process(
+ packets=random_pkt,
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ self.verify(packet_dst_mac in pmdout, "receive packet fail")
+ rece_pkts_num = self.get_received_pkt_num(pmdout)
+ self.verify(rece_pkts_num == packets_num, "receive packet num is not match")
+
+ def basic_tx_check(self):
+ """
+ set fwd txonly, check:
+ 1.testpmd output tx-pkts != 0
+ 2.the tcpdump can capture pkts
+ """
+ self.execute_pmd_cmd("stop")
+ self.execute_pmd_cmd("set fwd txonly")
+ index = self.start_tcpdump_output_pcap_file(self.tester_rx_interface, count=100)
+ self.execute_pmd_cmd("start")
+ time.sleep(1)
+ self.execute_pmd_cmd("stop")
+ pkts_num = self.stop_tcpdump_and_get_pkts(index)
+ stats = self.pmd_session.get_pmd_stats(0)
+ self.verify(
+ stats["TX-packets"] != 0
+ and len(pkts_num) == 100
+ and stats["TX-packets"] > len(pkts_num),
+ "send packet num is not match",
+ )
+
+ def basic_macfwd_check(
+ self,
+ packet_num,
+ dst_mac=None,
+ check_miss=False,
+ pmd_commands=None,
+ rx_port=0,
+ tx_port=0,
+ ):
+ """
+ mac fwd, check rx-pkts and tx-pkt num is correct
+ if check_miss is true, it will check rx/tx is 0
+ """
+ random_pkt = self.generate_random_packets(dstmac=dst_mac, pktnum=packet_num)
+ if not pmd_commands:
+ self.set_pmd_fwd_mode()
+ packets_captured, pmdout, stats = self.execute_fwd_check_process(
+ packets=random_pkt,
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ rece_pkts_num = self.get_received_pkt_num(pmdout)
+ if check_miss:
+ packet_num = 0
+ self.verify(
+ stats[tx_port]["RX-packets"] == packet_num,
+ "receive packet num is not match",
+ )
+ self.verify(
+ stats[tx_port]["RX-errors"] == 0, "some pkts have rx-errors in testpmd"
+ )
+ self.verify(
+ stats[rx_port]["TX-packets"] == packet_num,
+ "receive packet num is not match",
+ )
+ self.verify(
+ rece_pkts_num == packet_num == len(packets_captured),
+ "receive packet num is not match",
+ )
+
+ def basic_xstats_check(
+ self, packet_num, dst_mac=None, rx_port=0, tx_port=0, payload_size=64
+ ):
+ """
+ 1. default stats check
+ 2. send pkt and check testpmd stats and xstats
+ 3. send pkt and clear port stats and check xstats
+ 4. send pkt and clear xstats and check xstats
+ """
+ random_pkt = self.generate_random_packets(
+ dstmac=dst_mac,
+ pktnum=packet_num,
+ options={
+ "ip": {"src": "192.168.0.1", "dst": "192.168.1.1"},
+ "layers_config": [("raw", {"payload": ["58"] * payload_size})],
+ },
+ )
+ self.execute_pmd_cmd("clear port xstats all")
+ xstats_table = self.get_xstats_table([rx_port, tx_port])
+ for port in xstats_table.keys():
+ self.verify(
+ not any(xstats_table[port].values()),
+ "xstats Initial value error! port {} xstats "
+ "data is {}".format(port, xstats_table[port]),
+ )
+ _, _, stats_table = self.execute_fwd_check_process(
+ packets=random_pkt,
+ pmd_commands=[
+ "port config all rss all",
+ "set fwd mac",
+ "clear port xstats all",
+ "start",
+ ],
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ xstats_table = self.get_xstats_table([rx_port, tx_port])
+ return stats_table, xstats_table
+
+ def basic_promisc_check(
+ self, match_mac, unmatch_mac, pmd_commands=None, rx_port=0, tx_port=0
+ ):
+ """
+ use match and unmatch pkts to test promisc
+ test flow: default mode --> set promisc off --> set promisc on
+ note: if test vf promisc, confirm the vf primisc enable(kernel need to set trust on)
+ """
+ unmatch_pkt = self.generate_random_packets(dstmac=unmatch_mac, pktnum=1)
+ match_pkt = self.generate_random_packets(dstmac=match_mac, pktnum=1)
+ self.set_pmd_fwd_mode(fwd_mode="mac")
+ self.logger.info("check the default promisc mode")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=[unmatch_pkt, match_pkt],
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ self.verify(
+ match_mac in pmdout and unmatch_mac in pmdout,
+ "enable promisc not receive all pkts",
+ )
+ self.logger.info("check disable promisc mode")
+ self.execute_pmd_cmd("set promisc all off")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port
+ )
+ self.verify(
+ match_mac in pmdout and unmatch_mac not in pmdout,
+ "disable promisc should receive match pkt",
+ )
+ self.logger.info("check re-enable promisc mode")
+ self.execute_pmd_cmd("set promisc all on")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=[unmatch_pkt, match_pkt], rx_port=rx_port, tx_port=tx_port
+ )
+ self.verify(
+ match_mac in pmdout and unmatch_mac in pmdout,
+ "enable promisc should receive all pkt",
+ )
+
+ def basic_multicast_check(
+ self, normal_mac, multicast_mac, pmd_commands=None, rx_port=0, tx_port=0
+ ):
+ """
+ use normal mac and multicast mac to test
+ """
+ normal_pkt = self.generate_random_packets(dstmac=normal_mac, pktnum=1)
+ multicast_pkt = self.generate_random_packets(dstmac=multicast_mac, pktnum=1)
+ self.execute_pmd_cmd(
+ [
+ "set allmulti all off",
+ "set promisc all off",
+ ],
+ )
+ self.set_pmd_fwd_mode(fwd_mode="mac")
+ self.logger.info("check the default pmd multicast")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=[normal_pkt, multicast_pkt],
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ self.verify(
+ normal_mac in pmdout and multicast_mac not in pmdout,
+ "the default can not receive multicast pkt",
+ )
+ self.execute_pmd_cmd(
+ [
+ "set allmulti all on",
+ "mcast_addr add 0 {}".format(multicast_mac),
+ ],
+ )
+ self.logger.info("check enable pmd multicast")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=[normal_pkt, multicast_pkt], rx_port=rx_port, tx_port=tx_port
+ )
+ self.verify(
+ normal_mac in pmdout and multicast_mac in pmdout,
+ "enable multicast not receive multicast pkt",
+ )
+
+ def basic_rss_check(
+ self, dst_mac, rss_type, queue_num, pmd_commands=None, rx_port=0, tx_port=0
+ ):
+ """
+ use pkt type mapping to rss type, and check rss func
+ """
+ rss2pkt_dict = {
+ "ip": "IP_RAW",
+ "tcp": "TCP",
+ "udp": "UDP",
+ }
+ rss_pkts = self.generate_random_packets(
+ dstmac=dst_mac, pktnum=30, random_type=[rss2pkt_dict[rss_type]]
+ )
+ self.execute_pmd_cmd("port config all rss %s" % rss_type)
+ self.set_pmd_fwd_mode(fwd_mode="mac")
+ _, pmdout, stats = self.execute_fwd_check_process(
+ packets=rss_pkts,
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ hashes, queues = self.get_hash_and_queues(pmdout)
+ self.verify(
+ len(set(queues)) == int(queue_num)
+ or len(queues) == len(hashes) == stats[tx_port]["RX-packets"],
+ "some pkt and queue can not get get the hash",
+ )
+ return zip(hashes, queues)
+
+ def rss_reta_config_check(self, rss_reta: list, port_id=0, reta_size=64):
+ """
+ check the rss reta after setting new rss rate in testpmd
+ """
+ reta_mask = "0x{}".format(int(reta_size / 4) * "f")
+ default_rss_reta = self.execute_pmd_cmd(
+ "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask)
+ )
+ for i, j in zip(list(range(reta_size)), rss_reta):
+ self.execute_pmd_cmd("port config %d rss reta (%d,%d)" % (port_id, i, j))
+ change_rss_reta = self.execute_pmd_cmd(
+ "show port {} rss reta {} ({})".format(port_id, reta_size, reta_mask)
+ )
+ self.verify(default_rss_reta != change_rss_reta, "port config rss reta failed")
+
+ def rss_reta_hit_check(self, hash_table, rss_reta: list, reta_size=64):
+ """
+ check the rece pkts hash value can map the queue according to rss reta
+ """
+ hit_hash = False
+ for rss_hash, rss_queue in hash_table:
+ for i, j in zip(list(range(reta_size)), rss_reta):
+ if int(rss_hash, 16) % reta_size == i and int(rss_queue, 16) == j:
+ hit_hash = True
+ break
+ else:
+ hit_hash = False
+ self.verify(hit_hash, "some pkt not directed by rss.")
+
+ def basic_rss_hash_key_check(
+ self, dst_mac, hash_key, port_id=0, pmd_commands=None, rx_port=0, tx_port=0
+ ):
+ """
+ check the hash values is different after setting rss hash key
+ """
+ pkt = self.generate_using_packets(pkt_type="UDP", dst_mac=dst_mac)
+ self.set_pmd_fwd_mode("mac")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=pkt, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port
+ )
+ hash_1, queue_1 = self.get_hash_and_queues(pmdout)
+ self.execute_pmd_cmd(
+ "port config {} rss-hash-key ipv4 {}".format(port_id, hash_key)
+ )
+ out = self.execute_pmd_cmd("show port 0 rss-hash key")
+ self.verify(hash_key.upper() in out, "rss hash key update failed")
+ _, pmdout, _ = self.execute_fwd_check_process(
+ packets=pkt, rx_port=rx_port, tx_port=tx_port
+ )
+ hash_2, queue_2 = self.get_hash_and_queues(pmdout)
+ self.verify(hash_1 != hash_2, "hash value should be different")
+
+ def basic_pmd_info_check(self, port_obj, port_id=0):
+ """
+ check the link speed and link status
+ """
+ link_status, link_speed = self.get_pmd_port_infomation(port_id)
+ link_speed_host = int(port_obj.get_nic_speed()) // 1000
+ self.verify(link_status == "up", "link stats has error")
+ self.verify(
+ int(link_speed) == link_speed_host,
+ "link speed has error",
+ )
+
+
+class VlanFuncBaseTest(FuncTestBase):
+ def vlan_pkts_fwd_check(
+ self, pkts, pmd_commands=None, port_id=0, rx_port=0, tx_port=0
+ ):
+ """
+ send pkts and return rece num, vlan dict and vlan id list
+ """
+ packets_captured, pmdout, _ = self.execute_fwd_check_process(
+ packets=pkts, pmd_commands=pmd_commands, rx_port=rx_port, tx_port=tx_port
+ )
+ rece_num = self.get_received_pkt_num(pmdout, port_id=port_id)
+ vlans, vlan_id_list = self.get_pkts_vlan_layer(packets_captured, "vlan")
+ self.logger.info("capture the TX pkts vlans: {}".format(vlans))
+ return rece_num, packets_captured, vlans, vlan_id_list
+
+ def vlan_offload_flag_check(self, port_id=0, **kwargs):
+ """
+ check the vlan offload flag status:
+ filter="on"
+ strip="on"
+ ...
+ """
+ out = self.execute_pmd_cmd("show port info %d" % port_id)
+ for flag in kwargs.keys():
+ p = "VLAN offload.*\n.*?%s (\w+)" % flag
+ vlan_stats = re.search(p, out).group(1)
+ self.logger.info("{} flag is {}".format(flag, vlan_stats))
+ self.verify(
+ vlan_stats == kwargs[flag], "the vlan offload flag is incorrect"
+ )
+
+ def vlan_prio_check(self, pkts, **kwargs):
+ """
+
+ :param pkts:
+ :param kwargs:
+ :return:
+ """
+ packets_captured, _, _ = self.execute_fwd_check_process(packets=pkts)
+ vlans, _ = self.get_pkts_vlan_layer(packets_captured, "prio")
+ self.logger.info("vlan prio: {}".format(vlans))
+ for _prio in kwargs.keys():
+ for _vlan in vlans:
+ self.verify(
+ _vlan[_prio] == kwargs[_prio], "the vlan prio values not matched"
+ )
+
+ def set_pvid_from_pf(self, pf_intf, vf_id=0, vlan_id=0):
+ """
+
+ :return:
+ """
+ self.execute_host_cmd(
+ "ip link set {} vf {} vlan {}".format(pf_intf, vf_id, vlan_id)
+ )
+ output = self.execute_host_cmd("ip link show {}".format(pf_intf))
+ if vlan_id != 0:
+ self.verify("vlan %d" % vlan_id in output, "Failed to add pvid on VF")
+ else:
+ self.verify("vlan" not in output, "Failed to add pvid on VF")
+
+ def basic_vlan_filter_check(
+ self,
+ vlan_id,
+ match_pkt,
+ unmatch_pkt,
+ pmd_commands=None,
+ port_id=0,
+ double_vlan=False,
+ rx_port=0,
+ tx_port=0,
+ ):
+ """
+ send match and unmatch pkt to check vlan filter
+ double vlan have 2 vlan id
+ """
+ if not isinstance(match_pkt, list):
+ match_pkt = [match_pkt]
+ if not isinstance(unmatch_pkt, list):
+ unmatch_pkt = [unmatch_pkt]
+ rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+ pkts=match_pkt + unmatch_pkt,
+ port_id=port_id,
+ pmd_commands=pmd_commands,
+ rx_port=rx_port,
+ tx_port=tx_port,
+ )
+ self.verify(
+ rece_num == len(match_pkt) and vlan_id in vlan_id_list,
+ "failed receive vlan pkts",
+ )
+ if double_vlan:
+ self.verify(
+ len(vlan_id_list) == len(match_pkt) * 2, "failed receive vlan pkts"
+ )
+ else:
+ self.verify(len(vlan_id_list) == len(match_pkt), "failed receive vlan pkts")
+
+ def basic_vlan_strip_check(
+ self,
+ vlan_id,
+ match_pkt,
+ pmd_commands=None,
+ port_id=0,
+ double_vlan=False,
+ rx_port=0,
+ tx_port=0,
+ ):
+ """
+ send vlan pkts to check vlan strip
+ single vlan: after strip, the rece pkt not have vlan id
+ double vlan: after strip, the rece pkt have single vlan
+ """
+ rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+ match_pkt,
+ port_id=port_id,
+ pmd_commands=pmd_commands,
+ tx_port=tx_port,
+ rx_port=rx_port,
+ )
+ if double_vlan:
+ self.verify(
+ rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt),
+ "Failed to strip double vlan tag",
+ )
+ else:
+ self.verify(
+ rece_num == len(match_pkt) and len(vlan_id_list) == 0,
+ "Failed to strip vlan tag",
+ )
+ self.execute_pmd_cmd("vlan set strip off %s" % port_id)
+ rece_num, _, _, vlan_id_list = self.vlan_pkts_fwd_check(
+ match_pkt, port_id=port_id, tx_port=tx_port, rx_port=rx_port
+ )
+ if double_vlan:
+ self.verify(
+ rece_num == len(match_pkt)
+ and len(vlan_id_list) == len(match_pkt) * 2
+ and vlan_id in vlan_id_list,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+ else:
+ self.verify(
+ rece_num == len(match_pkt)
+ and len(vlan_id_list) == len(match_pkt)
+ and vlan_id in vlan_id_list,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+
+ def basic_vlan_insert_check(
+ self,
+ vlan_id,
+ insert_vlan,
+ match_pkt,
+ pmd_commands=None,
+ port_id=0,
+ double_vlan=None,
+ rx_port=0,
+ tx_port=0,
+ ):
+ """
+ single vlan insert: send normal pkt, the tx get the single vlan pkt
+ double vlan insert: send single vlan pkt, the vlan insert to outer vlan and the default vlan become inner vlan
+ """
+ rece_num, _, vlans, vlan_id_list = self.vlan_pkts_fwd_check(
+ match_pkt,
+ port_id=port_id,
+ pmd_commands=pmd_commands,
+ tx_port=tx_port,
+ rx_port=rx_port,
+ )
+ if double_vlan:
+ self.verify(
+ rece_num == len(match_pkt) and len(vlan_id_list) == len(match_pkt) * 2,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+ self.verify(
+ all(
+ [
+ insert_vlan == _vlan["outer"] and vlan_id == _vlan["inner"]
+ for _vlan in vlans
+ ]
+ ),
+ "the insert vlan is incorrect",
+ )
+ else:
+ self.verify(
+ rece_num == len(match_pkt)
+ and len(vlan_id_list) == len(match_pkt)
+ and insert_vlan in vlan_id_list,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+ self.verify(
+ all([insert_vlan == _vlan["outer"] for _vlan in vlans]),
+ "the insert vlan is incorrect",
+ )