Patch Detail
get:
Show a patch.
patch:
Update a patch.
put:
Update a patch.
GET /api/patches/109306/?format=api
http://patches.dpdk.org/api/patches/109306/?format=api", "web_url": "http://patches.dpdk.org/project/dpdk/patch/20220406151903.2916254-8-juraj.linkes@pantheon.tech/", "project": { "id": 1, "url": "http://patches.dpdk.org/api/projects/1/?format=api", "name": "DPDK", "link_name": "dpdk", "list_id": "dev.dpdk.org", "list_email": "dev@dpdk.org", "web_url": "http://core.dpdk.org", "scm_url": "git://dpdk.org/dpdk", "webscm_url": "http://git.dpdk.org/dpdk", "list_archive_url": "https://inbox.dpdk.org/dev", "list_archive_url_format": "https://inbox.dpdk.org/dev/{}", "commit_url_format": "" }, "msgid": "<20220406151903.2916254-8-juraj.linkes@pantheon.tech>", "list_archive_url": "https://inbox.dpdk.org/dev/20220406151903.2916254-8-juraj.linkes@pantheon.tech", "date": "2022-04-06T15:18:47", "name": "[RFC,v1,07/23] dts: merge DTS framework/test_case.py to DPDK", "commit_ref": null, "pull_url": null, "state": "rfc", "archived": true, "hash": "7b463643611a399c821fe0ae0df88463094f03be", "submitter": { "id": 1626, "url": "http://patches.dpdk.org/api/people/1626/?format=api", "name": "Juraj Linkeš", "email": "juraj.linkes@pantheon.tech" }, "delegate": { "id": 1, "url": "http://patches.dpdk.org/api/users/1/?format=api", "username": "tmonjalo", "first_name": "Thomas", "last_name": "Monjalon", "email": "thomas@monjalon.net" }, "mbox": "http://patches.dpdk.org/project/dpdk/patch/20220406151903.2916254-8-juraj.linkes@pantheon.tech/mbox/", "series": [ { "id": 22381, "url": "http://patches.dpdk.org/api/series/22381/?format=api", "web_url": "http://patches.dpdk.org/project/dpdk/list/?series=22381", "date": "2022-04-06T15:18:40", "name": "merge DTS test resource files to DPDK", "version": 1, "mbox": "http://patches.dpdk.org/series/22381/mbox/" } ], "comments": "http://patches.dpdk.org/api/patches/109306/comments/", "check": "warning", "checks": "http://patches.dpdk.org/api/patches/109306/checks/", "tags": {}, "related": [], "headers": { "Return-Path": "<dev-bounces@dpdk.org>", "X-Original-To": "patchwork@inbox.dpdk.org", "Delivered-To": "patchwork@inbox.dpdk.org", "Received": [ "from mails.dpdk.org (mails.dpdk.org [217.70.189.124])\n\tby inbox.dpdk.org (Postfix) with ESMTP id 9056FA0509;\n\tWed, 6 Apr 2022 17:20:02 +0200 (CEST)", "from [217.70.189.124] (localhost [127.0.0.1])\n\tby mails.dpdk.org (Postfix) with ESMTP id 7DE7542906;\n\tWed, 6 Apr 2022 17:19:18 +0200 (CEST)", "from lb.pantheon.sk (lb.pantheon.sk [46.229.239.20])\n by mails.dpdk.org (Postfix) with ESMTP id AA16D428FB\n for <dev@dpdk.org>; Wed, 6 Apr 2022 17:19:16 +0200 (CEST)", "from localhost (localhost [127.0.0.1])\n by lb.pantheon.sk (Postfix) with ESMTP id 8F05E19E0D5;\n Wed, 6 Apr 2022 17:19:15 +0200 (CEST)", "from lb.pantheon.sk ([127.0.0.1])\n by localhost (lb.pantheon.sk [127.0.0.1]) (amavisd-new, port 10024)\n with ESMTP id OCMa45QxqzPn; Wed, 6 Apr 2022 17:19:14 +0200 (CEST)", "from entguard.lab.pantheon.local (unknown [46.229.239.141])\n by lb.pantheon.sk (Postfix) with ESMTP id 3516C184FE9;\n Wed, 6 Apr 2022 17:19:08 +0200 (CEST)" ], "X-Virus-Scanned": "amavisd-new at siecit.sk", "From": "=?utf-8?q?Juraj_Linke=C5=A1?= <juraj.linkes@pantheon.tech>", "To": "thomas@monjalon.net, david.marchand@redhat.com,\n Honnappa.Nagarahalli@arm.com, ohilyard@iol.unh.edu, lijuan.tu@intel.com", "Cc": "dev@dpdk.org, =?utf-8?q?Juraj_Linke=C5=A1?= <juraj.linkes@pantheon.tech>", "Subject": "[RFC PATCH v1 07/23] dts: merge DTS framework/test_case.py to DPDK", "Date": "Wed, 6 Apr 2022 15:18:47 +0000", "Message-Id": "<20220406151903.2916254-8-juraj.linkes@pantheon.tech>", "X-Mailer": "git-send-email 2.25.1", "In-Reply-To": "<20220406151903.2916254-1-juraj.linkes@pantheon.tech>", "References": "<20220406151903.2916254-1-juraj.linkes@pantheon.tech>", "MIME-Version": "1.0", "Content-Transfer-Encoding": "8bit", "X-BeenThere": "dev@dpdk.org", "X-Mailman-Version": "2.1.29", "Precedence": "list", "List-Id": "DPDK patches and discussions <dev.dpdk.org>", "List-Unsubscribe": "<https://mails.dpdk.org/options/dev>,\n <mailto:dev-request@dpdk.org?subject=unsubscribe>", "List-Archive": "<http://mails.dpdk.org/archives/dev/>", "List-Post": "<mailto:dev@dpdk.org>", "List-Help": "<mailto:dev-request@dpdk.org?subject=help>", "List-Subscribe": "<https://mails.dpdk.org/listinfo/dev>,\n <mailto:dev-request@dpdk.org?subject=subscribe>", "Errors-To": "dev-bounces@dpdk.org" }, "content": "---\n dts/framework/test_case.py | 625 +++++++++++++++++++++++++++++++++++++\n 1 file changed, 625 insertions(+)\n create mode 100644 dts/framework/test_case.py", "diff": "diff --git a/dts/framework/test_case.py b/dts/framework/test_case.py\nnew file mode 100644\nindex 0000000000..1f5d383bae\n--- /dev/null\n+++ b/dts/framework/test_case.py\n@@ -0,0 +1,625 @@\n+# BSD LICENSE\n+#\n+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.\n+# All rights reserved.\n+#\n+# Redistribution and use in source and binary forms, with or without\n+# modification, are permitted provided that the following conditions\n+# are met:\n+#\n+# * Redistributions of source code must retain the above copyright\n+# notice, this list of conditions and the following disclaimer.\n+# * Redistributions in binary form must reproduce the above copyright\n+# notice, this list of conditions and the following disclaimer in\n+# the documentation and/or other materials provided with the\n+# distribution.\n+# * Neither the name of Intel Corporation nor the names of its\n+# contributors may be used to endorse or promote products derived\n+# from this software without specific prior written permission.\n+#\n+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n+# \"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n+\n+\"\"\"\n+A base class for creating DTF test cases.\n+\"\"\"\n+import re\n+import signal\n+import time\n+import traceback\n+from functools import wraps\n+\n+import framework.debugger as debugger\n+\n+from .config import SuiteConf\n+from .exception import TimeoutException, VerifyFailure, VerifySkip\n+from .logger import getLogger\n+from .rst import RstReport\n+from .settings import (\n+ DEBUG_CASE_SETTING,\n+ DEBUG_SETTING,\n+ DRIVERS,\n+ FUNC_SETTING,\n+ HOST_DRIVER_SETTING,\n+ NICS,\n+ PERF_SETTING,\n+ SUITE_SECTION_NAME,\n+ UPDATE_EXPECTED,\n+ get_nic_name,\n+ load_global_setting,\n+)\n+from .test_result import Result, ResultTable\n+from .utils import BLUE, RED\n+\n+\n+class TestCase(object):\n+ def __init__(self, duts, tester, target, suitename):\n+ self.suite_name = suitename\n+ self.dut = duts[0]\n+ self.duts = duts\n+ self.tester = tester\n+ self.target = target\n+\n+ # local variable\n+ self._requested_tests = None\n+ self._subtitle = None\n+\n+ # check session and reconnect if possible\n+ for dutobj in self.duts:\n+ self._check_and_reconnect(crb=dutobj)\n+ self._check_and_reconnect(crb=self.tester)\n+\n+ # convert netdevice to codename\n+ self.nic = self.dut.nic.name\n+ self.nic_obj = self.dut.nic\n+ self.kdriver = self.dut.nic.default_driver\n+ self.pkg = self.dut.nic.pkg\n+\n+ # result object for save suite result\n+ self._suite_result = Result()\n+ self._suite_result.dut = self.dut.crb[\"IP\"]\n+ self._suite_result.target = target\n+ self._suite_result.nic = self.nic\n+ self._suite_result.test_suite = self.suite_name\n+ if self._suite_result is None:\n+ raise ValueError(\"Result object should not None\")\n+\n+ # load running environment\n+ if load_global_setting(PERF_SETTING) == \"yes\":\n+ self._enable_perf = True\n+ else:\n+ self._enable_perf = False\n+\n+ if load_global_setting(FUNC_SETTING) == \"yes\":\n+ self._enable_func = True\n+ else:\n+ self._enable_func = False\n+\n+ if load_global_setting(DEBUG_SETTING) == \"yes\":\n+ self._enable_debug = True\n+ else:\n+ self._enable_debug = False\n+\n+ if load_global_setting(DEBUG_CASE_SETTING) == \"yes\":\n+ self._debug_case = True\n+ else:\n+ self._debug_case = False\n+\n+ self.drivername = load_global_setting(HOST_DRIVER_SETTING)\n+\n+ # create rst format report for this suite\n+ self._rst_obj = RstReport(\n+ \"rst_report\", target, self.nic, self.suite_name, self._enable_perf\n+ )\n+\n+ # load suite configuration\n+ self._suite_conf = SuiteConf(self.suite_name)\n+ self._suite_cfg = self._suite_conf.suite_cfg\n+\n+ # command history\n+ self.setup_history = list()\n+ self.test_history = list()\n+\n+ def init_log(self):\n+ # get log handler\n+ class_name = self.__class__.__name__\n+ self.logger = getLogger(class_name)\n+ self.logger.config_suite(class_name)\n+\n+ def _check_and_reconnect(self, crb=None):\n+ try:\n+ result = crb.session.check_available()\n+ except:\n+ result = False\n+\n+ if result is False:\n+ crb.reconnect_session()\n+ if \"dut\" in str(type(crb)):\n+ crb.send_expect(\"cd %s\" % crb.base_dir, \"#\")\n+ crb.set_env_variable()\n+\n+ try:\n+ result = crb.alt_session.check_available()\n+ except:\n+ result = False\n+\n+ if result is False:\n+ crb.reconnect_session(alt_session=True)\n+\n+ def set_up_all(self):\n+ pass\n+\n+ def set_up(self):\n+ pass\n+\n+ def tear_down(self):\n+ pass\n+\n+ def tear_down_all(self):\n+ pass\n+\n+ def verify(self, passed, description):\n+ if not passed:\n+ if self._enable_debug:\n+ print(RED(\"Error happened, dump command history...\"))\n+ self.dump_history()\n+ print('Error \"%s\" happened' % RED(description))\n+ print(RED(\"History dump finished.\"))\n+ raise VerifyFailure(description)\n+\n+ def skip_case(self, passed, description):\n+ if not passed:\n+ if self._enable_debug:\n+ print('skip case: \"%s\" ' % RED(description))\n+ raise VerifySkip(description)\n+\n+ def _get_nic_driver(self, nic_name):\n+ if nic_name in list(DRIVERS.keys()):\n+ return DRIVERS[nic_name]\n+\n+ return \"Unknown\"\n+\n+ def set_check_inst(self, check=None):\n+ self._check_inst = check\n+\n+ def rst_report(self, *args, **kwargs):\n+ self._rst_obj.report(*args, **kwargs)\n+\n+ def result_table_create(self, header):\n+ self._result_table = ResultTable(header)\n+ self._result_table.set_rst(self._rst_obj)\n+ self._result_table.set_logger(self.logger)\n+\n+ def result_table_add(self, row):\n+ self._result_table.add_row(row)\n+\n+ def result_table_print(self):\n+ self._result_table.table_print()\n+\n+ def result_table_getrows(self):\n+ return self._result_table.results_table_rows\n+\n+ def _get_functional_cases(self):\n+ \"\"\"\n+ Get all functional test cases.\n+ \"\"\"\n+ return self._get_test_cases(r\"test_(?!perf_)\")\n+\n+ def _get_performance_cases(self):\n+ \"\"\"\n+ Get all performance test cases.\n+ \"\"\"\n+ return self._get_test_cases(r\"test_perf_\")\n+\n+ def _has_it_been_requested(self, test_case, test_name_regex):\n+ \"\"\"\n+ Check whether test case has been requested for validation.\n+ \"\"\"\n+ name_matches = re.match(test_name_regex, test_case.__name__)\n+\n+ if self._requested_tests is not None:\n+ return name_matches and test_case.__name__ in self._requested_tests\n+\n+ return name_matches\n+\n+ def set_requested_cases(self, case_list):\n+ \"\"\"\n+ Pass down input cases list for check\n+ \"\"\"\n+ if self._requested_tests is None:\n+ self._requested_tests = case_list\n+ elif case_list is not None:\n+ self._requested_tests += case_list\n+\n+ def set_subtitle(self, subtitle):\n+ \"\"\"\n+ Pass down subtitle for Rst report\n+ \"\"\"\n+ self._rst_obj._subtitle = subtitle\n+ self._rst_obj.write_subtitle()\n+\n+ def _get_test_cases(self, test_name_regex):\n+ \"\"\"\n+ Return case list which name matched regex.\n+ \"\"\"\n+ for test_case_name in dir(self):\n+ test_case = getattr(self, test_case_name)\n+ if callable(test_case) and self._has_it_been_requested(\n+ test_case, test_name_regex\n+ ):\n+ yield test_case\n+\n+ def execute_setup_all(self):\n+ \"\"\"\n+ Execute suite setup_all function before cases.\n+ \"\"\"\n+ # clear all previous output\n+ for dutobj in self.duts:\n+ dutobj.get_session_output(timeout=0.1)\n+ self.tester.get_session_output(timeout=0.1)\n+\n+ # save into setup history list\n+ self.enable_history(self.setup_history)\n+\n+ try:\n+ self.set_up_all()\n+ return True\n+ except VerifySkip as v:\n+ self.logger.info(\"set_up_all SKIPPED:\\n\" + traceback.format_exc())\n+ # record all cases N/A\n+ if self._enable_func:\n+ for case_obj in self._get_functional_cases():\n+ self._suite_result.test_case = case_obj.__name__\n+ self._suite_result.test_case_skip(str(v))\n+ if self._enable_perf:\n+ for case_obj in self._get_performance_cases():\n+ self._suite_result.test_case = case_obj.__name__\n+ self._suite_result.test_case_skip(str(v))\n+ except Exception as v:\n+ self.logger.error(\"set_up_all failed:\\n\" + traceback.format_exc())\n+ # record all cases blocked\n+ if self._enable_func:\n+ for case_obj in self._get_functional_cases():\n+ self._suite_result.test_case = case_obj.__name__\n+ self._suite_result.test_case_blocked(\n+ \"set_up_all failed: {}\".format(str(v))\n+ )\n+ if self._enable_perf:\n+ for case_obj in self._get_performance_cases():\n+ self._suite_result.test_case = case_obj.__name__\n+ self._suite_result.test_case_blocked(\n+ \"set_up_all failed: {}\".format(str(v))\n+ )\n+ return False\n+\n+ def _execute_test_case(self, case_obj):\n+ \"\"\"\n+ Execute specified test case in specified suite. If any exception occurred in\n+ validation process, save the result and tear down this case.\n+ \"\"\"\n+ case_name = case_obj.__name__\n+ self._suite_result.test_case = case_obj.__name__\n+\n+ self._rst_obj.write_title(\"Test Case: \" + case_name)\n+\n+ # save into test command history\n+ self.test_history = list()\n+ self.enable_history(self.test_history)\n+\n+ # load suite configuration file here for rerun command\n+ self._suite_conf = SuiteConf(self.suite_name)\n+ self._suite_cfg = self._suite_conf.suite_cfg\n+ self._case_cfg = self._suite_conf.load_case_config(case_name)\n+\n+ case_result = True\n+ if self._check_inst is not None:\n+ if self._check_inst.case_skip(case_name[len(\"test_\") :]):\n+ self.logger.info(\"Test Case %s Result SKIPPED:\" % case_name)\n+ self._rst_obj.write_result(\"N/A\")\n+ self._suite_result.test_case_skip(self._check_inst.comments)\n+ return case_result\n+\n+ if not self._check_inst.case_support(case_name[len(\"test_\") :]):\n+ self.logger.info(\"Test Case %s Result SKIPPED:\" % case_name)\n+ self._rst_obj.write_result(\"N/A\")\n+ self._suite_result.test_case_skip(self._check_inst.comments)\n+ return case_result\n+\n+ if self._enable_perf:\n+ self._rst_obj.write_annex_title(\"Annex: \" + case_name)\n+ try:\n+ self.logger.info(\"Test Case %s Begin\" % case_name)\n+\n+ self.running_case = case_name\n+ # clean session\n+ for dutobj in self.duts:\n+ dutobj.get_session_output(timeout=0.1)\n+ self.tester.get_session_output(timeout=0.1)\n+ # run set_up function for each case\n+ self.set_up()\n+ # run test case\n+ case_obj()\n+\n+ self._suite_result.test_case_passed()\n+\n+ self._rst_obj.write_result(\"PASS\")\n+ self.logger.info(\"Test Case %s Result PASSED:\" % case_name)\n+\n+ except VerifyFailure as v:\n+ case_result = False\n+ self._suite_result.test_case_failed(str(v))\n+ self._rst_obj.write_result(\"FAIL\")\n+ self.logger.error(\"Test Case %s Result FAILED: \" % (case_name) + str(v))\n+ except VerifySkip as v:\n+ self._suite_result.test_case_skip(str(v))\n+ self._rst_obj.write_result(\"N/A\")\n+ self.logger.info(\"Test Case %s N/A: \" % (case_name))\n+ except KeyboardInterrupt:\n+ self._suite_result.test_case_blocked(\"Skipped\")\n+ self.logger.error(\"Test Case %s SKIPPED: \" % (case_name))\n+ self.tear_down()\n+ raise KeyboardInterrupt(\"Stop DTS\")\n+ except TimeoutException as e:\n+ case_result = False\n+ self._rst_obj.write_result(\"FAIL\")\n+ self._suite_result.test_case_failed(str(e))\n+ self.logger.error(\"Test Case %s Result FAILED: \" % (case_name) + str(e))\n+ self.logger.error(\"%s\" % (e.get_output()))\n+ except Exception:\n+ case_result = False\n+ trace = traceback.format_exc()\n+ self._suite_result.test_case_failed(trace)\n+ self.logger.error(\"Test Case %s Result ERROR: \" % (case_name) + trace)\n+ finally:\n+ # update expected\n+ if (\n+ load_global_setting(UPDATE_EXPECTED) == \"yes\"\n+ and \"update_expected\" in self.get_suite_cfg()\n+ and self.get_suite_cfg()[\"update_expected\"] == True\n+ ):\n+ self._suite_conf.update_case_config(SUITE_SECTION_NAME)\n+ self.execute_tear_down()\n+ return case_result\n+\n+ def execute_test_cases(self):\n+ \"\"\"\n+ Execute all test cases in one suite.\n+ \"\"\"\n+ # prepare debugger rerun case environment\n+ if self._enable_debug or self._debug_case:\n+ debugger.AliveSuite = self\n+ _suite_full_name = \"TestSuite_\" + self.suite_name\n+ debugger.AliveModule = __import__(\n+ \"tests.\" + _suite_full_name, fromlist=[_suite_full_name]\n+ )\n+\n+ if load_global_setting(FUNC_SETTING) == \"yes\":\n+ for case_obj in self._get_functional_cases():\n+ for i in range(self.tester.re_run_time + 1):\n+ ret = self.execute_test_case(case_obj)\n+\n+ if ret is False and self.tester.re_run_time:\n+ for dutobj in self.duts:\n+ dutobj.get_session_output(timeout=0.5 * (i + 1))\n+ self.tester.get_session_output(timeout=0.5 * (i + 1))\n+ time.sleep(i + 1)\n+ self.logger.info(\n+ \" Test case %s failed and re-run %d time\"\n+ % (case_obj.__name__, i + 1)\n+ )\n+ else:\n+ break\n+\n+ if load_global_setting(PERF_SETTING) == \"yes\":\n+ for case_obj in self._get_performance_cases():\n+ self.execute_test_case(case_obj)\n+\n+ def execute_test_case(self, case_obj):\n+ \"\"\"\n+ Execute test case or enter into debug mode.\n+ \"\"\"\n+ debugger.AliveCase = case_obj.__name__\n+\n+ if self._debug_case:\n+ self.logger.info(\"Rerun Test Case %s Begin\" % debugger.AliveCase)\n+ debugger.keyboard_handle(signal.SIGINT, None)\n+ else:\n+ return self._execute_test_case(case_obj)\n+\n+ def get_result(self):\n+ \"\"\"\n+ Return suite test result\n+ \"\"\"\n+ return self._suite_result\n+\n+ def get_case_cfg(self):\n+ \"\"\"\n+ Return case based configuration\n+ \"\"\"\n+ return self._case_cfg\n+\n+ def get_suite_cfg(self):\n+ \"\"\"\n+ Return suite based configuration\n+ \"\"\"\n+ return self._suite_cfg\n+\n+ def update_suite_cfg(self, suite_cfg):\n+ \"\"\"\n+ Update suite based configuration\n+ \"\"\"\n+ self._suite_cfg = suite_cfg\n+\n+ def update_suite_cfg_ele(self, key, value):\n+ \"\"\"\n+ update one element of suite configuration\n+ \"\"\"\n+ self._suite_cfg[key] = value\n+\n+ def execute_tear_downall(self):\n+ \"\"\"\n+ execute suite tear_down_all function\n+ \"\"\"\n+ try:\n+ self.tear_down_all()\n+ except Exception:\n+ self.logger.error(\"tear_down_all failed:\\n\" + traceback.format_exc())\n+\n+ for dutobj in self.duts:\n+ dutobj.kill_all()\n+ self.tester.kill_all()\n+\n+ for dutobj in self.duts:\n+ dutobj.virt_exit()\n+ # destroy all vfs\n+ dutobj.destroy_all_sriov_vfs()\n+\n+ def execute_tear_down(self):\n+ \"\"\"\n+ execute suite tear_down function\n+ \"\"\"\n+ try:\n+ self.tear_down()\n+ except Exception:\n+ self.logger.error(\"tear_down failed:\\n\" + traceback.format_exc())\n+ self.logger.warning(\n+ \"tear down %s failed, might iterfere next case's result!\"\n+ % self.running_case\n+ )\n+\n+ def enable_history(self, history):\n+ \"\"\"\n+ Enable history for all CRB's default session\n+ \"\"\"\n+ for dutobj in self.duts:\n+ dutobj.session.set_history(history)\n+\n+ self.tester.session.set_history(history)\n+\n+ def dump_history(self):\n+ \"\"\"\n+ Dump recorded command history\n+ \"\"\"\n+ for cmd_history in self.setup_history:\n+ print(\"%-20s: %s\" % (BLUE(cmd_history[\"name\"]), cmd_history[\"command\"]))\n+ for cmd_history in self.test_history:\n+ print(\"%-20s: %s\" % (BLUE(cmd_history[\"name\"]), cmd_history[\"command\"]))\n+\n+ def wirespeed(self, nic, frame_size, num_ports):\n+ \"\"\"\n+ Calculate bit rate. It is depended for NICs\n+ \"\"\"\n+ bitrate = 1000.0 # 1Gb ('.0' forces to operate as float)\n+ if self.nic == \"any\" or self.nic == \"cfg\":\n+ driver = self._get_nic_driver(self.dut.ports_info[0][\"type\"])\n+ nic = get_nic_name(self.dut.ports_info[0][\"type\"])\n+ else:\n+ driver = self._get_nic_driver(self.nic)\n+ nic = self.nic\n+\n+ if driver == \"ixgbe\":\n+ bitrate *= 10 # 10 Gb NICs\n+ elif nic == \"avoton2c5\":\n+ bitrate *= 2.5 # 2.5 Gb NICs\n+ elif nic in [\"fortville_spirit\", \"fortville_spirit_single\"]:\n+ bitrate *= 40\n+ elif nic == \"fortville_eagle\":\n+ bitrate *= 10\n+ elif nic == \"fortpark_TLV\":\n+ bitrate *= 10\n+ elif driver == \"thunder-nicvf\":\n+ bitrate *= 10\n+ elif nic == \"fortville_25g\":\n+ bitrate *= 25\n+ elif nic == \"columbiaville_25g\":\n+ bitrate *= 25\n+ elif nic == \"columbiaville_25gx2\":\n+ bitrate *= 25\n+ elif nic == \"columbiaville_100g\":\n+ bitrate *= 100\n+\n+ return bitrate * num_ports / 8 / (frame_size + 20)\n+\n+ def bind_nic_driver(self, ports, driver=\"\"):\n+ for port in ports:\n+ netdev = self.dut.ports_info[port][\"port\"]\n+ driver_now = netdev.get_nic_driver()\n+ driver_new = driver if driver else netdev.default_driver\n+ if driver_new != driver_now:\n+ netdev.bind_driver(driver=driver_new)\n+\n+\n+def skip_unsupported_pkg(pkgs):\n+ \"\"\"\n+ Skip case which are not supported by the input pkgs\n+ \"\"\"\n+ if isinstance(pkgs, str):\n+ pkgs = [pkgs]\n+\n+ def decorator(func):\n+ @wraps(func)\n+ def wrapper(*args, **kwargs):\n+ test_case = args[0]\n+ pkg_type = test_case.pkg.get(\"type\")\n+ pkg_version = test_case.pkg.get(\"version\")\n+ if not pkg_type or not pkg_version:\n+ raise VerifyFailure(\"Failed due to pkg is empty\".format(test_case.pkg))\n+ for pkg in pkgs:\n+ if pkg in pkg_type:\n+ raise VerifySkip(\n+ \"{} {} do not support this case\".format(pkg_type, pkg_version)\n+ )\n+ return func(*args, **kwargs)\n+\n+ return wrapper\n+\n+ return decorator\n+\n+\n+def skip_unsupported_nic(nics):\n+ \"\"\"\n+ Skip case which are not supported by the input nics\n+ \"\"\"\n+ if isinstance(nics, str):\n+ nics = [nics]\n+\n+ def decorator(func):\n+ @wraps(func)\n+ def wrapper(*args, **kwargs):\n+ test_case = args[0]\n+ if test_case.nic in nics:\n+ raise VerifySkip(\"{} do not support this case\".format(test_case.nic))\n+ return func(*args, **kwargs)\n+\n+ return wrapper\n+\n+ return decorator\n+\n+\n+def check_supported_nic(nics):\n+ \"\"\"\n+ check if the test case is supported by the input nics\n+ \"\"\"\n+ if isinstance(nics, str):\n+ nics = [nics]\n+\n+ def decorator(func):\n+ @wraps(func)\n+ def wrapper(*args, **kwargs):\n+ test_case = args[0]\n+ if test_case.nic not in nics:\n+ raise VerifySkip(\"{} do not support this case\".format(test_case.nic))\n+ return func(*args, **kwargs)\n+\n+ return wrapper\n+\n+ return decorator\n", "prefixes": [ "RFC", "v1", "07/23" ] }{ "id": 109306, "url": "