[V1,2/4] framework/packet: Update packet module for new methods.

Message ID 20230712193126.1994361-3-ke1.xu@intel.com (mailing list archive)
State New
Headers
Series Updating packet module and introducing a new common module. |

Commit Message

Ke Xu July 12, 2023, 7:31 p.m. UTC
  1. Add util method segment_bytes.

2. Add methods get_packet_layer_index_oip_ol4_tun_ip_l4, get_packet_layer_index_to_tso_seg, get_packet_layer_index_tunnel for packet layer locating.

3. Add doc string for strip_pktload.

4. Add methods for packet payload getting, payload segmentation and checking.

5. Add methods for packet checksum getting, correction, stating and offload flag checking.

Signed-off-by: Ke Xu <ke1.xu@intel.com>
---
 framework/packet.py | 858 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 856 insertions(+), 2 deletions(-)
  

Patch

diff --git a/framework/packet.py b/framework/packet.py
index c44c955f..02b9f885 100644
--- a/framework/packet.py
+++ b/framework/packet.py
@@ -27,9 +27,9 @@  from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting
 from scapy.layers.l2 import ARP, GRE, Dot1Q, Ether
 from scapy.layers.sctp import SCTP
 from scapy.layers.vxlan import VXLAN
-from scapy.packet import Raw
+from scapy.packet import Padding, Raw
 from scapy.sendrecv import sendp
-from scapy.utils import hexstr, rdpcap, wrpcap
+from scapy.utils import hexstr, randstring, rdpcap, wrpcap
 
 from .utils import convert_int2ip, convert_ip2int, get_module_path
 
@@ -127,6 +127,152 @@  def write_raw_pkt(pkt_str, file_name):
         w.close()
 
 
