new file mode 100644
@@ -0,0 +1,710 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+#
+
+import random
+
+from framework.exception import VerifyFailure
+from framework.settings import ETH_800_SERIES
+from framework.test_case import TestCase, check_supported_nic
+
+from .func_test_base import *
+
+VF_MAC_ADDR = "00:01:23:45:67:89"
+VF_WRONG_MAC_ADDR = "00:01:23:45:67:99"
+MULTICAST_MAC_ADDR = "01:80:C2:00:00:08"
+BROADCAST_MAC_ADDR = "FF:FF:FF:FF:FF:FF"
+
+MAX_VLAN_ID = 4095
+RANDOM_VLAN_ID = random.randint(2, MAX_VLAN_ID)
+OUTER_VLAN_ID = 1
+INNTER_VLAN_ID = 2
+
+MATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID, dst_mac=VF_MAC_ADDR
+)
+UNMATCHED_VLAN_PKT = FuncTestBase.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=RANDOM_VLAN_ID - 1, dst_mac=VF_MAC_ADDR
+)
+
+
+class TestIceKernelpfDcf(TestCase):
+ @check_supported_nic(ETH_800_SERIES)
+ def set_up_all(self):
+ self.dut_port = self.dut.get_ports(self.nic)
+ self.used_dut_tx_port = self.dut_port[0]
+ self.used_dut_rx_port = self.dut_port[1]
+ self.port_obj = [self.dut.ports_info[port]["port"] for port in self.dut_port]
+ self.tester_tx_interface = self.tester.get_interface(
+ self.tester.get_local_port(self.used_dut_tx_port)
+ )
+ self.tester_rx_interface = self.tester.get_interface(
+ self.tester.get_local_port(self.used_dut_rx_port)
+ )
+ self.rxtx_base = RxTxBaseTest(
+ self, self.tester_tx_interface, self.tester_rx_interface
+ )
+ self.vlan_func = VlanFuncBaseTest(
+ self, self.tester_tx_interface, self.tester_rx_interface
+ )
+ self.rxtx_base.check_port_num_for_test(2)
+ self.setup_env_configuration()
+ self.testpmd_flag = False
+ self.launch_testpmd()
+
+ def set_up(self):
+ pass
+
+ def pf_config(self):
+ self.flag = "vf-vlan-pruning"
+ self.dut.bind_interfaces_linux(self.kdriver)
+ self.default_stats = self.dut.get_priv_flags_state(
+ self.port_obj[self.used_dut_tx_port].get_interface_name(), self.flag
+ )
+ if not self.default_stats:
+ self.logger.warning(
+ f"{self.kdriver + '_' + self.nic_obj.driver_version} driver does not have vf-vlan-pruning flag."
+ )
+ else:
+ self.dut.send_expect(
+ "ethtool --set-priv-flags %s %s on"
+ % (
+ self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ self.flag,
+ ),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ethtool --set-priv-flags %s %s on"
+ % (
+ self.port_obj[self.used_dut_rx_port].get_interface_name(),
+ self.flag,
+ ),
+ "# ",
+ )
+
+ def vf_config(self):
+ self.orig_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+ self.dut.send_expect(
+ "ip link set %s vf 0 spoofchk off"
+ % (self.port_obj[self.used_dut_tx_port].get_interface_name()),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ip link set %s vf 0 trust on"
+ % (self.port_obj[self.used_dut_tx_port].get_interface_name()),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ip link set %s vf 0 spoofchk off"
+ % (self.port_obj[self.used_dut_rx_port].get_interface_name()),
+ "# ",
+ )
+ self.dut.send_expect(
+ "ip link set %s vf 0 trust on"
+ % (self.port_obj[self.used_dut_rx_port].get_interface_name()),
+ "# ",
+ )
+
+ def launch_testpmd(self, **kwargs):
+ if not self.testpmd_flag:
+ self.rxtx_base.launch_testpmd(
+ ports=self.vm_pci,
+ port_options={self.vm_pci[0]: "cap=dcf", self.vm_pci[1]: "cap=dcf"},
+ **kwargs,
+ )
+ self.testpmd_flag = True
+
+ def quit_testpmd(self):
+ if self.testpmd_flag:
+ self.rxtx_base.pmd_session.quit()
+ self.testpmd_flag = False
+
+ def restart_testpmd(self, param=""):
+ self.quit_testpmd()
+ self.launch_testpmd(param=param)
+
+ def setup_env_configuration(self):
+ self.vm_obj, self.vm_dut = self.rxtx_base.vf_test_preset_env_vm(
+ pf_port=[self.used_dut_tx_port, self.used_dut_rx_port],
+ vfs_num=1,
+ vm_name="vm0",
+ )
+ self.vm_pci = [
+ self.vm_obj.pci_maps[i]["guestpci"]
+ for i in range(len(self.vm_obj.pci_maps))
+ ]
+ self.rxtx_base.init_pmd_session(self.vm_dut)
+ self.vlan_func.init_pmd_session(self.vm_dut)
+
+ def test_dcf_basic_rxtx(self):
+ self.rxtx_base.basic_rx_check(packets_num=100, packet_dst_mac=VF_MAC_ADDR)
+ self.rxtx_base.basic_tx_check()
+
+ def test_dcf_promisc_mode(self):
+ self.rxtx_base.basic_promisc_check(
+ match_mac=VF_MAC_ADDR,
+ unmatch_mac=VF_WRONG_MAC_ADDR,
+ pmd_commands="set promisc all on",
+ )
+
+ def test_dcf_multicast(self):
+ self.rxtx_base.basic_multicast_check(
+ normal_mac=VF_MAC_ADDR, multicast_mac=MULTICAST_MAC_ADDR
+ )
+
+ def test_dcf_broadcast(self):
+ self.rxtx_base.basic_rx_check(packets_num=1, packet_dst_mac=BROADCAST_MAC_ADDR)
+
+ def test_dcf_queue_start_stop(self):
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=4, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+ packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=4
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 0 rxq 0 stop", "start"],
+ )
+ self.verify(
+ len(packets_captured) == 0
+ and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+ "receive packet num is not match",
+ )
+ packets_captured, _, stats = self.rxtx_base.execute_fwd_check_process(
+ packets=self.rxtx_base.generate_random_packets(
+ dstmac=VF_MAC_ADDR, pktnum=4
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 0 rxq 0 start", "port 1 txq 0 stop", "start"],
+ )
+ self.verify(
+ len(packets_captured) == 0
+ and stats[self.used_dut_rx_port]["TX-packets"] == 0,
+ "receive packet num is not match",
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=4,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["stop", "port 1 txq 0 start", "start"],
+ )
+
+ def test_dcf_rss(self):
+ rss_type = ["ip", "tcp", "udp"]
+ rss_reta = [3, 2, 1, 0] * 16
+ try:
+ self.restart_testpmd(param="--rxq=4 --txq=4")
+ self.rxtx_base.rss_reta_config_check(rss_reta)
+ for _rss in rss_type:
+ hash_table = self.rxtx_base.basic_rss_check(
+ dst_mac=VF_MAC_ADDR, rss_type=_rss, queue_num=4
+ )
+ self.rxtx_base.rss_reta_hit_check(hash_table, rss_reta)
+ self.rxtx_base.execute_pmd_cmd("stop")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_dcf_rss_hash_key(self):
+ update_hash_key = "1b9d58a4b961d9cd1c56ad1621c3ad51632c16a5d16c21c3513d132c135d132c13ad1531c23a51d6ac49879c499d798a7d949c8a"
+ try:
+ self.restart_testpmd(param="--rxq=4 --txq=4")
+ self.rxtx_base.basic_rss_hash_key_check(
+ dst_mac=VF_MAC_ADDR, hash_key=update_hash_key
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_dcf_rss_rxq_txq_inconsistent(self):
+ params = [
+ "--rxq=4 --txq=8",
+ "--rxq=6 --txq=8",
+ "--rxq=3 --txq=9",
+ "--rxq=4 --txq=16",
+ ]
+ if self.kdriver == "ixgbe":
+ params = [
+ "--rxq=2 --txq=4",
+ "--rxq=1 --txq=2",
+ ]
+ try:
+ for param in params:
+ queue_num = re.search(r"--rxq=(\d+)", param).group(1)
+ self.restart_testpmd(param=param)
+ self.rxtx_base.basic_rss_check(
+ dst_mac=VF_MAC_ADDR, rss_type="ip", queue_num=queue_num
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_dcf_port_start_stop(self):
+ for i in range(10):
+ self.rxtx_base.execute_pmd_cmd("port stop all")
+ self.rxtx_base.execute_pmd_cmd("port start all")
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_dcf_statistic_reset(self):
+ out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+ self.verify(
+ "RX-packets: 0" in out and "TX-packets: 0" in out,
+ "receive some misc packet",
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+ self.rxtx_base.execute_pmd_cmd("clear port stats all")
+ out = self.rxtx_base.execute_pmd_cmd("show port stats all")
+ self.verify(
+ "RX-packets: 0" in out and "TX-packets: 0" in out,
+ "clear port stats fail",
+ )
+
+ def test_dcf_information(self):
+ self.rxtx_base.basic_pmd_info_check(self.port_obj[0])
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_dcf_xstats_check(self):
+ try:
+ self.quit_testpmd()
+ self.tester.send_expect(
+ "ifconfig {} mtu {}".format(self.tester_tx_interface, 3000), "# "
+ )
+ if self.kdriver == "ixgbe":
+ self.rxtx_base.execute_host_cmd(
+ "ifconfig {} mtu 3000".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd(param="--rxq=4 --txq=4 --max-pkt-len=9000")
+ for _payload_size in [64, 128, 256, 512, 1024, 1523]:
+
+ stats_table, xstats_table = self.rxtx_base.basic_xstats_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ payload_size=_payload_size,
+ )
+ self.verify(
+ xstats_table[self.used_dut_tx_port]["rx_good_packets"]
+ == stats_table[self.used_dut_tx_port]["RX-packets"]
+ == xstats_table[self.used_dut_rx_port]["tx_good_packets"]
+ == stats_table[self.used_dut_rx_port]["TX-packets"]
+ == 100,
+ "pkt recieve or transport count error!",
+ )
+ self.verify(
+ xstats_table[self.used_dut_tx_port]["rx_good_bytes"]
+ == stats_table[self.used_dut_tx_port]["RX-bytes"]
+ == xstats_table[self.used_dut_rx_port]["tx_good_bytes"]
+ == stats_table[self.used_dut_rx_port]["TX-bytes"],
+ "pkt recieve or transport bytes error!",
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.tester.send_expect(
+ "ifconfig {} mtu {}".format(self.tester_tx_interface, 1500), "# "
+ )
+ self.quit_testpmd()
+ if self.kdriver == "ixgbe":
+ self.rxtx_base.execute_host_cmd(
+ "ifconfig {} mtu 1500".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_dcf_unicast(self):
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=10,
+ dst_mac=VF_WRONG_MAC_ADDR,
+ check_miss=True,
+ pmd_commands=[
+ "set promisc all off",
+ "set allmulti all off",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=10, dst_mac=VF_MAC_ADDR, rx_port=self.used_dut_rx_port
+ )
+
+ def test_dcf_mac_add_filter(self):
+ try:
+ self.quit_testpmd()
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(
+ mac=self.orig_vf_mac[0]
+ )
+ self.launch_testpmd()
+ default_vf_mac = self.rxtx_base.get_vf_mac_through_pf(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "port stop all",
+ "port config all crc-strip on",
+ "port start all",
+ "set promisc all off",
+ "mac_addr add 0 {}".format(VF_MAC_ADDR),
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=default_vf_mac[0],
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=["clear port stats all"],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ check_miss=True,
+ pmd_commands=[
+ "clear port stats all",
+ "mac_addr remove 0 {}".format(VF_MAC_ADDR),
+ ],
+ )
+ self.rxtx_base.basic_macfwd_check(
+ packet_num=100,
+ dst_mac=VF_WRONG_MAC_ADDR,
+ rx_port=self.used_dut_rx_port,
+ check_miss=True,
+ pmd_commands=[
+ "clear port stats all",
+ ],
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.port_obj[self.used_dut_tx_port].set_vf_mac_addr(mac=VF_MAC_ADDR)
+ self.launch_testpmd()
+
+ def test_dcf_vlan_filter(self):
+ self.vlan_func.basic_vlan_filter_check(
+ vlan_id=RANDOM_VLAN_ID,
+ match_pkt=MATCHED_VLAN_PKT,
+ unmatch_pkt=UNMATCHED_VLAN_PKT,
+ pmd_commands=[
+ "set promisc all off",
+ "vlan set filter on 0",
+ "vlan set strip off 0",
+ "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ _, _, _, vlan_id_list = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ pmd_commands=[
+ "rx_vlan rm %d 0" % RANDOM_VLAN_ID,
+ "vlan set filter off 0",
+ ],
+ )
+ if not self.default_stats:
+ self.verify(len(vlan_id_list) == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(len(vlan_id_list) == 0, "Failed to received vlan packet!!!")
+
+ def test_dcf_vlan_strip(self):
+ self.vlan_func.basic_vlan_strip_check(
+ vlan_id=RANDOM_VLAN_ID,
+ match_pkt=MATCHED_VLAN_PKT,
+ pmd_commands=[
+ "set promisc all off",
+ "vlan set filter on 0",
+ "rx_vlan add %d 0" % RANDOM_VLAN_ID,
+ "vlan set strip on 0",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+
+ def test_dcf_vlan_insertion(self):
+ try:
+ self.vlan_func.basic_vlan_insert_check(
+ vlan_id=RANDOM_VLAN_ID,
+ insert_vlan=RANDOM_VLAN_ID,
+ match_pkt=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ pmd_commands=[
+ "port stop all",
+ "set promisc all off",
+ "vlan set filter on 0",
+ "tx_vlan set 1 %s" % RANDOM_VLAN_ID,
+ "rx_vlan add %s 0" % RANDOM_VLAN_ID,
+ "port start all",
+ "set verbose 1",
+ "set fwd mac",
+ "start",
+ ],
+ )
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_dcf_vlan_pvid_base_tx(self):
+ try:
+ self.quit_testpmd()
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=RANDOM_VLAN_ID,
+ )
+ self.launch_testpmd()
+ packets_captured, _, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="UDP",
+ dst_mac=self.vlan_func.get_vf_mac_through_pf(
+ pf_intf=self.port_obj[
+ self.used_dut_rx_port
+ ].get_interface_name()
+ ),
+ ),
+ tx_port=1,
+ tester_tx_interface=self.tester_rx_interface,
+ tester_rx_interface=self.tester_tx_interface,
+ pmd_commands=["set fwd mac", "set verbose 1", "start"],
+ )
+ self.verify(len(packets_captured) == 1, "Not receive expected packet")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.vlan_func.execute_host_cmd(
+ "ip link set {} vf 0 vlan 0".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_dcf_vlan_pvid_base_rx(self):
+ try:
+ self.quit_testpmd()
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=RANDOM_VLAN_ID,
+ )
+ self.launch_testpmd()
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[MATCHED_VLAN_PKT, UNMATCHED_VLAN_PKT],
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "set promisc all off",
+ "start",
+ ],
+ )
+ self.verify(rece_num == 1, "Failed to received matched vlan pkt")
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 0, "Failed to received udp pkt without vlan")
+ self.quit_testpmd()
+ self.vlan_func.set_pvid_from_pf(
+ pf_intf=self.port_obj[self.used_dut_tx_port].get_interface_name(),
+ vlan_id=0,
+ )
+ self.launch_testpmd()
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "set promisc all off",
+ "start",
+ ],
+ )
+ if not self.default_stats:
+ self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ ],
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 2, "Not received expect packet")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.quit_testpmd()
+ self.vlan_func.execute_host_cmd(
+ "ip link set {} vf 0 vlan 0".format(
+ self.port_obj[self.used_dut_tx_port].get_interface_name()
+ )
+ )
+ self.launch_testpmd()
+
+ def test_dcf_vlan_rx_combination(self):
+ rx_vlans = [1, RANDOM_VLAN_ID, MAX_VLAN_ID]
+ """
+ the dcf not support '--enable-hw-vlan', So change to scalar path to test dcf vlan offload.
+ """
+ try:
+ self.quit_testpmd()
+ self.launch_testpmd(
+ eal_param="--force-max-simd-bitwidth=64",
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands=[
+ "set fwd rxonly",
+ "set verbose 1",
+ "vlan set strip on 0",
+ "vlan set filter on 0",
+ "set promisc all off",
+ "start",
+ ],
+ )
+ self.verify(rece_num == 1, "Not received normal packet as default")
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ "VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default"
+ )
+ for rx_vlan in rx_vlans:
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=rx_vlan, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ pmd_commands="rx_vlan add {} 0".format(rx_vlan),
+ )
+ self.verify(
+ "VLAN tci={}".format(hex(rx_vlan)) in pmdout,
+ "Not received expected vlan packet",
+ )
+ if rx_vlan == MAX_VLAN_ID:
+ continue
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=rx_vlan + 1, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(rece_num == 0, "failed to receive unmatch vlan pkt")
+ for rx_vlan in rx_vlans:
+ self.vlan_func.execute_pmd_cmd("rx_vlan rm {} 0".format(rx_vlan))
+ _, pmdout, _ = self.vlan_func.execute_fwd_check_process(
+ packets=self.vlan_func.generate_using_packets(
+ pkt_type="VLAN_UDP", vlan_id=0, dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ "VLAN tci=0x0" in pmdout, "Not received vlan 0 packet as default"
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=self.vlan_func.generate_using_packets(
+ pkt_type="UDP", dst_mac=VF_MAC_ADDR
+ ),
+ rx_port=self.used_dut_rx_port,
+ )
+ self.verify(
+ rece_num == 1, "Not received normal packet after remove vlan filter"
+ )
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=MATCHED_VLAN_PKT,
+ rx_port=self.used_dut_rx_port,
+ )
+ if not self.default_stats:
+ self.verify(rece_num == 1, "Failed to received vlan packet!!!")
+ else:
+ self.verify(rece_num == 0, "Failed to received vlan packet!!!")
+ except Exception as e:
+ raise VerifyFailure(e)
+ finally:
+ self.restart_testpmd()
+
+ def test_dcf_vlan_promisc(self):
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_str=[
+ 'Ether(dst="{}",type=0x8100)/Dot1Q(vlan=100,type=0x0800)/IP(src="196.222.232.{}")/("X"*480)'.format(
+ VF_MAC_ADDR, i
+ )
+ for i in range(10)
+ ]
+ )
+ ],
+ pmd_commands=[
+ "port stop all",
+ "set promisc all on",
+ "set fwd mac",
+ "set verbose 1",
+ "vlan set filter off 0",
+ "vlan set strip off 0",
+ "port start all",
+ "start",
+ ],
+ )
+ self.verify(rece_num == 10, "Not receive expected packet")
+ rece_num, _, _, _ = self.vlan_func.vlan_pkts_fwd_check(
+ pkts=[
+ self.vlan_func.generate_using_packets(
+ pkt_str=[
+ 'Ether(dst="{}")/IP(src="196.222.232.{}")/("X"*480)'.format(
+ VF_MAC_ADDR, i
+ )
+ for i in range(10)
+ ]
+ )
+ ],
+ )
+ self.verify(rece_num == 10, "Not receive expected packet")
+
+ def tear_down(self):
+ self.rxtx_base.execute_pmd_cmd("stop")
+
+ def tear_down_all(self):
+ self.quit_testpmd()
+ self.rxtx_base.destroy_vm_env()
+ self.rxtx_base.destroy_vf()