[4/5] dts: add `show port info` command to TestPmdShell
Commit Message
Add a new TestPmdPort data structure to represent the output
returned by `show port info`, which is implemented as part of
TestPmdShell.
The TestPmdPort data structure and its derived classes are modelled
based on the relevant testpmd source code.
This implementation makes extensive use of regular expressions, each of
which parses its field individually. The rationale behind this is to
lower the risk that changes to the testpmd output during development
break the parsing, therefore minimising breakage.
Bugzilla ID: 1407
Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>
---
dts/framework/remote_session/testpmd_shell.py | 473 +++++++++++++++++-
1 file changed, 472 insertions(+), 1 deletion(-)
Comments
On Fri, Apr 12, 2024 at 1:11 PM Luca Vizzarro <luca.vizzarro@arm.com> wrote:
>
> Add a new TestPmdPort data structure to represent the output
> returned by `show port info`, which is implemented as part of
> TestPmdShell.
>
> The TestPmdPort data structure and its derived classes are modelled
> based on the relevant testpmd source code.
>
> This implementation makes extensive use of regular expressions, which
> all parse individually. The rationale behind this is to lower the risk
> of the testpmd output changing as part of development. Therefore
> minimising breakage.
>
> Bugzilla ID: 1407
>
> Signed-off-by: Luca Vizzarro <luca.vizzarro@arm.com>
> Reviewed-by: Paul Szczepanek <paul.szczepanek@arm.com>
<snip>
> +@dataclass
> +class TestPmdPort(TextParser):
This and the classes above are missing docstrings.
<snip>
> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
> f"Test pmd failed to set fwd mode to {mode.value}"
> )
>
> + def show_port_info_all(self) -> list[TestPmdPort]:
> + """Returns the information of all the ports."""
Can we add sample output so that the format of what we're trying to
parse is clear?
> + output = self.send_command("show port info all")
> +
> + ports = []
> + iter = re.finditer(r"\*+.+\*+", output)
> + if next(iter, False): # we are slicing retrospectively, skip first block
> + start_pos = 0
> + for block in iter:
> + end_pos = block.start()
> + ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
> + start_pos = end_pos
> +
> + ports.append(TestPmdPort.parse(output[start_pos:]))
> +
> + return ports
Can this be done the same way it's done in the last commit?
iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*$", output, re.MULTILINE)
return [TestPmdPortStats.parse(block.group(1)) for block in iter]
Looks much better.
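For illustration, that pattern splits hash-framed blocks like this (the
sample below is a made-up, simplified stand-in for the stats output, not
verbatim testpmd output):

```python
import re

# Hypothetical stats-style output: each block is framed by '#' lines.
sample = (
    " ######## NIC statistics for port 0  ########\n"
    " RX-packets: 5  RX-bytes: 430\n"
    " ########\n"
    "\n"
    " ######## NIC statistics for port 1  ########\n"
    " RX-packets: 7  RX-bytes: 602\n"
    " ########\n"
)

# The closing '#' line acts as the end delimiter; group(1) captures the
# header line plus everything up to (but not including) that delimiter.
blocks = [
    m.group(1)
    for m in re.finditer(r"(^ #*.+#*$[^#]+)^ #*$", sample, re.MULTILINE)
]
```

Because each block has an explicit closing line, no lookahead is needed here.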
> +
> + def show_port_info(self, port_id: int) -> TestPmdPort:
> + """Returns the given port information.
> +
> + Args:
> + port_id: The port ID to gather information for.
> +
> + Raises:
> + InteractiveCommandExecutionError: If `port_id` is invalid.
> + """
> + output = self.send_command(f"show port info {port_id}", skip_first_line=True)
> + if output.startswith("Invalid port"):
> + raise InteractiveCommandExecutionError("invalid port given")
> +
> + return TestPmdPort.parse(output)
> +
> def close(self) -> None:
> """Overrides :meth:`~.interactive_shell.close`."""
> self.send_command("quit", "")
> --
> 2.34.1
>
On 16/04/2024 10:03, Juraj Linkeš wrote:
>> +@dataclass
>> +class TestPmdPort(TextParser):
>
> This and the classes above are missing docstrings.
Ack.
>> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
>> f"Test pmd failed to set fwd mode to {mode.value}"
>> )
>>
>> + def show_port_info_all(self) -> list[TestPmdPort]:
>> + """Returns the information of all the ports."""
>
> Can we add sample output so that the format of what we're trying to
> parse is clear?
Ack.
>> + output = self.send_command("show port info all")
>> +
>> + ports = []
>> + iter = re.finditer(r"\*+.+\*+", output)
>> + if next(iter, False): # we are slicing retrospectively, skip first block
>> + start_pos = 0
>> + for block in iter:
>> + end_pos = block.start()
>> + ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
>> + start_pos = end_pos
>> +
>> + ports.append(TestPmdPort.parse(output[start_pos:]))
>> +
>> + return ports
>
> Can this be done the same way it's done in the last commit?
>
> iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*$", output, re.MULTILINE)
> return [TestPmdPortStats.parse(block.group(1)) for block in iter]
>
> Looks much better.
I agree that it looks much better. I made a first attempt at coming up
with a regular expression that is not too complicated and can match
blocks individually. I've noticed that blocks start with:
\n********* Infos for port X ************
but don't have an actual ending delimiter, unlike the stats. I'll
experiment with some lookahead constructs. The easiest solution is to
match everything that is not * ([^*]+), but can we be certain that there
won't be any asterisks in the actual information?
On Tue, Apr 16, 2024 at 2:24 PM Luca Vizzarro <Luca.Vizzarro@arm.com> wrote:
>
> On 16/04/2024 10:03, Juraj Linkeš wrote:
> >> +@dataclass
> >> +class TestPmdPort(TextParser):
> >
> > This and the classes above are missing docstrings.
>
> Ack.
>
> >> @@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
> >> f"Test pmd failed to set fwd mode to {mode.value}"
> >> )
> >>
> >> + def show_port_info_all(self) -> list[TestPmdPort]:
> >> + """Returns the information of all the ports."""
> >
> > Can we add sample output so that the format of what we're trying to
> > parse is clear?
>
> Ack.
>
> >> + output = self.send_command("show port info all")
> >> +
> >> + ports = []
> >> + iter = re.finditer(r"\*+.+\*+", output)
> >> + if next(iter, False): # we are slicing retrospectively, skip first block
> >> + start_pos = 0
> >> + for block in iter:
> >> + end_pos = block.start()
> >> + ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
> >> + start_pos = end_pos
> >> +
> >> + ports.append(TestPmdPort.parse(output[start_pos:]))
> >> +
> >> + return ports
> >
> > Can this be done the same way it's done in the last commit?
> >
> > iter = re.finditer(r"(^ #*.+#*$[^#]+)^ #*$", output, re.MULTILINE)
> > return [TestPmdPortStats.parse(block.group(1)) for block in iter]
> >
> > Looks much better.
>
> I agree that it looks much better. I gave it a first attempt to come up
> with a regular expression that is not too complicated and is able to
> match blocks individually. I've noticed that blocks start with:
>
> \n********* Infos for port X ************
>
> but don't have an actual ending delimiter, unlike for the stats.
Ah, so that's the difference and the reason. I guess the ending
delimiter is either the start of the next section, the prompt, or the
end of the string.
> I'll
> experiment with some look ahead constructs. The easiest solution is to
> match everything that is not * ([^*]+) but can we be certain that there
> won't be any asterisk in the actual information?
We can't. But we can be reasonably certain there won't be five
consecutive asterisks, so maybe we can work with that.
On 17/04/2024 14:22, Juraj Linkeš wrote:
>> I agree that it looks much better. I gave it a first attempt to come up
>> with a regular expression that is not too complicated and is able to
>> match blocks individually. I've noticed that blocks start with:
>>
>> \n********* Infos for port X ************
>>
>> but don't have an actual ending delimiter, unlike for the stats.
>
> Ah, so that's the difference and the reason. I guess the ending
> delimiter is either the start of the next section of the prompt (or
> the end of the string).
Yes, but it's not as trivial, unfortunately. In the current code I am
effectively just finding all the start positions and slicing.
>> I'll
>> experiment with some look ahead constructs. The easiest solution is to
>> match everything that is not * ([^*]+) but can we be certain that there
>> won't be any asterisk in the actual information?
>
> We can't. But we can be reasonably certain there won't be five
> consecutive asterisks, so maybe we can work with that.
We can work with that by using lookahead constructs as mentioned, which
can be quite intensive. For example:
/(?<=\n\*).*?(?=\n\*|$)/gs
looks for the start delimiter and for the start of the next block or the
end. This works perfectly! But it's performing 9576 steps (!) for just
two ports. The current solution only takes 10 steps in total.
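Translated to Python, the lookahead variant splits the output like this
(the port info lines below are a made-up sample, not verbatim testpmd
output):

```python
import re

# Hypothetical "show port info all" output with two port blocks.
output = (
    "\n********* Infos for port 0  *********\n"
    "MAC address: 11:22:33:44:55:66\n"
    "\n********* Infos for port 1  *********\n"
    "MAC address: AA:BB:CC:DD:EE:FF\n"
)

# Each match starts right after a "\n*" delimiter and stops lazily at the
# next "\n*" (start of the following block) or at the end of the string.
blocks = re.findall(r"(?<=\n\*).*?(?=\n\*|$)", output, re.S)
```

The lazy `.*?` plus the alternation in the lookahead is what drives the step
count up: the engine probes both alternatives at every position.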
On 17/04/2024 15:25, Luca Vizzarro wrote:
> On 17/04/2024 14:22, Juraj Linkeš wrote:
>>> I'll
>>> experiment with some look ahead constructs. The easiest solution is to
>>> match everything that is not * ([^*]+) but can we be certain that there
>>> won't be any asterisk in the actual information?
>>
>> We can't. But we can be reasonably certain there won't be five
>> consecutive asterisks, so maybe we can work with that.
>
> We can work with that by using look ahead constructs as mentioned, which
> can be quite intensive. For example:
>
> /(?<=\n\*).*?(?=\n\*|$)/gs
>
> looks for the start delimiter and for the start of the next block or the
> end. This works perfectly! But it's performing 9576 steps (!) for just
> two ports. The current solution only takes 10 steps in total.
Thinking about it... we are not really aiming for performance, so I guess
if it simplifies things and it's justifiable, then it's not a problem.
Especially since this command shouldn't be called continuously.
The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes
approximately 3*input_length steps to run (according to regex101 at
least). If that's reasonable enough, I can do this:
iter = re.finditer(r"\n\*.+?(?=\n\*|$)", input, re.S)
return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
Another optimization is artificially adding a `\n*` delimiter at the end
before feeding it to the regex, thus removing the alternative case (|$),
and making it 2*len steps:
input += "\n*"
iter = re.finditer(r"\n\*.+?(?=\n\*)", input, re.S)
return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
Let me know what you think!
Best,
Luca
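As a runnable sketch of the second variant above (note that `re.finditer`
takes the pattern first; the port info lines are a made-up sample, not
verbatim testpmd output):

```python
import re

# Hypothetical "show port info all" output with two port blocks.
output = (
    "\n********* Infos for port 0  *********\n"
    "MAC address: 11:22:33:44:55:66\n"
    "\n********* Infos for port 1  *********\n"
    "MAC address: AA:BB:CC:DD:EE:FF\n"
)

# Append an artificial "\n*" end delimiter so the lookahead never needs
# the (|$) alternative.
output += "\n*"
blocks = [m.group(0) for m in re.finditer(r"\n\*.+?(?=\n\*)", output, re.S)]
```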
On Wed, Apr 17, 2024 at 5:29 PM Luca Vizzarro <Luca.Vizzarro@arm.com> wrote:
>
> On 17/04/2024 15:25, Luca Vizzarro wrote:
> > On 17/04/2024 14:22, Juraj Linkeš wrote:
> >>> I'll
> >>> experiment with some look ahead constructs. The easiest solution is to
> >>> match everything that is not * ([^*]+) but can we be certain that there
> >>> won't be any asterisk in the actual information?
> >>
> >> We can't. But we can be reasonably certain there won't be five
> >> consecutive asterisks, so maybe we can work with that.
> >
> > We can work with that by using look ahead constructs as mentioned, which
> > can be quite intensive. For example:
> >
> > /(?<=\n\*).*?(?=\n\*|$)/gs
> >
> > looks for the start delimiter and for the start of the next block or the
> > end. This works perfectly! But it's performing 9576 steps (!) for just
> > two ports. The current solution only takes 10 steps in total.
>
> Thinking about it... we are not really aiming for performance, so I guess
> if it simplifies things and it's justifiable, then it's not a problem.
> Especially since this command shouldn't be called continuously.
>
We have to weigh the pros and cons on an individual basis. In this
case, the output is going to be short so basically any solution is
going to be indistinguishable from any other, performance wise.
> The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes
> approximately 3*input_length steps to run (according to regex101 at
> least). If that's reasonable enough, I can do this:
>
> iter = re.finditer(r"\n\*.+?(?=\n\*|$)", input, re.S)
> return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>
> Another optimization is artificially adding a `\n*` delimiter at the end
> before feeding it to the regex, thus removing the alternative case (|$),
> and making it 2*len steps:
>
> input += "\n*"
> iter = re.finditer(r"\n\*.+?(?=\n\*)", input, re.S)
> return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>
I like this second one a bit more. How does the performance change if
we try to match four asterisks, "\n\*{4}.+?(?=\n\*{4})"? Four asterisks
shouldn't randomly appear in the output, as that's basically another
delimiter.
And we should document this in the docstring - sample output, then
explain the extra characters and the regex itself. We shouldn't forget
this in the other commit as well (show port stats).
> Let me know what you think!
>
> Best,
> Luca
On 18/04/2024 07:41, Juraj Linkeš wrote:
>> The equivalent /\n\*.+?(?=\n\*|$)/gs (but slightly more optimised) takes
>> approximately 3*input_length steps to run (according to regex101 at
>> least). If that's reasonable enough, I can do this:
>>
>> iter = re.finditer(r"\n\*.+?(?=\n\*|$)", input, re.S)
>> return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>>
>> Another optimization is artificially adding a `\n*` delimiter at the end
>> before feeding it to the regex, thus removing the alternative case (|$),
>> and making it 2*len steps:
>>
>> input += "\n*"
>> iter = re.finditer(r"\n\*.+?(?=\n\*)", input, re.S)
>> return [TestPmdPortInfo.parse(match.group(0)) for match in iter]
>>
>
> I like this second one a bit more. How does the performance change if
> we try to match four asterisks, "\n\*{4}.+?(?=\n\*{4})"? Four asterisks
> shouldn't randomly appear in the output, as that's basically another
> delimiter.
The difference is negligible, as the regex walks every character anyway
while there is a match. Either \* or \*{4} will result in a match. The
problem arises when a match attempt fails and the regex backtracks. Of
course, if we have already matched 4 asterisks, then the backtracking
can skip the whole sequence at once (compared to 1), but that's only
going to be 3 steps fewer per **** found. The difference is bigger if we
attempt to match all the asterisks. The lookahead construct also
increases backtracking.
In the meantime, still by amending the output, I've got a solution that
doesn't perform any lookaheads:
\*{21}.+?\n{2}
Instead of treating blocks as they are (\n******<snip>\n), we can add an
extra \n at the end and treat the blocks as: ******<snip>\n\n. Basically
this assumes that an empty line is the end delimiter. This takes
input_length/2 steps!
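A runnable sketch of that blank-line-delimiter approach (again with a
made-up two-port sample; the trailing newline is added so the last block
also ends in an empty line):

```python
import re

DELIM = "*" * 21

# Hypothetical "show port info all" output with two port blocks.
output = (
    f"\n{DELIM} Infos for port 0  {DELIM}\n"
    "MAC address: 11:22:33:44:55:66\n"
    f"\n{DELIM} Infos for port 1  {DELIM}\n"
    "MAC address: AA:BB:CC:DD:EE:FF\n"
)

# Add a trailing newline so every block, including the last, ends with an
# empty line, which the \n{2} at the end of the pattern relies on.
output += "\n"
blocks = re.findall(r"\*{21}.+?\n{2}", output, re.S)
```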
Of course, in reality every \n is \r\n, as I've discovered that when
shells are invoked using paramiko, the stream becomes CRLF for some
reason I haven't explored. I think this is worth mentioning for
everybody, in case surprise carriage returns prove disruptive.
> And we should document this in the docstring - sample output, then
> explain the extra characters and the regex itself. We shouldn't forget
> this in the other commit as well (show port stats).
Ack.
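The CRLF caveat is also why the field patterns in the diff below capture
with ([^\r\n]+) rather than (.+): in Python's re module, . does not match
\n but does match \r, so a greedy capture keeps the stray carriage
return. A small sketch (the field line is a made-up sample):

```python
import re

# A line as received from a paramiko-driven shell: CRLF-terminated.
raw = "Device name: 0000:00:08.0\r\nDriver name: net_virtio\r\n"

# (.+) stops at \n but happily includes the preceding \r;
# ([^\r\n]+) stops before both.
greedy = re.search(r"Device name: (.+)", raw).group(1)
safe = re.search(r"Device name: ([^\r\n]+)", raw).group(1)
```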
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 University of New Hampshire
# Copyright(c) 2023 PANTHEON.tech s.r.o.
+# Copyright(c) 2024 Arm Limited
"""Testpmd interactive shell.
@@ -15,14 +16,19 @@
testpmd_shell.close()
"""
+from dataclasses import dataclass, field
+import re
import time
-from enum import auto
+from enum import Flag, auto
from pathlib import PurePath
from typing import Callable, ClassVar
+from typing_extensions import Self
from framework.exception import InteractiveCommandExecutionError
from framework.settings import SETTINGS
from framework.utils import StrEnum
+from framework import parser
+from framework.parser import TextParser
from .interactive_shell import InteractiveShell
@@ -80,6 +86,439 @@ class TestPmdForwardingModes(StrEnum):
recycle_mbufs = auto()
+class VLANOffloadFlag(Flag):
+ #:
+ STRIP = auto()
+ #:
+ FILTER = auto()
+ #:
+ EXTEND = auto()
+ #:
+ QINQ_STRIP = auto()
+
+ @classmethod
+ def from_str_dict(cls, d):
+ """Makes an instance from a dictionary containing the flag member names with an "on" value."""
+ flag = cls(0)
+ for name in cls.__members__:
+ if d.get(name) == "on":
+ flag |= cls[name]
+ return flag
+
+
+class RSSOffloadTypesFlag(Flag):
+ #:
+ ipv4 = auto()
+ #:
+ ipv4_frag = auto()
+ #:
+ ipv4_tcp = auto()
+ #:
+ ipv4_udp = auto()
+ #:
+ ipv4_sctp = auto()
+ #:
+ ipv4_other = auto()
+ #:
+ ipv6 = auto()
+ #:
+ ipv6_frag = auto()
+ #:
+ ipv6_tcp = auto()
+ #:
+ ipv6_udp = auto()
+ #:
+ ipv6_sctp = auto()
+ #:
+ ipv6_other = auto()
+ #:
+ l2_payload = auto()
+ #:
+ ipv6_ex = auto()
+ #:
+ ipv6_tcp_ex = auto()
+ #:
+ ipv6_udp_ex = auto()
+ #:
+ port = auto()
+ #:
+ vxlan = auto()
+ #:
+ geneve = auto()
+ #:
+ nvgre = auto()
+ #:
+ user_defined_22 = auto()
+ #:
+ gtpu = auto()
+ #:
+ eth = auto()
+ #:
+ s_vlan = auto()
+ #:
+ c_vlan = auto()
+ #:
+ esp = auto()
+ #:
+ ah = auto()
+ #:
+ l2tpv3 = auto()
+ #:
+ pfcp = auto()
+ #:
+ pppoe = auto()
+ #:
+ ecpri = auto()
+ #:
+ mpls = auto()
+ #:
+ ipv4_chksum = auto()
+ #:
+ l4_chksum = auto()
+ #:
+ l2tpv2 = auto()
+ #:
+ ipv6_flow_label = auto()
+ #:
+ user_defined_38 = auto()
+ #:
+ user_defined_39 = auto()
+ #:
+ user_defined_40 = auto()
+ #:
+ user_defined_41 = auto()
+ #:
+ user_defined_42 = auto()
+ #:
+ user_defined_43 = auto()
+ #:
+ user_defined_44 = auto()
+ #:
+ user_defined_45 = auto()
+ #:
+ user_defined_46 = auto()
+ #:
+ user_defined_47 = auto()
+ #:
+ user_defined_48 = auto()
+ #:
+ user_defined_49 = auto()
+ #:
+ user_defined_50 = auto()
+ #:
+ user_defined_51 = auto()
+ #:
+ l3_pre96 = auto()
+ #:
+ l3_pre64 = auto()
+ #:
+ l3_pre56 = auto()
+ #:
+ l3_pre48 = auto()
+ #:
+ l3_pre40 = auto()
+ #:
+ l3_pre32 = auto()
+ #:
+ l2_dst_only = auto()
+ #:
+ l2_src_only = auto()
+ #:
+ l4_dst_only = auto()
+ #:
+ l4_src_only = auto()
+ #:
+ l3_dst_only = auto()
+ #:
+ l3_src_only = auto()
+
+ #:
+ ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | ipv6_ex
+ #:
+ udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
+ #:
+ tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
+ #:
+ sctp = ipv4_sctp | ipv6_sctp
+ #:
+ tunnel = vxlan | geneve | nvgre
+ #:
+ vlan = s_vlan | c_vlan
+ #:
+ all = (
+ eth
+ | vlan
+ | ip
+ | tcp
+ | udp
+ | sctp
+ | l2_payload
+ | l2tpv3
+ | esp
+ | ah
+ | pfcp
+ | gtpu
+ | ecpri
+ | mpls
+ | l2tpv2
+ )
+
+ @classmethod
+ def from_list_string(cls, names: str) -> Self:
+ flag = cls(0)
+ for name in names.split():
+ flag |= cls.from_str(name)
+ return flag
+
+ @classmethod
+ def from_str(cls, name: str) -> Self:
+ member_name = name.strip().replace("-", "_")
+ return cls[member_name]
+
+ def __str__(self):
+ return self.name.replace("_", "-")
+
+
+class DeviceCapabilitiesFlag(Flag):
+ RUNTIME_RX_QUEUE_SETUP = auto()
+ """Device supports Rx queue setup after device started."""
+ RUNTIME_TX_QUEUE_SETUP = auto()
+ """Device supports Tx queue setup after device started."""
+ RXQ_SHARE = auto()
+ """Device supports shared Rx queue among ports within Rx domain and switch domain."""
+ FLOW_RULE_KEEP = auto()
+ """Device supports keeping flow rules across restart."""
+ FLOW_SHARED_OBJECT_KEEP = auto()
+ """Device supports keeping shared flow objects across restart."""
+
+
+class DeviceErrorHandlingMode(StrEnum):
+ #:
+ none = auto()
+ #:
+ passive = auto()
+ #:
+ proactive = auto()
+ #:
+ unknown = auto()
+
+
+def _validate_device_private_info(info: str) -> str | None:
+ """Ensure that we are not parsing invalid device private info output."""
+ info = info.strip()
+ if info == "none" or info.startswith("Invalid file") or info.startswith("Failed to dump"):
+ return None
+ return info
+
+
+@dataclass
+class TestPmdPort(TextParser):
+ #:
+ id: int = field(metadata=parser.to_int(parser.regex(r"Infos for port (\d+)\b")))
+ #:
+ device_name: str = field(metadata=parser.regex(r"Device name: ([^\r\n]+)"))
+ #:
+ driver_name: str = field(metadata=parser.regex(r"Driver name: ([^\r\n]+)"))
+ #:
+ socket_id: int = field(metadata=parser.to_int(parser.regex(r"Connect to socket: (\d+)")))
+ #:
+ is_link_up: bool = field(metadata=parser.eq("up", parser.regex(r"Link status: (up|down)")))
+ #:
+ link_speed: str = field(metadata=parser.regex(r"Link speed: ([^\r\n]+)"))
+ #:
+ is_link_full_duplex: bool = field(
+ metadata=parser.eq("full", parser.regex(r"Link duplex: (full|half)-duplex"))
+ )
+ #:
+ is_link_autonegotiated: bool = field(
+ metadata=parser.to_bool(parser.regex(r"Autoneg status: (On|Off)"))
+ )
+ #:
+ is_promiscuous_mode_enabled: bool = field(
+ metadata=parser.to_bool(parser.regex(r"Promiscuous mode: (enabled|disabled)"))
+ )
+ #:
+ is_allmulticast_mode_enabled: bool = field(
+ metadata=parser.to_bool(parser.regex(r"Allmulticast mode: (enabled|disabled)"))
+ )
+ #: Maximum number of MAC addresses
+ max_mac_addresses_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Maximum number of MAC addresses: (\d+)"))
+ )
+ #: Maximum number of MAC addresses of hash filtering
+ max_hash_mac_addresses_num: int = field(
+ metadata=parser.to_int(
+ parser.regex(r"Maximum number of MAC addresses of hash filtering: (\d+)")
+ )
+ )
+ #: Minimum size of RX buffer
+ min_rx_bufsize: int = field(
+ metadata=parser.to_int(parser.regex(r"Minimum size of RX buffer: (\d+)"))
+ )
+ #: Maximum configurable length of RX packet
+ max_rx_packet_length: int = field(
+ metadata=parser.to_int(parser.regex(r"Maximum configurable length of RX packet: (\d+)"))
+ )
+ #: Maximum configurable size of LRO aggregated packet
+ max_lro_packet_size: int = field(
+ metadata=parser.to_int(
+ parser.regex(r"Maximum configurable size of LRO aggregated packet: (\d+)")
+ )
+ )
+
+ #: Current number of RX queues
+ rx_queues_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Current number of RX queues: (\d+)"))
+ )
+ #: Max possible RX queues
+ max_rx_queues_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max possible RX queues: (\d+)"))
+ )
+ #: Max possible number of RXDs per queue
+ max_queue_rxd_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max possible number of RXDs per queue: (\d+)"))
+ )
+ #: Min possible number of RXDs per queue
+ min_queue_rxd_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Min possible number of RXDs per queue: (\d+)"))
+ )
+ #: RXDs number alignment
+ rxd_alignment_num: int = field(
+ metadata=parser.to_int(parser.regex(r"RXDs number alignment: (\d+)"))
+ )
+
+ #: Current number of TX queues
+ tx_queues_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Current number of TX queues: (\d+)"))
+ )
+ #: Max possible TX queues
+ max_tx_queues_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max possible TX queues: (\d+)"))
+ )
+ #: Max possible number of TXDs per queue
+ max_queue_txd_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max possible number of TXDs per queue: (\d+)"))
+ )
+ #: Min possible number of TXDs per queue
+ min_queue_txd_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Min possible number of TXDs per queue: (\d+)"))
+ )
+ #: TXDs number alignment
+ txd_alignment_num: int = field(
+ metadata=parser.to_int(parser.regex(r"TXDs number alignment: (\d+)"))
+ )
+ #: Max segment number per packet
+ max_packet_segment_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max segment number per packet: (\d+)"))
+ )
+ #: Max segment number per MTU/TSO
+ max_mtu_segment_num: int = field(
+ metadata=parser.to_int(parser.regex(r"Max segment number per MTU\/TSO: (\d+)"))
+ )
+
+ #:
+ device_capabilities: DeviceCapabilitiesFlag = field(
+ metadata=parser.chain(
+ DeviceCapabilitiesFlag,
+ parser.to_int(parser.regex(r"Device capabilities: (0x[A-Fa-f\d]+)")),
+ )
+ )
+ #:
+ device_error_handling_mode: DeviceErrorHandlingMode = field(
+ metadata=parser.chain(
+ DeviceErrorHandlingMode, parser.regex(r"Device error handling mode: (\w+)")
+ )
+ )
+ #:
+ device_private_info: str | None = field(
+ default=None,
+ metadata=parser.chain(
+ _validate_device_private_info,
+ parser.regex(r"Device private info:\s+([\s\S]+)", re.MULTILINE),
+ ),
+ )
+
+ #:
+ hash_key_size: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Hash key size in bytes: (\d+)"))
+ )
+ #:
+ redirection_table_size: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Redirection table size: (\d+)"))
+ )
+ #:
+ supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
+ default=RSSOffloadTypesFlag(0),
+ metadata=parser.chain(
+ RSSOffloadTypesFlag.from_list_string,
+ parser.regex(r"Supported RSS offload flow types:((?:\r?\n? \S+)+)", re.MULTILINE),
+ ),
+ )
+
+ #:
+ mac_address: str | None = field(
+ default=None, metadata=parser.regex(r"MAC address: ([A-Fa-f0-9:]+)")
+ )
+ #:
+ fw_version: str | None = field(
+ default=None, metadata=parser.regex(r"Firmware-version: ([^\r\n]+)")
+ )
+ #:
+ dev_args: str | None = field(default=None, metadata=parser.regex(r"Devargs: ([^\r\n]+)"))
+ #: Socket id of the memory allocation
+ mem_alloc_socket_id: int | None = field(
+ default=None,
+ metadata=parser.to_int(parser.regex(r"memory allocation on the socket: (\d+)")),
+ )
+ #:
+ mtu: int | None = field(default=None, metadata=parser.to_int(parser.regex(r"MTU: (\d+)")))
+
+ #:
+ vlan_offload: VLANOffloadFlag | None = field(
+ default=None,
+ metadata=parser.chain(
+ VLANOffloadFlag.from_str_dict,
+ parser.regex(
+ r"VLAN offload:\s+"
+ r"strip (?P<STRIP>on|off), "
+ r"filter (?P<FILTER>on|off), "
+ r"extend (?P<EXTEND>on|off), "
+ r"qinq strip (?P<QINQ_STRIP>on|off)$",
+ re.MULTILINE,
+ named=True,
+ ),
+ ),
+ )
+
+ #: Maximum size of RX buffer
+ max_rx_bufsize: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Maximum size of RX buffer: (\d+)"))
+ )
+ #: Maximum number of VFs
+ max_vfs_num: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Maximum number of VFs: (\d+)"))
+ )
+ #: Maximum number of VMDq pools
+ max_vmdq_pools_num: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Maximum number of VMDq pools: (\d+)"))
+ )
+
+ #:
+ switch_name: str | None = field(default=None, metadata=parser.regex(r"Switch name: ([^\r\n]+)"))
+ #:
+ switch_domain_id: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Switch domain Id: (\d+)"))
+ )
+ #:
+ switch_port_id: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Switch Port Id: (\d+)"))
+ )
+ #:
+ switch_rx_domain: int | None = field(
+ default=None, metadata=parser.to_int(parser.regex(r"Switch Rx domain: (\d+)"))
+ )
+
+
class TestPmdShell(InteractiveShell):
"""Testpmd interactive shell.
@@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, verify: bool = True):
f"Test pmd failed to set fwd mode to {mode.value}"
)
+ def show_port_info_all(self) -> list[TestPmdPort]:
+ """Returns the information of all the ports."""
+ output = self.send_command("show port info all")
+
+ ports = []
+ iter = re.finditer(r"\*+.+\*+", output)
+ if next(iter, False): # we are slicing retrospectively, skip first block
+ start_pos = 0
+ for block in iter:
+ end_pos = block.start()
+ ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
+ start_pos = end_pos
+
+ ports.append(TestPmdPort.parse(output[start_pos:]))
+
+ return ports
+
+ def show_port_info(self, port_id: int) -> TestPmdPort:
+ """Returns the given port information.
+
+ Args:
+ port_id: The port ID to gather information for.
+
+ Raises:
+ InteractiveCommandExecutionError: If `port_id` is invalid.
+ """
+ output = self.send_command(f"show port info {port_id}", skip_first_line=True)
+ if output.startswith("Invalid port"):
+ raise InteractiveCommandExecutionError("invalid port given")
+
+ return TestPmdPort.parse(output)
+
def close(self) -> None:
"""Overrides :meth:`~.interactive_shell.close`."""
self.send_command("quit", "")