+def segment_bytes(payload: bytes, seg_len: int) -> list:
+    """
+    Segment a bytes according to the seg_len.
+    payload: bytes, a sequence to be segmented.
+    seg_len: int, the length of segmentation.
+    return: list, a list of segmented bytes.
+    """
+    segments = []
+    if seg_len > 0:
+        while payload:
+            _s = payload[:seg_len]
+            payload = payload[seg_len:]
+            segments.append(_s)
+    else:
+        segments = [payload]
+    return segments
+
+
+def get_packet_layer_index_oip_ol4_tun_ip_l4(
+    pkt, support_rx_tunnel: bool = True
+) -> tuple:
+    """
+    Get the layer index of outer-ip, outer-l4, tunnel, inner-ip, inner-l4 in a packet.
+    The index counting follows the DPDK rule, that if a non-tunnel packet is counted,
+    the ip / l4 layer are considered the inner ip / l4 layers.
+    If a layer is not found, it will return None for that index.
+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+    or higher network, which means a tunnel packet is laid between an outer IP layer and an
+    inner IP layer.
+    pkt: Packet, the Scapy Packet object to be counted.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, indexes of outer IP, outer L4, tunnel, inner IP, inner L4.
+    """
+    _LAYERS_L3 = {IP, IPv6}
+    _LAYERS_L4 = {UDP, TCP, SCTP}
+    _layers = pkt.layers()
+    _index_tunnel = get_packet_layer_index_tunnel(pkt)
+    _index_oip = None
+    _index_ol4 = None
+    _index_ip = None
+    _index_l4 = None
+    if _index_tunnel is None:
+        for _i, _l in enumerate(_layers):
+            if _l in _LAYERS_L3:
+                _index_ip = _i
+            if _l in _LAYERS_L4:
+                _index_l4 = _i
+    else:
+        for _i in range(0, _index_tunnel + 1):
+            _l = _layers[_i]
+            if _l in _LAYERS_L3:
+                _index_oip = _i
+            if _l in _LAYERS_L4:
+                _index_ol4 = _i
+        for _i in range(_index_tunnel + 1, len(_layers)):
+            _l = _layers[_i]
+            if _l in _LAYERS_L3:
+                _index_ip = _i
+            if _l in _LAYERS_L4:
+                _index_l4 = _i
+    if not support_rx_tunnel and not _index_tunnel is None:
+        (_index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4) = (
+            None,
+            None,
+            None,
+            _index_oip,
+            _index_ol4,
+        )
+    return _index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4
+
+
+def get_packet_layer_index_to_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> int:
+    """
+    Get the layer index of TSO in a packet.
+    In DPDK TSO is considered Transmittion Segmentation Offload. That both TCP
+    and UDP are considered to be segemeted.
+    pkt: Packt, Scapy Packet object to be segmeted.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: int, the index of the layer to be segmented in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if support_seg_tunnel and not _index_tunnel is None:
+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+            return _index_l4
+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+            return _index_l4
+    if support_seg_non_tunnel and _index_tunnel is None:
+        if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP):
+            return _index_l4
+        if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP):
+            return _index_l4
+    return None
+
+
+def get_packet_layer_index_tunnel(pkt):
+    """
+    Get the layer index of tunnel layer in a packet.
+    If a layer is not found, it will return None for that index.
+    This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3
+    or higher network, which means a tunnel packet is laid between an outer IP layer and an
+    inner IP layer.
+    pkt: Packet, Scapy Packet object to be indexed.
+    return: int | None, the index of the tunnel layer in the packet.
+    """
+    # packet types
+    _LAYERS_TUNNEL = {VXLAN, GRE, GTP_U_Header, GENEVE}
+
+    _layers = pkt.layers()
+    for _i, _l in enumerate(_layers):
+        _layers_outer = _layers[: _i + 1]
+        _layers_inner = _layers[_i + 1 :]
+        if (
+            (_l in _LAYERS_TUNNEL)
+            and (IP in _layers_outer or IPv6 in _layers_outer)
+            and (IP in _layers_inner or IPv6 in _layers_inner)
+        ):
+            return _i
+    for _i, _l in enumerate(_layers):
+        if (_l is IP or _l is IPv6) and (
+            _layers[_i + 1] is IP or _layers[_i + 1] is IPv6
+        ):
+            return _i
+    return None
+
+
 class scapy(object):
     SCAPY_LAYERS = {
         "ether": Ether(dst="ff:ff:ff:ff:ff:ff"),
@@ -1188,6 +1334,12 @@  def compare_pktload(pkt1=None, pkt2=None, layer="L2"):
 
 
 def strip_pktload(pkt=None, layer="L2", p_index=0):
+    """
+    Strip the payload of a specific layer indicated by `layer` in a pakcet.
+    pkt: Packet, DTS Packet object to be stripped.
+    layer: str, the layer to be stripped.
+    return: str, the stripped layer formatted in hex string.
+    """
     if layer == "L2":
         l_idx = 0
     elif layer == "L3":
@@ -1205,6 +1357,708 @@  def strip_pktload(pkt=None, layer="L2", p_index=0):
     return load
 
 
+def get_packet_payload_of(pkt, layer: int) -> bytes:
+    """
+    Get the payload of a layer in a packet.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    layer: int, the layer index to get the payload in the packet.
+    return: bytes, the payload of a layer.
+    """
+    if layer is None:
+        return None
+    pkt_payload = pkt.copy()
+    if Padding in pkt_payload:
+        _index_padding = pkt_payload.layers().index(Padding)
+        pkt_payload[_index_padding - 1].remove_payload()
+        if layer >= _index_padding:
+            return None
+    return bytes(pkt_payload[layer].payload)
+
+
+def get_packet_payload(
+    pkt,
+    support_rx_tunnel: bool = True,
+) -> bytes:
+    """
+    Get the payload of a packet.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: bytes, the payload of a packet. If the packet has an L4, the payload is the load of
+    L4, or it will be the load of L3 if L3 exists, else it returns None.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if not _index_l4 is None:
+        return get_packet_payload_of(pkt, _index_l4)
+    elif not _index_ip is None:
+        return get_packet_payload_of(pkt, _index_ip)
+    else:
+        return None
+
+
+def get_packet_payload_no_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bytes:
+    """
+    Get the payload of the layer to be segmented of a packet, not segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: bytes, the payload of the layer to be segmented of a packet. If no layer is to be
+    segmented, it returns None.
+    """
+    _index_seg = get_packet_layer_index_to_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    if not _index_seg is None:
+        return get_packet_payload_of(pkt, _index_seg)
+    else:
+        return None
+
+
+def get_packet_payload_pre_tso_seg(
+    pkt,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bytes:
+    """
+    Get the payload of a packet before considering it is segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: bytes, the payload of the packet. If it can be segmented then returns the not
+    segmented payload, else it returns the payload despite the segmentation flags.
+    """
+    _payload_no_seg = get_packet_payload_no_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _payload = get_packet_payload(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+    )
+    if not _payload_no_seg is None:
+        return _payload_no_seg
+    else:
+        return _payload
+
+
+def get_packet_payload_post_tso_seg(
+    pkt,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of payloads of a packet after it is segmented.
+    pkt: Packet, Scapy Packet object to get the payload of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: list, sequence of the payloads after applying TSO to a packet. If a packet is not
+    to segment, it returns a list of only the payload. If the packet is not a proper packet, it
+    returns an empty list.
+    """
+    _payload_no_seg = get_packet_payload_no_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _payload = get_packet_payload(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+    )
+    if not _payload_no_seg is None:
+        _segments = segment_bytes(_payload_no_seg, seg_len)
+    elif not _payload is None:
+        _segments = segment_bytes(_payload, -1)
+    else:
+        _segments = []
+    return _segments
+
+
+def get_packet_tso_seg_tx_flag(
+    pkt,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of TSO TX offload flags of a packet.
+    pkt: Packet, Scapy Packet object to get the offload flag of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment.
+    support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment.
+    support_tso: bool, flag indicating if TCP Segmentation is enabled.
+    support_ufd: bool, flag indicating if UDP Fragmentation is enabled.
+    return: list, set of offload flags formatted in a list of str.
+    """
+    _index_seg = get_packet_layer_index_to_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    _flags = []
+    if not _index_seg is None and len(get_packet_payload_of(pkt, _index_seg)) > seg_len:
+        if isinstance(pkt[_index_seg], TCP):
+            _flags.append("RTE_MBUF_F_TX_TCP_SEG")
+        elif isinstance(pkt[_index_seg], UDP):
+            _flags.append("RTE_MBUF_F_TX_UDP_SEG")
+    return _flags
+
+
+def get_packet_checksum_of(
+    pkt,
+    layer: int,
+    good_checksum: bool = False,
+) -> int:
+    """
+    Get the checksum of a specific layer in a packet.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    layer: int, the layer index to get the checksum in the packet.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    return: int, the checksum of the packet at the layer.
+    """
+    _re_checksum_pattern = re.compile(r"chksum\s*=\s*(0x[0-9a-fA-F]+|None)")
+    if layer is None:
+        return None
+    p_chksum = pkt[layer].copy()
+    if "chksum" in p_chksum.fieldtype:
+        if not good_checksum:
+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+            return int(checksum_list_str[0], 16)
+        else:
+            p_chksum.chksum = None
+            checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True))
+            return int(checksum_list_str[0], 16)
+    else:
+        return None
+
+
+def get_packet_checksums(
+    pkt,
+    good_checksum: bool = False,
+    support_rx_tunnel: bool = True,
+) -> tuple:
+    """
+    Get the checksum list of a packet.
+    This method calculates the checksum of the whole packet. This means if an inner layer has a
+    bad checksum, it will be corrected then to calculate the outer checksum if `good_checksum` is
+    set.
+    If a layer is not found, it will return None for that layer.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    p_chksum = pkt.copy()
+    if good_checksum:
+        for _i in [_index_oip, _index_ol4, _index_ip, _index_l4]:
+            if not _i is None and "chksum" in p_chksum[_i].fieldtype:
+                p_chksum[_i].chksum = None
+    return (
+        get_packet_checksum_of(p_chksum, _index_oip),
+        get_packet_checksum_of(p_chksum, _index_ol4),
+        get_packet_checksum_of(p_chksum, _index_ip),
+        get_packet_checksum_of(p_chksum, _index_l4),
+    )
+
+
+def get_packet_checksum_of_each(
+    pkt,
+    good_checksum: bool = False,
+    support_rx_tunnel: bool = True,
+) -> tuple:
+    """
+    Get the checksum list of a packet.
+    This method calculates the checksums of each layer seperately. This means if an inner layer
+    has a bad checksum, an outer checksum will be calculated based on the bad inner checksum if
+    `good_checksum` is set.
+    If a layer is not found, it will return None for that layer.
+    pkt: Packet, Scapy Packet object to get the checksum of.
+    good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    return (
+        get_packet_checksum_of(pkt, _index_oip, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_ol4, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_ip, good_checksum=good_checksum),
+        get_packet_checksum_of(pkt, _index_l4, good_checksum=good_checksum),
+    )
+
+
+def get_packet_checksum_rx_stat(pkt, support_rx_tunnel: bool = True) -> tuple:
+    """
+    Get the checksum stats of a packet.
+    pkt: Packet, Scapy Packet object to get the checksum stats of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: tuple, checksum stats of bad outer IP, bad outer L4, bad inner IP, bad inner L4.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = 0, 0, 0, 0
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_oip
+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+        _bad_oip = 1
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_ol4
+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+        _bad_ol4 = 1
+    if not _index_ip is None and get_packet_checksum_of(
+        pkt, _index_ip
+    ) != get_packet_checksum_of(pkt, _index_ip, good_checksum=True):
+        _bad_ip = 1
+    if not _index_l4 is None and get_packet_checksum_of(
+        pkt, _index_l4
+    ) != get_packet_checksum_of(pkt, _index_l4, good_checksum=True):
+        _bad_l4 = 1
+    return _bad_oip, _bad_ol4, _bad_ip, _bad_l4
+
+
+def get_packet_checksum_rx_olflag(pkt, support_rx_tunnel: bool = True) -> list:
+    """
+    Get the checksum RX offload flags of a packet.
+    pkt: Packet, Scapy Packet object to get the checksum RX offload flag of.
+    support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 /
+    L4 layers will be considered the only L3 / L4 layers.
+    return: list, set of offload flags formated in a list of str.
+    """
+    _FLAG_PAT = "RTE_MBUF_F_RX_%s_CKSUM_%s"
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    _flags = []
+    if not _index_oip is None and get_packet_checksum_of(
+        pkt, _index_oip
+    ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True):
+        _flags.append(_FLAG_PAT % ("OUTER_IP", "BAD"))
+    else:
+        ## OUTER_IP_CKSUM_GOOD is default not shown
+        pass
+    if not _index_ol4 is None and get_packet_checksum_of(
+        pkt, _index_ol4
+    ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True):
+        _flags.append(_FLAG_PAT % ("OUTER_L4", "BAD"))
+    elif not _index_ol4 is None:
+        _flags.append(_FLAG_PAT % ("OUTER_L4", "GOOD"))
+    elif _index_ol4 is None:
+        pass
+    else:
+        pass
+    if get_packet_checksum_of(pkt, _index_ip) != get_packet_checksum_of(
+        pkt, _index_ip, good_checksum=True
+    ):
+        _flags.append(_FLAG_PAT % ("IP", "BAD"))
+    else:
+        _flags.append(_FLAG_PAT % ("IP", "GOOD"))
+    if get_packet_checksum_of(pkt, _index_l4) != get_packet_checksum_of(
+        pkt, _index_l4, good_checksum=True
+    ):
+        _flags.append(_FLAG_PAT % ("L4", "BAD"))
+    else:
+        _flags.append(_FLAG_PAT % ("L4", "GOOD"))
+    return _flags
+
+
+def get_packets_payload_pre_seg_list(
+    pkts: list,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the payload of a seq of packets before considering it is segmented.
+    """
+    return [
+        get_packet_payload_pre_tso_seg(
+            _p,
+            support_rx_tunnel=support_rx_tunnel,
+            support_seg_non_tunnel=support_seg_non_tunnel,
+            support_seg_tunnel=support_seg_tunnel,
+            support_tso=support_tso,
+            support_ufo=support_ufo,
+        )
+        for _p in pkts
+    ]
+
+
+def get_packets_payload_post_seg_list(
+    pkts: list,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of payloads of a seq of packets after it is segmented.
+    """
+    return [
+        get_packet_payload_post_tso_seg(
+            _p,
+            seg_len=seg_len,
+            support_rx_tunnel=support_rx_tunnel,
+            support_seg_non_tunnel=support_seg_non_tunnel,
+            support_seg_tunnel=support_seg_tunnel,
+            support_tso=support_tso,
+            support_ufo=support_ufo,
+        )
+        for _p in pkts
+    ]
+
+
+def get_packets_checksum_rx_stat_list(
+    pkts: list, support_rx_tunnel: bool = True
+) -> list:
+    """
+    Get the checksum stats of a set of packet.
+    """
+    k_bad_ip = "Bad-ipcsum"
+    k_bad_l4 = "Bad-l4csum"
+    k_bad_oip = "Bad-outer-ipcsum"
+    k_bad_ol4 = "Bad-outer-l4csum"
+    k_rx = "RX-total"
+    stats = {
+        k_bad_ip: 0,
+        k_bad_l4: 0,
+        k_bad_oip: 0,
+        k_bad_ol4: 0,
+        k_rx: 0,
+    }
+    for pkt in pkts:
+        _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = get_packet_checksum_rx_stat(
+            pkt, support_rx_tunnel=support_rx_tunnel
+        )
+        stats[k_bad_oip] += _bad_oip
+        stats[k_bad_ol4] += _bad_ol4
+        stats[k_bad_ip] += _bad_ip
+        stats[k_bad_l4] += _bad_l4
+        stats[k_rx] += 1
+    return stats
+
+
+def get_packets_tso_seg_tx_flag_list(
+    pkts: list,
+    seg_len: int = 800,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> list:
+    """
+    Get the list of TSO TX offload flags of a seq of packet.
+    """
+    flag_list = []
+    for pkt in pkts:
+        flag_list.append(
+            get_packet_tso_seg_tx_flag(
+                pkt,
+                seg_len=seg_len,
+                support_rx_tunnel=support_rx_tunnel,
+                support_seg_non_tunnel=support_seg_non_tunnel,
+                support_seg_tunnel=support_seg_tunnel,
+                support_tso=support_tso,
+                support_ufo=support_ufo,
+            )
+        )
+    return flag_list
+
+
+def get_packets_checksum_rx_olflag_list(
+    pkts: list, support_rx_tunnel: bool = True
+) -> list:
+    """
+    Get the checksum RX offload flags of a seq of packets.
+    """
+    flag_list = []
+    for pkt in pkts:
+        flag_list.append(
+            get_packet_checksum_rx_olflag(pkt, support_rx_tunnel=support_rx_tunnel)
+        )
+    return flag_list
+
+
+def check_packet_segment(
+    pkt,
+    segments: list,
+    seg_len: int,
+    support_rx_tunnel: bool = True,
+    support_seg_non_tunnel: bool = True,
+    support_seg_tunnel: bool = True,
+    support_tso: bool = True,
+    support_ufo: bool = True,
+) -> bool:
+    """
+    Check the packet segmentation.
+    """
+    payload_expected = get_packet_payload_pre_tso_seg(
+        pkt,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    segments_expected = get_packet_payload_post_tso_seg(
+        pkt,
+        seg_len=seg_len,
+        support_rx_tunnel=support_rx_tunnel,
+        support_seg_non_tunnel=support_seg_non_tunnel,
+        support_seg_tunnel=support_seg_tunnel,
+        support_tso=support_tso,
+        support_ufo=support_ufo,
+    )
+    if not segments:
+        return False
+    if segments == segments_expected:
+        return True
+    else:
+        payload_segmented = b""
+        for _i, _seg in enumerate(segments, 1):
+            if len(segments_expected) > 1 and len(_seg) > seg_len:
+                return False
+            payload_segmented += _seg
+        return payload_expected == payload_segmented
+
+
+def check_packet_checksum_of(pkt: Packet, layer: int, checksum_ref: int = None) -> bool:
+    """
+    Check the packet checksum of a specific layer indicated by `layer`.
+    """
+    if checksum_ref is None:
+        checksum_ref = get_packet_checksum_of(pkt, layer, good_checksum=True)
+    return get_packet_checksum_of(pkt, layer) == checksum_ref
+
+
+def check_packet_checksums(
+    pkt,
+    checksum_ref_list: list = None,
+    support_rx_tunnel: bool = True,
+) -> bool:
+    """
+    Check the packet checksum.
+    pkt: checked packet;
+    checksum_ref_list: refering checksum list, recalculate the whole packet for correct checksum in default;
+    return: if this packet is in good checksum.
+    """
+    if checksum_ref_list is None:
+        checksum_ref_list = get_packet_checksums(
+            pkt, support_rx_tunnel=support_rx_tunnel, good_checksum=True
+        )
+    return (
+        get_packet_checksums(pkt, support_rx_tunnel=support_rx_tunnel)
+        == checksum_ref_list
+    )
+
+
+def check_packet_checksum_of_each(
+    pkt,
+    checksum_ref_list: list = None,
+    support_rx_tunnel: bool = True,
+    allow_zero_outer_ip: bool = False,
+    allow_zero_outer_udp: bool = False,
+    allow_zero_inner_ip: bool = False,
+    allow_zero_inner_udp: bool = False,
+    allow_zero_inner_tcp: bool = False,
+    allow_zero_inner_sctp: bool = False,
+) -> tuple:
+    """
+    Check the packet checksum of each layer.
+    pkt: checked packet;
+    checksum_ref_list: refering checksum list, recalculate each layer seperately for correct checksum in default;
+    allow_zero_*: allowing zero checksum validated passed on specific layer;
+    return: if this packet is validated passed on outer-ip, outer-l4, inner-ip, inner-l4 layer checksum.
+    """
+    (
+        _index_oip,
+        _index_ol4,
+        _index_tunnel,
+        _index_ip,
+        _index_l4,
+    ) = get_packet_layer_index_oip_ol4_tun_ip_l4(
+        pkt, support_rx_tunnel=support_rx_tunnel
+    )
+    if checksum_ref_list and len(checksum_ref_list) == 2:
+        (_checksum_ref_ip, _checksum_ref_l4) = (
+            checksum_ref_list[0],
+            checksum_ref_list[1],
+        )
+        return (
+            True,
+            True,
+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+        )
+    elif checksum_ref_list and len(checksum_ref_list) == 4:
+        (_checksum_ref_oip, _checksum_ref_ol4, _checksum_ref_ip, _checksum_ref_l4) = (
+            checksum_ref_list[0],
+            checksum_ref_list[1],
+            checksum_ref_list[2],
+            checksum_ref_list[3],
+        )
+        return (
+            check_packet_checksum_of(pkt, _index_oip, _checksum_ref_oip),
+            check_packet_checksum_of(pkt, _index_ol4, _checksum_ref_ol4),
+            check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip),
+            check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4),
+        )
+    elif checksum_ref_list:
+        raise ValueError("Wrong checksum ref list length.")
+    else:
+        _verified_l4 = (
+            _index_l4 is None
+            or check_packet_checksum_of(pkt, _index_l4)
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_udp
+                and isinstance(pkt[_index_l4], UDP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x00)
+            )
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_tcp
+                and isinstance(pkt[_index_l4], TCP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x00)
+            )
+            or (
+                not _index_l4 is None
+                and allow_zero_inner_sctp
+                and isinstance(pkt[_index_l4], SCTP)
+                and check_packet_checksum_of(pkt, _index_l4, 0x0000)
+            )
+        )
+        _verified_ip = (
+            _index_ip is None
+            or check_packet_checksum_of(pkt, _index_ip)
+            or (
+                not _index_ip is None
+                and allow_zero_inner_ip
+                and isinstance(pkt[_index_ip], IP)
+                and check_packet_checksum_of(pkt, _index_ip, 0x00)
+            )
+        )
+        _verified_ol4 = (
+            _index_ol4 is None
+            or check_packet_checksum_of(pkt, _index_ol4)
+            or (
+                not _index_ol4 is None
+                and allow_zero_outer_udp
+                and isinstance(pkt[_index_ol4], UDP)
+                and get_packet_checksum_of(pkt, _index_ol4) == 0x00
+            )
+        )
+        _verified_oip = (
+            _index_oip is None
+            or check_packet_checksum_of(pkt, _index_oip)
+            or (
+                not _index_oip is None
+                and allow_zero_outer_ip
+                and isinstance(pkt[_index_oip], IP)
+                and get_packet_checksum_of(pkt, _index_oip) == 0x00
+            )
+        )
+        return _verified_oip, _verified_ol4, _verified_ip, _verified_l4
+
+
+def check_packet_verbose(verbose: str, verbose_ref_list: list) -> bool:
+    """
+    Check the verbose with the reference list.
+    """
+    for _ref in verbose_ref_list:
+        if not _ref in verbose:
+            return False
+    return True
+
+
 ###############################################################################
 ###############################################################################
 if __name__ == "__main__":