new file mode 100644
@@ -0,0 +1,1055 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+#
+
+import random
+
+from framework.exception import VerifyFailure
+from framework.settings import DPDK_RXMODE_SETTING, ETH_800_SERIES, load_global_setting
+from framework.test_case import TestCase, check_supported_nic
+
+from .func_test_base import *
+
+VF_MAC_ADDR = "00:01:23:45:67:89"
+VF_WRONG_MAC_ADDR = "00:01:23:45:67:99"
+MULTICAST_MAC_ADDR = "01:80:C2:00:00:08"
+BROADCAST_MAC_ADDR = "FF:FF:FF:FF:FF:FF"
+
+MAX_VLAN_ID = 4095
+RANDOM_VLAN_ID = random.randint(2, MAX_VLAN_ID)
+OUTER_VLAN_ID = 1
+INNTER_VLAN_ID = 2
+
+MATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID, dst_mac=VF_MAC_ADDR
+)
+UNMATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID - 1, dst_mac=VF_MAC_ADDR
+)
+
+MATCHED_DOUBLE_VLAN_PKT = [
+ FuncTestBase.generate_using_packets(pkt_str=_pkt)
+ for _pkt in [
+ 'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x8100,prio=1)/Dot1Q(vlan=%d,type=0x0800,prio=2)/IP('
+ 'src="196.222.232.221")/("X"*480)'
+ % (VF_MAC_ADDR, OUTER_VLAN_ID, INNTER_VLAN_ID),
+ 'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x0800)/IP('
+ 'src="196.222.232.221")/("X"*480)' % (VF_MAC_ADDR, OUTER_VLAN_ID),
+ ]
+]
+
+UNMATCHED_DOUBLE_VLAN_PKT = [
+ FuncTestBase.generate_using_packets(pkt_str=_pkt)
+ for _pkt in [
+ 'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x8100,prio=1)/Dot1Q(vlan=%d,type=0x0800,prio=2)/IP('
+ 'src="196.222.232.221")/("X"*480)'
+ % (VF_MAC_ADDR, OUTER_VLAN_ID + 10, INNTER_VLAN_ID),
+ 'Ether(dst="%s",type=0x8100)/Dot1Q(vlan=%d,type=0x0800)/IP('
+ 'src="196.222.232.221")/("X"*480)' % (VF_MAC_ADDR, OUTER_VLAN_ID + 10),
+ ]
+]
+
+
+class TestKernelpfVf(TestCase):
+ def set_up_all(self):
+ self.dut_port = self.dut.get_ports(self.nic)
+ self.used_dut_tx_port = self.dut_port[0]
+ self.used_dut_rx_port = self.dut_port[1]
+ self.port_obj = [self.dut.ports_info[port]["port"] for port in self.dut_port]
+ self.tester_tx_interface = self.tester.get_interface(
+ self.tester.get_local_port(self.used_dut_tx_port)
+ )
+ self.tester_rx_interface = self.tester.get_interface(
+ self.tester.get_local_port(self.used_dut_rx_port)
+ )
+ self.rxtx_base = RxTxBaseTest(
+ self, self.tester_tx_interface, self.tester_rx_interface
+ )
+ self.vlan_func = VlanFuncBaseTest(
+ self, self.tester_tx_interface, self.tester_rx_interface
+ )
+ self.rxtx_base.check_port_num_for_test(2)
+ self.setup_env_configuration()
+ self.rx_mode = load_global_setting(DPDK_RXMODE_SETTING)
+ self.testpmd_flag = False
+ self.launch_testpmd()
+
+ def set_up(self):
+ pass
+
+ def pf_config(self):
+ self.flag = "vf-vlan-pruning"
+ self.dut.bind_interfaces_linux(self.kdriver)
+ self.default_stats = self.dut.get_priv_flags_state(
+ self.port_obj[self.used_dut_tx_port].get_interface_name(), self.flag
+ )
+ if not self.default_stats:
+ self.logger.warning(
+ f"{self.kdriver + '_' + self.nic_obj.driver_version} driver does not have vf-vlan-pruning flag."
+ )
+ if (
+ any([self.is_eth_series_nic(800), self.kdriver == "i40e"])
+ and self.default_stats
+ ):
+ self.dut.send_expect(
+ "ethtool --set-priv-flags %s %s on"
+ % (
+ self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ self.flag,
+ ),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ethtool --set-priv-flags %s %s on"
+ % (
+ self.port_obj[self.used_dut_rx_port].get_interface_name(),
+ self.flag,
+ ),
+ "# ",
+ )
+
+ def vf_config(self):
+ self.orig_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+ self.dut.send_expect(
+ "ip link set %s vf 0 spoofchk off"
+ % (self.port_obj[self.used_dut_tx_port].get_interface_name()),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ip link set %s vf 0 spoofchk off"
+ % (self.port_obj[self.used_dut_rx_port].get_interface_name()),
+ "# ",
+ )
+
+ def setup_env_configuration(self):
+ self.vm_obj, self.vm_dut = self.rxtx_base.vf_test_preset_env_vm(
+ pf_port=[self.used_dut_tx_port, self.used_dut_rx_port],
+ vfs_num=1,
+ vm_name="vm0",
+ )
+ self.rxtx_base.init_pmd_session(self.vm_dut)
+ self.vlan_func.init_pmd_session(self.vm_dut)
+
+ def launch_testpmd(self, **kwargs):
+ if not self.testpmd_flag:
+ self.rxtx_base.launch_testpmd(**kwargs)
+ self.testpmd_flag = True
+
+ def quit_testpmd(self):
+ if self.testpmd_flag:
+ self.rxtx_base.pmd_session.quit()
+ self.testpmd_flag = False
+
+ def restart_testpmd(self, **kwargs):
+ self.quit_testpmd()
+ self.launch_testpmd(**kwargs)
+
+ def test_vf_basic_rxtx(self):
+ self.rxtx_base.basic_rx_check(packets_num=100, packet_dst_mac=VF_MAC_ADDR)
+ self.rxtx_base.basic_tx_check()
+
+ def test_vf_promisc_mode(self):
+ self.quit_testpmd()
+ try:
+ self.rxtx_base.execute_host_cmd(
+ "ip link set dev %s vf 0 trust on"
+ % self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.launch_testpmd()
+ self.rxtx_base.basic_promisc_check(
+ match_mac=VF_MAC_ADDR, unmatch_mac=VF_WRONG_MAC_ADDR
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.rxtx_base.execute_host_cmd(
+ "ip link set dev %s vf 0 trust off"
+ % self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.vf_config()
+ self.launch_testpmd()
+
+ def test_vf_multicast(self):
+ self.rxtx_base.basic_multicast_check(
+ normal_mac=VF_MAC_ADDR, multicast_mac=MULTICAST_MAC_ADDR
+ )
+
+ def test_vf_broadcast(self):
+ self.rxtx_base.basic_rx_check(packets_num=1, packet_dst_mac=BROADCAST_MAC_ADDR)
+
+ def test_vf_queue_start_stop(self):
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=4, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+ packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=4
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 0 rxq 0 stop", "start"],
+ )
+ self.verify(
+ len(packets_captured) == 0
+ and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+ "receive packet num is not match",
+ )
+ packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=4
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 0 rxq 0 start", "port 1 txq 0 stop", "start"],
+ )
+ self.verify(
+ len(packets_captured) == 0
+ and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+ "receive packet num is not match",
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=4,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 1 txq 0 start", "start"],
+ )
+
+ def test_vf_rss(self):
+ rss_type = ["ip", "tcp", "udp"]
+ rss_reta = [3, 2, 1, 0] * 16
+ try:
+ self.restart_testpmd(param="--rxq=4 --txq=4")
+ self.rxtx_base.rss_reta_config_check(rss_reta)
+ for _rss in rss_type:
+ hash_table = self.rxtx_base.basic_rss_check(
+ dst_mac=VF_MAC_ADDR, rss_type=_rss, queue_num=4
+ )
+ self.rxtx_base.rss_reta_hit_check(hash_table, rss_reta)
+ self.rxtx_base.execute_pmd_cmd("stop")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_vf_rss_hash_key(self):
+ update_hash_key = "1b9d58a4b961d9cd1c56ad1621c3ad51632c16a5d16c21c3513d132c135d132c13ad1531c23a51d6ac49879c499d798a7d949c8a"
+ try:
+ self.restart_testpmd(param="--rxq=4 --txq=4")
+ self.rxtx_base.basic_rss_hash_key_check(
+ dst_mac=VF_MAC_ADDR, hash_key=update_hash_key
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_vf_rss_rxq_txq_inconsistent(self):
+ params = [
+ "--rxq=4 --txq=8",
+ "--rxq=6 --txq=8",
+ "--rxq=3 --txq=9",
+ "--rxq=4 --txq=16",
+ ]
+ if self.kdriver == "ixgbe":
+ params = [
+ "--rxq=2 --txq=4",
+ "--rxq=1 --txq=2",
+ ]
+ try:
+ for param in params:
+ queue_num = re.search(r"--rxq=(\d+)", param).group(1)
+ self.restart_testpmd(param=param)
+ self.rxtx_base.basic_rss_check(
+ dst_mac=VF_MAC_ADDR, rss_type="ip", queue_num=queue_num
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_vf_port_start_stop(self):
+ for i in range(10):
+ self.rxtx_base.execute_pmd_cmd("port stop all")
+ self.rxtx_base.execute_pmd_cmd("port start all")
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_vf_statistic_reset(self):
+ out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+ self.verify(
+ "RX-packets: 0" in out and "TX-packets: 0" in out,
+ "receive some misc packet",
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+ self.rxtx_base.execute_pmd_cmd("clear port stats all")
+ out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+ self.verify(
+ "RX-packets: 0" in out and "TX-packets: 0" in out,
+ "clear port stats fail",
+ )
+
+ def test_vf_information(self):
+ self.rxtx_base.basic_pmd_info_check(self.port_obj[0])
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_vf_xstats_check(self):
+ self.quit_testpmd()
+ try:
+ self.tester.send_expect(
+ "ifconfig {} mtu {}".format(self.tester_tx_interface, 3000), "# "
+ )
+ if self.kdriver == "ixgbe":
+ self.rxtx_base.execute_host_cmd(
+ "ifconfig {} mtu 3000".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd(param="--rxq=4 --txq=4 --max-pkt-len=9000")
+ for _payload_size in [64, 128, 256, 512, 1024, 1523]:
+ stats_table, xstats_table = self.rxtx_base.basic_xstats_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ payload_size=_payload_size,
+ )
+ self.verify(
+ xstats_table[self.used_dut_tx_port]["rx_good_packets"]
+ == stats_table[self.used_dut_tx_port]["RX-packets"]
+ == xstats_table[self.used_dut_rx_port]["tx_good_packets"]
+ == stats_table[self.used_dut_rx_port]["TX-packets"]
+ == 100,
+ "pkt recieve or transport count error!",
+ )
+ self.verify(
+ xstats_table[self.used_dut_tx_port]["rx_good_bytes"]
+ == stats_table[self.used_dut_tx_port]["RX-bytes"]
+ == xstats_table[self.used_dut_rx_port]["tx_good_bytes"]
+ == stats_table[self.used_dut_rx_port]["TX-bytes"],
+ "pkt recieve or transport bytes error!",
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.tester.send_expect(
+ "ifconfig {} mtu {}".format(self.tester_tx_interface, 1500), "# "
+ )
+ if self.kdriver == "ixgbe":
+ self.rxtx_base.execute_host_cmd(
+ "ifconfig {} mtu 1500".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_vf_unicast(self):
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=10,
+ dst_mac=VF_WRONG_MAC_ADDR,
+ check_miss=True,
+ pmd_commands=[
+ "set promisc all off",
+ "set allmulti all off",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=10, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_vf_mac_add_filter(self):
+ self.quit_testpmd()
+ try:
+ # reset vf mac
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(
+ mac="00:00:00:00:00:00"
+ )
+ self.launch_testpmd()
+ default_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "port stop all",
+ "port config all crc-strip on",
+ "port start all",
+ "set promisc all off",
+ "mac_addr add 0 {}".format(VF_MAC_ADDR),
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=default_vf_mac[0],
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["clear port stats all"],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ check_miss=True,
+ pmd_commands=[
+ "clear port stats all",
+ "mac_addr remove 0 {}".format(VF_MAC_ADDR),
+ ],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_WRONG_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ check_miss=True,
+ pmd_commands=[
+ "clear port stats all",
+ ],
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+ self.launch_testpmd()
+
+ def test_vf_vlan_filter(self):
+ self.vlan_func.basic_vlan_filter_check(
+ vlan_id=RANDOM_VLAN_ID,
+ match_pkt=MATCHED_VLAN_PKT,
+ unmatch_pkt=UNMATCHED_VLAN_PKT,
+ pmd_commands=[
+ "set promisc all off",
+ "vlan set filter on 0",
+ "vlan set strip off 0",
+ "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ _, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ pmd_commands=[
+ "rx_vlan rm %d 0" % RANDOM_VLAN_ID,
+ "vlan set filter off 0",
+ ],
+ )
+ if (
+ (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+ or (self.kdriver == "i40e" and not self.default_stats)
+ or (self.kdriver == "ice" and not self.default_stats)
+ ):
+ self.verify(len(vlan_id_list) == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(len(vlan_id_list) == 0, "Failed to received vlan packet!!!")
+
+ def test_vf_vlan_strip(self):
+ self.vlan_func.basic_vlan_strip_check(
+ vlan_id=RANDOM_VLAN_ID,
+ match_pkt=MATCHED_VLAN_PKT,
+ pmd_commands=[
+ "set promisc all off",
+ "vlan set filter on 0",
+ "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+ "vlan set strip on 0",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+
+ def test_vf_vlan_insertion(self):
+ try:
+ self.vlan_func.basic_vlan_insert_check(
+ vlan_id=RANDOM_VLAN_ID,
+ insert_vlan=RANDOM_VLAN_ID,
+ match_pkt=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ pmd_commands=[
+ "port stop all",
+ "set promisc all off",
+ "vlan set filter on 0",
+ "tx_vlan set 1 %s" % RANDOM_VLAN_ID,
+ "rx_vlan add %s 0" % RANDOM_VLAN_ID,
+ "port start all",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_vf_vlan_pvid_base_tx(self):
+ self.quit_testpmd()
+ try:
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=RANDOM_VLAN_ID,
+ )
+ self.launch_testpmd()
+ packets_captured, _, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="UDP",
+ dst_mac=self.vlan_func.get_vf_mac_through_pf(
+ pf_intf=self.port_obj[
+ self.used_dut_rx_port
+ ].get_interface_name()
+ ),
+ ),
+ tx_port=1,
+ tester_tx_interface=self.tester_rx_interface,
+ tester_rx_interface=self.tester_tx_interface,
+ pmd_commands=["set fwd mac", "set verbose 1", "start"],
+ )
+ self.verify(len(packets_captured) == 1, "Not receive expected packet")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.vlan_func.execute_host_cmd(
+ "ip link set {} vf 0 vlan 0".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_vf_vlan_pvid_base_rx(self):
+ self.quit_testpmd()
+ try:
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=RANDOM_VLAN_ID,
+ )
+ self.launch_testpmd()
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[MATCHED_VLAN_PKT, UNMATCHED_VLAN_PKT],
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "start",
+ ],
+ )
+ self.verify(rece_num == 1, "Failed to received matched vlan pkt")
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 0, "Failed to received udp pkt without vlan")
+ self.quit_testpmd()
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=0,
+ )
+ self.launch_testpmd()
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "start",
+ ],
+ )
+ if (
+ (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+ or (self.kdriver == "i40e" and not self.default_stats)
+ or (self.kdriver == "ice" and not self.default_stats)
+ ):
+ self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 2, "Not received expect packet")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.vlan_func.execute_host_cmd(
+ "ip link set {} vf 0 vlan 0".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_vf_vlan_rx_combination(self):
+ rx_vlans = [1, RANDOM_VLAN_ID, MAX_VLAN_ID]
+ param = "--enable-hw-vlan" if self.kdriver != "ixgbe" else ""
+ try:
+ self.restart_testpmd(param=param)
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "vlan set strip on 0",
+ "vlan set filter on 0",
+ "set promisc all off",
+ "start",
+ ],
+ )
+ self.verify(rece_num == 1, "Not received normal packet as default")
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ "VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default"
+ )
+ for rx_vlan in rx_vlans:
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=rx_vlan, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands="rx_vlan add {} 0".format(rx_vlan),
+ )
+ self.verify(
+ "VLAN tci={}".format(hex(rx_vlan)) in pmdout,
+ "Not received expected vlan packet",
+ )
+ if rx_vlan == MAX_VLAN_ID:
+ continue
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=rx_vlan + 1, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 0, "failed to receive unmatch vlan pkt")
+ for rx_vlan in rx_vlans:
+ self.vlan_func.execute_pmd_cmd("rx_vlan rm {} 0".format(rx_vlan))
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ "VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default"
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ rece_num == 1, "Not received normal packet after remove vlan filter"
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ rx_port=self.used_dut_rx_port,
+ )
+ if (
+ (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+ or (self.kdriver == "i40e" and not self.default_stats)
+ or (self.kdriver == "ice" and not self.default_stats)
+ ):
+ self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_vf_vlan_promisc(self):
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_str=[
+ 'Ether(dst="{}",type=0x8100)/Dot1Q(vlan=100,type=0x0800)/IP(src="196.222.232.{}")/("X"*480)'.format(
+ VF_MAC_ADDR, i
+ )
+ for i in range(10)
+ ]
+ )
+ ],
+ pmd_commands=[
+ "port stop all",
+ "set promisc all on",
+ "set fwd mac",
+ "set verbose 1",
+ "vlan set filter off 0",
+ "vlan set strip off 0",
+ "port start all",
+ "start",
+ ],
+ )
+ if (
+ (self.kdriver == "i40e" and self.nic_obj.driver_version < "2.13.10")
+ or (self.kdriver == "i40e" and not self.default_stats)
+ or (self.kdriver == "ice" and not self.default_stats)
+ ):
+ self.verify(rece_num == 10, "Not receive expected packet")
+ else:
+ self.verify(rece_num == 0, "Receive expected packet")
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_str=[
+ 'Ether(dst="{}")/IP(src="196.222.232.{}")/("X"*480)'.format(
+ VF_MAC_ADDR, i
+ )
+ for i in range(10)
+ ]
+ )
+ ],
+ )
+ self.verify(rece_num == 10, "Not receive expected packet")
+
+ def test_iavf_dual_vlan_filter(self):
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_DOUBLE_VLAN_PKT,
+ pmd_commands=[
+ "vlan set filter on 0",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ self.vlan_func.vlan_offload_flag_check(filter="on")
+ if self.default_stats:
+ self.verify(rece_num == 0, "Failed received vlan packet!")
+ else:
+ self.verify(
+ rece_num == len(MATCHED_DOUBLE_VLAN_PKT),
+ "Failed received vlan packet!",
+ )
+ self.vlan_func.basic_vlan_filter_check(
+ vlan_id=OUTER_VLAN_ID,
+ match_pkt=MATCHED_DOUBLE_VLAN_PKT[0],
+ unmatch_pkt=UNMATCHED_DOUBLE_VLAN_PKT[0],
+ pmd_commands=[
+ "rx_vlan add %d 0" % OUTER_VLAN_ID,
+ ],
+ double_vlan=True,
+ rx_port=self.used_dut_rx_port,
+ )
+ self.vlan_func.vlan_prio_check(
+ pkts=MATCHED_DOUBLE_VLAN_PKT[0], outer=1, inner=2
+ )
+ self.vlan_func.basic_vlan_filter_check(
+ vlan_id=OUTER_VLAN_ID,
+ match_pkt=MATCHED_DOUBLE_VLAN_PKT[1],
+ unmatch_pkt=UNMATCHED_DOUBLE_VLAN_PKT[1],
+ double_vlan=False,
+ rx_port=self.used_dut_rx_port,
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_DOUBLE_VLAN_PKT,
+ pmd_commands=[
+ "rx_vlan rm %d 0" % OUTER_VLAN_ID,
+ ],
+ )
+ if self.default_stats:
+ self.verify(rece_num == 0, "Failed error received vlan packet!")
+ else:
+ self.verify(rece_num == 2, "Failed error received vlan packet!")
+
+ def test_iavf_dual_vlan_strip(self):
+ rece_num, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_DOUBLE_VLAN_PKT,
+ pmd_commands=[
+ "vlan set filter on 0",
+ "rx_vlan add %d 0" % OUTER_VLAN_ID,
+ "vlan set strip on 0",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ self.vlan_func.vlan_offload_flag_check(filter="on", strip="on")
+ self.verify(
+ rece_num == len(MATCHED_DOUBLE_VLAN_PKT)
+ and len(vlan_id_list) == 1
+ and INNTER_VLAN_ID in vlan_id_list,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+ rece_num, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_DOUBLE_VLAN_PKT,
+ pmd_commands=[
+ "vlan set strip off 0",
+ ],
+ )
+ self.vlan_func.vlan_offload_flag_check(strip="off")
+ self.verify(
+ rece_num == len(MATCHED_DOUBLE_VLAN_PKT) and len(vlan_id_list) == 3,
+ "Failed to receive vlan pkts with vlan tag",
+ )
+
+ def test_iavf_dual_vlan_insert(self):
+ """
+ Test case: IAVF DUAL VLAN header insertion
+ """
+
+ """
+ according to dpdk commit d048a0aaae27809523969904c2f7b71fe3cc1bb6,
+ when the ice driver version newer than 1.8.9, avx512 tx path not support
+ insert correct vlag tag(outer of QinQ)
+ """
+ if self.rx_mode == "avx512" and self.vlan_func.convert_driver_version_value(
+ self.nic_obj.driver_version
+ ) > self.vlan_func.convert_driver_version_value("1.8.9"):
+ self.skip_case(False, "avx512 tx path not support insert correct vlan tag")
+ try:
+ self.vlan_func.basic_vlan_insert_check(
+ vlan_id=RANDOM_VLAN_ID,
+ insert_vlan=INNTER_VLAN_ID,
+ match_pkt=MATCHED_VLAN_PKT,
+ pmd_commands=[
+ "vlan set filter on 0",
+ "rx_vlan add {} 0".format(RANDOM_VLAN_ID),
+ "port stop 1",
+ "tx_vlan set 1 {}".format(INNTER_VLAN_ID),
+ "port start 1",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ double_vlan=INNTER_VLAN_ID,
+ )
+ self.vlan_func.basic_vlan_insert_check(
+ vlan_id=RANDOM_VLAN_ID,
+ insert_vlan=INNTER_VLAN_ID,
+ match_pkt=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ @check_supported_nic(ETH_800_SERIES)
+ def test_enable_disable_iavf_crc_strip(self):
+ """
+ Test case: Enable/disable AVF CRC stripping
+ """
+ try:
+ self.restart_testpmd(param="--disable-crc-strip")
+ packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=1
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["set verbose 1", "set fwd mac", "start"],
+ )
+ pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ int(pkt_len_list[0]) + 4 == stats[self.used_dut_tx_port]["RX-bytes"],
+ "CRC strip off failed",
+ )
+
+ packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=1
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "stop",
+ "port stop 0",
+ "port config 0 rx_offload keep_crc off",
+ "port start 0",
+ "start",
+ ],
+ )
+ pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ int(pkt_len_list[0]) == stats[self.used_dut_tx_port]["RX-bytes"],
+ "CRC strip off failed",
+ )
+
+ packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=1
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "stop",
+ "port stop 0",
+ "port config 0 rx_offload keep_crc on",
+ "port start 0",
+ "start",
+ ],
+ )
+ pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ int(pkt_len_list[0]) + 4 == stats[self.used_dut_tx_port]["RX-bytes"],
+ "CRC strip off failed",
+ )
+
+ self.restart_testpmd()
+ packets_captured, pmdout, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=1
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ pkt_len_list = self.rxtx_base.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ int(pkt_len_list[0]) == stats[self.used_dut_tx_port]["RX-bytes"],
+ "CRC strip off failed",
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ @check_supported_nic(ETH_800_SERIES)
+ def test_crc_strip_iavf_vlan_strip_coexists(self):
+ """
+ Test case: IAVF CRC strip and Vlan strip co-exists
+ """
+ self.vlan_func.set_pmd_fwd_mode("mac")
+ self.vlan_func.vlan_offload_flag_check(strip="off")
+ # vlan strip off, CRC strip on
+ self.vlan_func.execute_pmd_cmd(["stop", "vlan set strip off 0"])
+ self.vlan_func.vlan_offload_flag_check(strip="off")
+ packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+ packets=MATCHED_DOUBLE_VLAN_PKT[0],
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "vlan set filter on 0",
+ "rx_vlan add %d 0" % OUTER_VLAN_ID,
+ "start",
+ ],
+ )
+ _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+ pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ len(vlan_id_list) == 2
+ and stats[self.used_dut_tx_port]["RX-bytes"]
+ == stats[self.used_dut_rx_port]["TX-bytes"]
+ == int(pkt_len_list[0]),
+ "CRC strip on and vlan strip off should have same values for rx/tx bytes and pkt_len",
+ )
+ compare_pkt_len = int(pkt_len_list[0])
+ # vlan strip on, CRC strip on
+ self.vlan_func.execute_pmd_cmd("vlan set strip on 0")
+ self.vlan_func.vlan_offload_flag_check(strip="on")
+ packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+ packets=MATCHED_DOUBLE_VLAN_PKT[0],
+ rx_port=self.used_dut_rx_port,
+ )
+ _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+ pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+ and int(pkt_len_list[0]) + 4 == compare_pkt_len
+ and len(vlan_id_list) == 1,
+ "CRC strip on, vlan strip on, coexists test failed",
+ )
+ # vlan strip off, CRC strip off
+ self.vlan_func.execute_pmd_cmd("vlan set strip off 0")
+ self.vlan_func.vlan_offload_flag_check(strip="off")
+ packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+ packets=MATCHED_DOUBLE_VLAN_PKT[0],
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "stop",
+ "port stop 0",
+ "port config 0 rx_offload keep_crc on",
+ "port start 0",
+ "start",
+ ],
+ )
+ _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+ pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+ and int(pkt_len_list[0]) == compare_pkt_len
+ and len(vlan_id_list) == 2,
+ "CRC strip off, vlan strip off, coexists test failed",
+ )
+ # vlan strip on, CRC strip off
+ out = self.vlan_func.execute_pmd_cmd("vlan set strip on 0")
+ self.verify(
+ "iavf_config_vlan_strip_v2(): fail to execute command VIRTCHNL_OP_ENABLE_VLAN_STRIPPING_V2"
+ in out,
+ "set vlan strip on successfully",
+ )
+ packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+ packets=MATCHED_DOUBLE_VLAN_PKT[0],
+ rx_port=self.used_dut_rx_port,
+ )
+ _, vlan_id_list = self.vlan_func.get_pkts_vlan_layer(packets_captured, "vlan")
+ pkt_len_list = self.vlan_func.get_pmd_rece_pkt_len(pmdout)
+ self.verify(
+ stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0]) + 4
+ and int(pkt_len_list[0]) == compare_pkt_len
+ and len(vlan_id_list) == 2,
+ "CRC strip off, vlan strip off, coexists test failed",
+ )
+ # vlan strip off, CRC strip on
+ self.vlan_func.execute_pmd_cmd("vlan set strip off 0")
+ self.vlan_func.vlan_offload_flag_check(strip="off")
+ packets_captured, pmdout, stats = self.vlan_func.execute_fwd_check_process(
+ packets=MATCHED_DOUBLE_VLAN_PKT[0],
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "stop",
+ "port stop 0",
+ "port config 0 rx_offload keep_crc off",
+ "port start 0",
+ "start",
+ ],
+ )
+ self.verify(
+ stats[self.used_dut_tx_port]["RX-bytes"] == int(pkt_len_list[0])
+ and int(pkt_len_list[0]) == compare_pkt_len
+ and len(vlan_id_list) == 2,
+ "CRC strip on, vlan strip off, coexists test failed",
+ )
+
+ def tear_down(self):
+ self.rxtx_base.execute_pmd_cmd("stop")
+
+ def tear_down_all(self):
+ self.quit_testpmd()
+ self.rxtx_base.destroy_vm_env()
+ self.rxtx_base.destroy_vf()