@@ -19,7 +19,7 @@
from dataclasses import dataclass, field
from enum import Flag, auto
from pathlib import PurePath
-from typing import ClassVar
+from typing import ClassVar, cast
from typing_extensions import Self, Unpack
@@ -541,6 +541,56 @@ class TestPmdPort(TextParser):
)
+@dataclass
+class TestPmdPortQueue(TextParser):
+ """Dataclass representation of the common parts of the testpmd `show rxq/txq info` commands."""
+
+ #:
+ prefetch_threshold: int = field(metadata=TextParser.find_int(r"prefetch threshold: (\d+)"))
+ #:
+ host_threshold: int = field(metadata=TextParser.find_int(r"host threshold: (\d+)"))
+ #:
+ writeback_threshold: int = field(metadata=TextParser.find_int(r"writeback threshold: (\d+)"))
+ #:
+ free_threshold: int = field(metadata=TextParser.find_int(r"free threshold: (\d+)"))
+ #:
+ deferred_start: bool = field(metadata=TextParser.find("deferred start: on"))
+ #: The number of RXD/TXDs is just the ring size of the queue.
+ ring_size: int = field(metadata=TextParser.find_int(r"Number of (?:RXDs|TXDs): (\d+)"))
+ #:
+ is_queue_started: bool = field(metadata=TextParser.find("queue state: started"))
+ #:
+ burst_mode: str | None = field(
+ default=None, metadata=TextParser.find(r"Burst mode: ([^\r\n]+)")
+ )
+
+
+@dataclass
+class TestPmdTxPortQueue(TestPmdPortQueue):
+ """Dataclass representation for testpmd `show txq info` command."""
+
+ #:
+ rs_threshold: int | None = field(
+ default=None, metadata=TextParser.find_int(r"RS threshold: (\d+)")
+ )
+
+
+@dataclass
+class TestPmdRxPortQueue(TestPmdPortQueue):
+ """Dataclass representation for testpmd `show rxq info` command."""
+
+ #:
+ mempool: str | None = field(default=None, metadata=TextParser.find(r"Mempool: ([^\r\n]+)"))
+ #:
+ can_drop_packets: bool | None = field(
+ default=None, metadata=TextParser.find(r"drop packets: on")
+ )
+ #:
+ is_scattering_packets: bool | None = field(
+ default=None, metadata=TextParser.find(r"scattered packets: on")
+ )
+
+
@dataclass
class TestPmdPortStats(TextParser):
"""Port statistics."""
@@ -643,7 +693,7 @@ def start(self, verify: bool = True) -> None:
"Not all ports came up after starting packet forwarding in testpmd."
)
- def stop(self, verify: bool = True) -> None:
+ def stop(self, verify: bool = True) -> str:
"""Stop packet forwarding.
Args:
@@ -651,6 +701,9 @@ def stop(self, verify: bool = True) -> None:
forwarding was stopped successfully or not started. If neither is found, it is
considered an error.
+ Returns:
+ Output gathered from sending the stop command.
+
Raises:
InteractiveCommandExecutionError: If `verify` is :data:`True` and the command to stop
forwarding results in an error.
@@ -663,6 +716,7 @@ def stop(self, verify: bool = True) -> None:
):
self._logger.debug(f"Failed to stop packet forwarding: \n{stop_cmd_output}")
raise InteractiveCommandExecutionError("Testpmd failed to stop packet forwarding.")
+ return stop_cmd_output
def get_devices(self) -> list[TestPmdDevice]:
"""Get a list of device names that are known to testpmd.
@@ -804,6 +858,181 @@ def show_port_stats(self, port_id: int) -> TestPmdPortStats:
return TestPmdPortStats.parse(output)
+ def show_port_queue_info(
+ self, port_id: int, queue_id: int, is_rx_queue: bool
+ ) -> TestPmdPortQueue:
+ """Get the info for a queue on a given port.
+
+ Args:
+ port_id: ID of the port where the queue resides.
+ queue_id: ID of the queue to query.
+ is_rx_queue: Whether to check an RX or TX queue. If :data:`True` an RX queue will be
+ queried, otherwise a TX queue will be queried.
+
+ Raises:
+ InteractiveCommandExecutionError: If there is a failure when getting the info for the
+ queue.
+
+ Returns:
+ Information about the queue on the given port.
+ """
+ queue_type = "rxq" if is_rx_queue else "txq"
+ queue_info = self.send_command(
+ f"show {queue_type} info {port_id} {queue_id}", skip_first_line=True
+ )
+ if queue_info.startswith("ETHDEV: Invalid"):
+ raise InteractiveCommandExecutionError(
+ f"Could not get the info for {queue_type} {queue_id} on port {port_id}"
+ )
+ return (
+ TestPmdRxPortQueue.parse(queue_info)
+ if is_rx_queue
+ else TestPmdTxPortQueue.parse(queue_info)
+ )
+
+ def show_port_rx_queue_info(self, port_id: int, queue_id: int) -> TestPmdRxPortQueue:
+ """Get port queue info and cast to :class:`TestPmdRxPortQueue`.
+
+ Wrapper around :meth:`show_port_queue_info` that casts the more generic type into the
+ correct subclass.
+
+ Args:
+ port_id: ID of the port where the queue resides.
+ queue_id: ID of the queue to query.
+
+ Returns:
+ Information about the Rx queue on the given port.
+ """
+ return cast(TestPmdRxPortQueue, self.show_port_queue_info(port_id, queue_id, True))
+
+ def show_port_tx_queue_info(self, port_id: int, queue_id: int) -> TestPmdTxPortQueue:
+ """Get port queue info and cast to :class:`TestPmdTxPortQueue`.
+
+ Wrapper around :meth:`show_port_queue_info` that casts the more generic type into the
+ correct subclass.
+
+ Args:
+ port_id: ID of the port where the queue resides.
+ queue_id: ID of the queue to query.
+
+ Returns:
+ Information about the Tx queue on the given port.
+ """
+ return cast(TestPmdTxPortQueue, self.show_port_queue_info(port_id, queue_id, False))
+
+ def stop_port_queue(
+ self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True
+ ) -> None:
+ """Stops a given queue on a port.
+
+ Args:
+ port_id: ID of the port that the queue belongs to.
+ queue_id: ID of the queue to stop.
+ is_rx_queue: Type of queue to stop. If :data:`True` an RX queue will be stopped,
+ otherwise a TX queue will be stopped.
+ verify: If :data:`True` an additional command will be sent to verify the queue stopped.
+ Defaults to :data:`True`.
+
+ Raises:
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to
+ stop.
+ """
+ port_type = "rxq" if is_rx_queue else "txq"
+ stop_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} stop")
+ if verify:
+ if self.show_port_queue_info(port_id, queue_id, is_rx_queue).is_queue_started:
+ self._logger.debug(
+ f"Failed to stop {port_type} {queue_id} on port {port_id}:\n{stop_cmd_output}"
+ )
+ raise InteractiveCommandExecutionError(
+ f"Test pmd failed to stop {port_type} {queue_id} on port {port_id}"
+ )
+
+ def start_port_queue(
+ self, port_id: int, queue_id: int, is_rx_queue: bool, verify: bool = True
+ ) -> None:
+ """Starts a given queue on a port.
+
+ First sets up the port queue, then starts it.
+
+ Args:
+ port_id: ID of the port that the queue belongs to.
+ queue_id: ID of the queue to start.
+ is_rx_queue: Type of queue to start. If :data:`True` an RX queue will be started,
+ otherwise a TX queue will be started.
+ verify: if :data:`True` an additional command will be sent to verify that the queue was
+ started. Defaults to :data:`True`.
+
+ Raises:
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and the queue fails to
+ start.
+ """
+ port_type = "rxq" if is_rx_queue else "txq"
+ self.setup_port_queue(port_id, queue_id, is_rx_queue)
+ start_cmd_output = self.send_command(f"port {port_id} {port_type} {queue_id} start")
+ if verify:
+ if not self.show_port_queue_info(port_id, queue_id, is_rx_queue).is_queue_started:
+ self._logger.debug(
+ f"Failed to start {port_type} {queue_id} on port {port_id}:\n{start_cmd_output}"
+ )
+ raise InteractiveCommandExecutionError(
+ f"Test pmd failed to start {port_type} {queue_id} on port {port_id}"
+ )
+
+ def setup_port_queue(self, port_id: int, queue_id: int, is_rx_queue: bool) -> None:
+ """Setup a given queue on a port.
+
+ This functionality cannot be verified because the setup action only takes effect when the
+ queue is started.
+
+ Args:
+ port_id: ID of the port where the queue resides.
+ queue_id: ID of the queue to setup.
+ is_rx_queue: Type of queue to setup. If :data:`True` an RX queue will be setup,
+ otherwise a TX queue will be setup.
+ """
+ self.send_command(f"port {port_id} {'rxq' if is_rx_queue else 'txq'} {queue_id} setup")
+
+ def set_queue_ring_size(
+ self,
+ port_id: int,
+ queue_id: int,
+ size: int,
+ is_rx_queue: bool,
+ verify: bool = True,
+ ) -> None:
+ """Update the ring size of an Rx/Tx queue on a given port.
+
+ Queue is setup after setting the ring size so that the queue info reflects this change and
+ it can be verified.
+
+ Args:
+ port_id: The port that the queue resides on.
+ queue_id: The ID of the queue on the port.
+ size: The size to update the ring size to.
+ is_rx_queue: Whether to modify an RX or TX queue. If :data:`True` an RX queue will be
+ updated, otherwise a TX queue will be updated.
+ verify: If :data:`True` an additional command will be sent to check the ring size of
+ the queue in an attempt to validate that the size was changes properly.
+
+ Raises:
+ InteractiveCommandExecutionError: If `verify` is :data:`True` and there is a failure
+ when updating ring size.
+ """
+ queue_type = "rxq" if is_rx_queue else "txq"
+ self.send_command(f"port config {port_id} {queue_type} {queue_id} ring_size {size}")
+ self.setup_port_queue(port_id, queue_id, is_rx_queue)
+ if verify:
+ curr_ring_size = self.show_port_queue_info(port_id, queue_id, is_rx_queue).ring_size
+ if curr_ring_size != size:
+ self._logger.debug(
+ f"Failed up update ring size of queue {queue_id} on port {port_id}. Current"
+ f" ring size is {curr_ring_size}."
+ )
+ raise InteractiveCommandExecutionError(
+ f"Failed to update ring size of queue {queue_id} on port {port_id}"
+ )
+
def _close(self) -> None:
"""Overrides :meth:`~.interactive_shell.close`."""
self.stop()