[RFC,v1,06/15] dts: merge DTS framework/checkCase.py to DPDK

Message ID 20220406145606.2913834-7-juraj.linkes@pantheon.tech (mailing list archive)
State RFC, archived
Delegated to: Thomas Monjalon
Headers
Series merge DTS core files to DPDK |

Checks

Context Check Description
ci/checkpatch warning coding style issues

Commit Message

Juraj Linkeš April 6, 2022, 2:55 p.m. UTC
  ---
 dts/framework/checkCase.py | 214 +++++++++++++++++++++++++++++++++++++
 1 file changed, 214 insertions(+)
 create mode 100644 dts/framework/checkCase.py
  

Patch

diff --git a/dts/framework/checkCase.py b/dts/framework/checkCase.py
new file mode 100644
index 0000000000..7b2c22b3c0
--- /dev/null
+++ b/dts/framework/checkCase.py
@@ -0,0 +1,214 @@ 
+import collections
+import json
+import os
+
+from .settings import (
+    CONFIG_ROOT_PATH,
+    HOST_DRIVER_SETTING,
+    get_nic_name,
+    load_global_setting,
+)
+from .utils import RED
+
+filter_json_file = os.path.join(CONFIG_ROOT_PATH, "test_case_checklist.json")
+support_json_file = os.path.join(CONFIG_ROOT_PATH, "test_case_supportlist.json")
+
+
+class CheckCase(object):
+    """
+    Class for check test case running criteria. All information will be loaded
+    from DTS_CFG_FOLDER/test_case_*list.json. Current two files are maintained. One is
+    for check whether test case should skip, another one is for check whether
+    current environment support test case execution.
+    """
+
+    def __init__(self):
+        self.dut = None
+        self.comments = ""
+
+        self.check_function_dict = {}
+        self.support_function_dict = {}
+        try:
+            self.check_function_dict = json.load(
+                open(filter_json_file), object_pairs_hook=collections.OrderedDict
+            )
+        except:
+            print(
+                RED(
+                    "Can't load check list for test cases, all case will be taken as supported"
+                )
+            )
+
+        try:
+            self.support_function_dict = json.load(
+                open(support_json_file), object_pairs_hook=collections.OrderedDict
+            )
+        except:
+            print(
+                RED(
+                    "Can't load support list for test cases, all case will be taken as supported"
+                )
+            )
+
+    def check_dut(self, dut):
+        """
+        Change DUT instance for environment check
+        """
+        self.dut = dut
+
+    def _check_os(self, os_type):
+        if "all" == os_type[0].lower():
+            return True
+        dut_os_type = self.dut.get_os_type()
+        if dut_os_type in os_type:
+            return True
+        else:
+            return False
+
+    def _check_nic(self, nic_type):
+        if "all" == nic_type[0].lower():
+            return True
+        dut_nic_type = get_nic_name(self.dut.ports_info[0]["type"])
+        if dut_nic_type in nic_type:
+            return True
+        else:
+            return False
+
+    def _check_target(self, target):
+        if "all" == target[0].lower():
+            return True
+        if self.dut.target in target:
+            return True
+        else:
+            return False
+
+    def _check_host_driver(self, drivers):
+        host_driver = load_global_setting(HOST_DRIVER_SETTING)
+        if "all" == drivers[0].lower():
+            return True
+        if host_driver in drivers:
+            return True
+        else:
+            return False
+
+    def case_skip(self, case_name):
+        """
+        Check whether test case and DUT match skip criteria
+        Return True if skip should skip
+        """
+        skip_flag = False
+        self.comments = ""
+
+        if self.dut is None:
+            print(RED("No Dut assigned before case skip check"))
+            return skip_flag
+
+        if case_name in list(self.check_function_dict.keys()):
+            case_checks = self.check_function_dict[case_name]
+            # each case may have several checks
+            for case_check in case_checks:
+                # init result for each check
+                skip_flag = False
+                for key in list(case_check.keys()):
+                    # some items like "Bug ID" and "Comments" do not need check
+                    try:
+                        if "Comments" == key:
+                            continue
+                        if "Bug ID" == key:
+                            continue
+                        check_function = getattr(self, "_check_%s" % key.lower())
+                    except:
+                        print(RED("can't check %s type" % key))
+
+                    # skip this check if any item not matched
+                    if check_function(case_check[key]):
+                        skip_flag = True
+                    else:
+                        skip_flag = False
+                        break
+
+                # if all items matched, this case should skip
+                if skip_flag:
+                    if "Comments" in list(case_check.keys()):
+                        self.comments = case_check["Comments"]
+                    return skip_flag
+
+        return skip_flag
+
+    def case_support(self, case_name):
+        """
+        Check whether test case and DUT match support criteria
+        Return False if test case not supported
+        """
+        support_flag = True
+        self.comments = ""
+
+        if self.dut is None:
+            print(RED("No Dut assigned before case support check"))
+            return support_flag
+
+        if case_name in list(self.support_function_dict.keys()):
+            # each case may have several supports
+            case_supports = self.support_function_dict[case_name]
+            for case_support in case_supports:
+                # init result for each check
+                support_flag = True
+                for key in list(case_support.keys()):
+                    # some items like "Bug ID" and "Comments" do not need check
+                    try:
+                        if "Comments" == key:
+                            continue
+                        if "Bug ID" == key:
+                            continue
+                        check_function = getattr(self, "_check_%s" % key.lower())
+                    except:
+                        print(RED("can't check %s type" % key))
+
+                    # skip this case if any item not matched
+                    if check_function(case_support[key]):
+                        support_flag = True
+                    else:
+                        support_flag = False
+                        break
+
+            if support_flag is False:
+                if "Comments" in list(case_support.keys()):
+                    self.comments = case_support["Comments"]
+                return support_flag
+
+        return support_flag
+
+
+class simple_dut(object):
+    def __init__(self, os="", target="", nic=""):
+        self.ports_info = [{}]
+        self.os = os
+        self.target = target
+        self.ports_info[0]["type"] = nic
+
+    def get_os_type(self):
+        return self.os
+
+
+if __name__ == "__main__":
+    dut = simple_dut(os="linux", target="x86_64-native-linuxapp-gcc", nic="177d:a034")
+    dut1 = simple_dut(
+        os="freebsd", target="x86_64-native-linuxapp-gcc", nic="8086:158b"
+    )
+
+    # create instance for check/support case list
+    case_inst = CheckCase()
+
+    # check dut
+    case_inst.check_dut(dut)
+    print(case_inst.case_skip("fdir_flexword_drop_ipv4"))
+    print(case_inst.comments)
+    print(case_inst.case_support("Vxlan_tunnel"))
+    print(case_inst.comments)
+
+    # check other dut
+    case_inst.check_dut(dut1)
+    print(case_inst.case_skip("fdir_flexword_drop_ipv4"))
+    print(case_inst.comments)
+    print(case_inst.case_support("Vxlan_tunnel"))
+    print(case_inst.comments)