From patchwork Wed Jul 12 19:31:24 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Ke Xu X-Patchwork-Id: 129509 Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from mails.dpdk.org (mails.dpdk.org [217.70.189.124]) by inbox.dpdk.org (Postfix) with ESMTP id 28A9B42E57; Wed, 12 Jul 2023 21:33:24 +0200 (CEST) Received: from mails.dpdk.org (localhost [127.0.0.1]) by mails.dpdk.org (Postfix) with ESMTP id 23C8140EE7; Wed, 12 Jul 2023 21:33:24 +0200 (CEST) Received: from mga06.intel.com (mga06b.intel.com [134.134.136.31]) by mails.dpdk.org (Postfix) with ESMTP id 9157E400D5 for ; Wed, 12 Jul 2023 21:33:21 +0200 (CEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=intel.com; i=@intel.com; q=dns/txt; s=Intel; t=1689190402; x=1720726402; h=from:to:cc:subject:date:message-id:in-reply-to: references:mime-version:content-transfer-encoding; bh=3/cjcLq0mbnwq9mJKmDsdMNzo102YORPF1rZg+IA5YY=; b=XG0srs4lCacSSx/Nfr40aHtVI+SBO/wJvOctahHOcQu2qXsuwdWOxuFI u4IY/MCfdrV1Db/Wy3LAxAAD75cIarEnkXn8Dh97tCC3T9bqNlvht0dAV IaNLXu4getn/YPXZLmup6m4LIWbXDJw0FOPnHPOiVhsueXLef1gn5+1zL 93ZEdcW6kXD6ud7VFPOVZ1cIm6LnaNMgKjUMXDUg7Tn+gz+7iLU7coTvu j8mZPzW0NY3fRvr3AbQ0p7565J+b4yEdMcMvSZtP3dM34YRkC+MgNoY0w R+1z9tjamVd2cdskDCFCSteBABs8PRzasdKLoi+fMUgpfx25KvmEnefsa A==; X-IronPort-AV: E=McAfee;i="6600,9927,10769"; a="428728894" X-IronPort-AV: E=Sophos;i="6.01,200,1684825200"; d="scan'208";a="428728894" Received: from fmsmga004.fm.intel.com ([10.253.24.48]) by orsmga104.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 12 Jul 2023 12:33:20 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=McAfee;i="6600,9927,10769"; a="791740108" X-IronPort-AV: E=Sophos;i="6.01,200,1684825200"; d="scan'208";a="791740108" Received: from dpdk-xuke-host.sh.intel.com ([10.67.114.220]) by fmsmga004.fm.intel.com with ESMTP; 12 Jul 2023 12:33:19 -0700 From: Ke Xu To: dts@dpdk.org Cc: ke1.xu@intel.com, tarcadia@qq.com Subject: [DTS][Patch V1 2/4] framework/packet: Update packet module for new methods. Date: Wed, 12 Jul 2023 19:31:24 +0000 Message-Id: <20230712193126.1994361-3-ke1.xu@intel.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20230712193126.1994361-1-ke1.xu@intel.com> References: <20230712193126.1994361-1-ke1.xu@intel.com> MIME-Version: 1.0 X-BeenThere: dts@dpdk.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: test suite reviews and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dts-bounces@dpdk.org 1. Add util method segment_bytes. 2. Add methods get_packet_layer_index_oip_ol4_tun_ip_l4, get_packet_layer_index_to_tso_seg, get_packet_layer_index_tunnel for packet layer locating. 3. Add doc string for strip_pktload. 4. Add methods for packet payload getting, payload segmentation and checking. 5. Add methods for packet checksum getting, correction, stating and offload flag checking. Signed-off-by: Ke Xu --- framework/packet.py | 858 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 856 insertions(+), 2 deletions(-) diff --git a/framework/packet.py b/framework/packet.py index c44c955f..02b9f885 100644 --- a/framework/packet.py +++ b/framework/packet.py @@ -27,9 +27,9 @@ from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting from scapy.layers.l2 import ARP, GRE, Dot1Q, Ether from scapy.layers.sctp import SCTP from scapy.layers.vxlan import VXLAN -from scapy.packet import Raw +from scapy.packet import Padding, Raw from scapy.sendrecv import sendp -from scapy.utils import hexstr, rdpcap, wrpcap +from scapy.utils import hexstr, randstring, rdpcap, wrpcap from .utils import convert_int2ip, convert_ip2int, get_module_path @@ -127,6 +127,152 @@ def write_raw_pkt(pkt_str, file_name): w.close() +def segment_bytes(payload: bytes, seg_len: int) -> list: + """ + Segment a bytes according to the seg_len. + payload: bytes, a sequence to be segmented. + seg_len: int, the length of segmentation. + return: list, a list of segmented bytes. + """ + segments = [] + if seg_len > 0: + while payload: + _s = payload[:seg_len] + payload = payload[seg_len:] + segments.append(_s) + else: + segments = [payload] + return segments + + +def get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel: bool = True +) -> tuple: + """ + Get the layer index of outer-ip, outer-l4, tunnel, inner-ip, inner-l4 in a packet. + The index counting follows the DPDK rule, that if a non-tunnel packet is counted, + the ip / l4 layer are considered the inner ip / l4 layers. + If a layer is not found, it will return None for that index. + This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3 + or higher network, which means a tunnel packet is laid between an outer IP layer and an + inner IP layer. + pkt: Packet, the Scapy Packet object to be counted. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: tuple, indexes of outer IP, outer L4, tunnel, inner IP, inner L4. + """ + _LAYERS_L3 = {IP, IPv6} + _LAYERS_L4 = {UDP, TCP, SCTP} + _layers = pkt.layers() + _index_tunnel = get_packet_layer_index_tunnel(pkt) + _index_oip = None + _index_ol4 = None + _index_ip = None + _index_l4 = None + if _index_tunnel is None: + for _i, _l in enumerate(_layers): + if _l in _LAYERS_L3: + _index_ip = _i + if _l in _LAYERS_L4: + _index_l4 = _i + else: + for _i in range(0, _index_tunnel + 1): + _l = _layers[_i] + if _l in _LAYERS_L3: + _index_oip = _i + if _l in _LAYERS_L4: + _index_ol4 = _i + for _i in range(_index_tunnel + 1, len(_layers)): + _l = _layers[_i] + if _l in _LAYERS_L3: + _index_ip = _i + if _l in _LAYERS_L4: + _index_l4 = _i + if not support_rx_tunnel and not _index_tunnel is None: + (_index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4) = ( + None, + None, + None, + _index_oip, + _index_ol4, + ) + return _index_oip, _index_ol4, _index_tunnel, _index_ip, _index_l4 + + +def get_packet_layer_index_to_tso_seg( + pkt, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> int: + """ + Get the layer index of TSO in a packet. + In DPDK TSO is considered Transmittion Segmentation Offload. That both TCP + and UDP are considered to be segemeted. + pkt: Packt, Scapy Packet object to be segmeted. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment. + support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment. + support_tso: bool, flag indicating if TCP Segmentation is enabled. + support_ufd: bool, flag indicating if UDP Fragmentation is enabled. + return: int, the index of the layer to be segmented in the packet. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + if support_seg_tunnel and not _index_tunnel is None: + if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP): + return _index_l4 + if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP): + return _index_l4 + if support_seg_non_tunnel and _index_tunnel is None: + if not _index_l4 is None and support_tso and isinstance(pkt[_index_l4], TCP): + return _index_l4 + if not _index_l4 is None and support_ufo and isinstance(pkt[_index_l4], UDP): + return _index_l4 + return None + + +def get_packet_layer_index_tunnel(pkt): + """ + Get the layer index of tunnel layer in a packet. + If a layer is not found, it will return None for that index. + This method asserts that a tunnel layer is to transfer a L3 tunneled packet over a L3 + or higher network, which means a tunnel packet is laid between an outer IP layer and an + inner IP layer. + pkt: Packet, Scapy Packet object to be indexed. + return: int | None, the index of the tunnel layer in the packet. + """ + # packet types + _LAYERS_TUNNEL = {VXLAN, GRE, GTP_U_Header, GENEVE} + + _layers = pkt.layers() + for _i, _l in enumerate(_layers): + _layers_outer = _layers[: _i + 1] + _layers_inner = _layers[_i + 1 :] + if ( + (_l in _LAYERS_TUNNEL) + and (IP in _layers_outer or IPv6 in _layers_outer) + and (IP in _layers_inner or IPv6 in _layers_inner) + ): + return _i + for _i, _l in enumerate(_layers): + if (_l is IP or _l is IPv6) and ( + _layers[_i + 1] is IP or _layers[_i + 1] is IPv6 + ): + return _i + return None + + class scapy(object): SCAPY_LAYERS = { "ether": Ether(dst="ff:ff:ff:ff:ff:ff"), @@ -1188,6 +1334,12 @@ def compare_pktload(pkt1=None, pkt2=None, layer="L2"): def strip_pktload(pkt=None, layer="L2", p_index=0): + """ + Strip the payload of a specific layer indicated by `layer` in a pakcet. + pkt: Packet, DTS Packet object to be stripped. + layer: str, the layer to be stripped. + return: str, the stripped layer formatted in hex string. + """ if layer == "L2": l_idx = 0 elif layer == "L3": @@ -1205,6 +1357,708 @@ def strip_pktload(pkt=None, layer="L2", p_index=0): return load +def get_packet_payload_of(pkt, layer: int) -> bytes: + """ + Get the payload of a layer in a packet. + pkt: Packet, Scapy Packet object to get the payload of. + layer: int, the layer index to get the payload in the packet. + return: bytes, the payload of a layer. + """ + if layer is None: + return None + pkt_payload = pkt.copy() + if Padding in pkt_payload: + _index_padding = pkt_payload.layers().index(Padding) + pkt_payload[_index_padding - 1].remove_payload() + if layer >= _index_padding: + return None + return bytes(pkt_payload[layer].payload) + + +def get_packet_payload( + pkt, + support_rx_tunnel: bool = True, +) -> bytes: + """ + Get the payload of a packet. + pkt: Packet, Scapy Packet object to get the payload of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: bytes, the payload of a packet. If the packet has an L4, the payload is the load of + L4, or it will be the load of L3 if L3 exists, else it returns None. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + if not _index_l4 is None: + return get_packet_payload_of(pkt, _index_l4) + elif not _index_ip is None: + return get_packet_payload_of(pkt, _index_ip) + else: + return None + + +def get_packet_payload_no_tso_seg( + pkt, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> bytes: + """ + Get the payload of the layer to be segmented of a packet, not segmented. + pkt: Packet, Scapy Packet object to get the payload of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment. + support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment. + support_tso: bool, flag indicating if TCP Segmentation is enabled. + support_ufd: bool, flag indicating if UDP Fragmentation is enabled. + return: bytes, the payload of the layer to be segmented of a packet. If no layer is to be + segmented, it returns None. + """ + _index_seg = get_packet_layer_index_to_tso_seg( + pkt, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + if not _index_seg is None: + return get_packet_payload_of(pkt, _index_seg) + else: + return None + + +def get_packet_payload_pre_tso_seg( + pkt, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> bytes: + """ + Get the payload of a packet before considering it is segmented. + pkt: Packet, Scapy Packet object to get the payload of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment. + support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment. + support_tso: bool, flag indicating if TCP Segmentation is enabled. + support_ufd: bool, flag indicating if UDP Fragmentation is enabled. + return: bytes, the payload of the packet. If it can be segmented then returns the not + segmented payload, else it returns the payload despite the segmentation flags. + """ + _payload_no_seg = get_packet_payload_no_tso_seg( + pkt, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + _payload = get_packet_payload( + pkt, + support_rx_tunnel=support_rx_tunnel, + ) + if not _payload_no_seg is None: + return _payload_no_seg + else: + return _payload + + +def get_packet_payload_post_tso_seg( + pkt, + seg_len: int = 800, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> list: + """ + Get the list of payloads of a packet after it is segmented. + pkt: Packet, Scapy Packet object to get the payload of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment. + support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment. + support_tso: bool, flag indicating if TCP Segmentation is enabled. + support_ufd: bool, flag indicating if UDP Fragmentation is enabled. + return: list, sequence of the payloads after applying TSO to a packet. If a packet is not + to segment, it returns a list of only the payload. If the packet is not a proper packet, it + returns an empty list. + """ + _payload_no_seg = get_packet_payload_no_tso_seg( + pkt, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + _payload = get_packet_payload( + pkt, + support_rx_tunnel=support_rx_tunnel, + ) + if not _payload_no_seg is None: + _segments = segment_bytes(_payload_no_seg, seg_len) + elif not _payload is None: + _segments = segment_bytes(_payload, -1) + else: + _segments = [] + return _segments + + +def get_packet_tso_seg_tx_flag( + pkt, + seg_len: int = 800, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> list: + """ + Get the list of TSO TX offload flags of a packet. + pkt: Packet, Scapy Packet object to get the offload flag of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + support_seg_non_tunnel: bool, flag indicating if a non-tunnel packet is to segment. + support_seg_tunnel: bool, flag indicating if a tunnel packet is to segment. + support_tso: bool, flag indicating if TCP Segmentation is enabled. + support_ufd: bool, flag indicating if UDP Fragmentation is enabled. + return: list, set of offload flags formatted in a list of str. + """ + _index_seg = get_packet_layer_index_to_tso_seg( + pkt, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + _flags = [] + if not _index_seg is None and len(get_packet_payload_of(pkt, _index_seg)) > seg_len: + if isinstance(pkt[_index_seg], TCP): + _flags.append("RTE_MBUF_F_TX_TCP_SEG") + elif isinstance(pkt[_index_seg], UDP): + _flags.append("RTE_MBUF_F_TX_UDP_SEG") + return _flags + + +def get_packet_checksum_of( + pkt, + layer: int, + good_checksum: bool = False, +) -> int: + """ + Get the checksum of a specific layer in a packet. + pkt: Packet, Scapy Packet object to get the checksum of. + layer: int, the layer index to get the checksum in the packet. + good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one. + return: int, the checksum of the packet at the layer. + """ + _re_checksum_pattern = re.compile(r"chksum\s*=\s*(0x[0-9a-fA-F]+|None)") + if layer is None: + return None + p_chksum = pkt[layer].copy() + if "chksum" in p_chksum.fieldtype: + if not good_checksum: + checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True)) + return int(checksum_list_str[0], 16) + else: + p_chksum.chksum = None + checksum_list_str = _re_checksum_pattern.findall(p_chksum.show2(dump=True)) + return int(checksum_list_str[0], 16) + else: + return None + + +def get_packet_checksums( + pkt, + good_checksum: bool = False, + support_rx_tunnel: bool = True, +) -> tuple: + """ + Get the checksum list of a packet. + This method calculates the checksum of the whole packet. This means if an inner layer has a + bad checksum, it will be corrected then to calculate the outer checksum if `good_checksum` is + set. + If a layer is not found, it will return None for that layer. + pkt: Packet, Scapy Packet object to get the checksum of. + good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + p_chksum = pkt.copy() + if good_checksum: + for _i in [_index_oip, _index_ol4, _index_ip, _index_l4]: + if not _i is None and "chksum" in p_chksum[_i].fieldtype: + p_chksum[_i].chksum = None + return ( + get_packet_checksum_of(p_chksum, _index_oip), + get_packet_checksum_of(p_chksum, _index_ol4), + get_packet_checksum_of(p_chksum, _index_ip), + get_packet_checksum_of(p_chksum, _index_l4), + ) + + +def get_packet_checksum_of_each( + pkt, + good_checksum: bool = False, + support_rx_tunnel: bool = True, +) -> tuple: + """ + Get the checksum list of a packet. + This method calculates the checksums of each layer seperately. This means if an inner layer + has a bad checksum, an outer checksum will be calculated based on the bad inner checksum if + `good_checksum` is set. + If a layer is not found, it will return None for that layer. + pkt: Packet, Scapy Packet object to get the checksum of. + good_checksum: bool, flag indicating if to get the corrected checksum or to get the raw one. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: tuple, checksums of outer IP, outer L4, inner IP, inner L4 in the packet. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + return ( + get_packet_checksum_of(pkt, _index_oip, good_checksum=good_checksum), + get_packet_checksum_of(pkt, _index_ol4, good_checksum=good_checksum), + get_packet_checksum_of(pkt, _index_ip, good_checksum=good_checksum), + get_packet_checksum_of(pkt, _index_l4, good_checksum=good_checksum), + ) + + +def get_packet_checksum_rx_stat(pkt, support_rx_tunnel: bool = True) -> tuple: + """ + Get the checksum stats of a packet. + pkt: Packet, Scapy Packet object to get the checksum stats of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: tuple, checksum stats of bad outer IP, bad outer L4, bad inner IP, bad inner L4. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = 0, 0, 0, 0 + if not _index_oip is None and get_packet_checksum_of( + pkt, _index_oip + ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True): + _bad_oip = 1 + if not _index_oip is None and get_packet_checksum_of( + pkt, _index_ol4 + ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True): + _bad_ol4 = 1 + if not _index_ip is None and get_packet_checksum_of( + pkt, _index_ip + ) != get_packet_checksum_of(pkt, _index_ip, good_checksum=True): + _bad_ip = 1 + if not _index_l4 is None and get_packet_checksum_of( + pkt, _index_l4 + ) != get_packet_checksum_of(pkt, _index_l4, good_checksum=True): + _bad_l4 = 1 + return _bad_oip, _bad_ol4, _bad_ip, _bad_l4 + + +def get_packet_checksum_rx_olflag(pkt, support_rx_tunnel: bool = True) -> list: + """ + Get the checksum RX offload flags of a packet. + pkt: Packet, Scapy Packet object to get the checksum RX offload flag of. + support_rx_tunnel: bool, flag indicating if to parse the tunnel. If false, the outer L3 / + L4 layers will be considered the only L3 / L4 layers. + return: list, set of offload flags formated in a list of str. + """ + _FLAG_PAT = "RTE_MBUF_F_RX_%s_CKSUM_%s" + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + _flags = [] + if not _index_oip is None and get_packet_checksum_of( + pkt, _index_oip + ) != get_packet_checksum_of(pkt, _index_oip, good_checksum=True): + _flags.append(_FLAG_PAT % ("OUTER_IP", "BAD")) + else: + ## OUTER_IP_CKSUM_GOOD is default not shown + pass + if not _index_ol4 is None and get_packet_checksum_of( + pkt, _index_ol4 + ) != get_packet_checksum_of(pkt, _index_ol4, good_checksum=True): + _flags.append(_FLAG_PAT % ("OUTER_L4", "BAD")) + elif not _index_ol4 is None: + _flags.append(_FLAG_PAT % ("OUTER_L4", "GOOD")) + elif _index_ol4 is None: + pass + else: + pass + if get_packet_checksum_of(pkt, _index_ip) != get_packet_checksum_of( + pkt, _index_ip, good_checksum=True + ): + _flags.append(_FLAG_PAT % ("IP", "BAD")) + else: + _flags.append(_FLAG_PAT % ("IP", "GOOD")) + if get_packet_checksum_of(pkt, _index_l4) != get_packet_checksum_of( + pkt, _index_l4, good_checksum=True + ): + _flags.append(_FLAG_PAT % ("L4", "BAD")) + else: + _flags.append(_FLAG_PAT % ("L4", "GOOD")) + return _flags + + +def get_packets_payload_pre_seg_list( + pkts: list, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> list: + """ + Get the payload of a seq of packets before considering it is segmented. + """ + return [ + get_packet_payload_pre_tso_seg( + _p, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + for _p in pkts + ] + + +def get_packets_payload_post_seg_list( + pkts: list, + seg_len: int = 800, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> list: + """ + Get the list of payloads of a seq of packets after it is segmented. + """ + return [ + get_packet_payload_post_tso_seg( + _p, + seg_len=seg_len, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + for _p in pkts + ] + + +def get_packets_checksum_rx_stat_list( + pkts: list, support_rx_tunnel: bool = True +) -> list: + """ + Get the checksum stats of a set of packet. + """ + k_bad_ip = "Bad-ipcsum" + k_bad_l4 = "Bad-l4csum" + k_bad_oip = "Bad-outer-ipcsum" + k_bad_ol4 = "Bad-outer-l4csum" + k_rx = "RX-total" + stats = { + k_bad_ip: 0, + k_bad_l4: 0, + k_bad_oip: 0, + k_bad_ol4: 0, + k_rx: 0, + } + for pkt in pkts: + _bad_oip, _bad_ol4, _bad_ip, _bad_l4 = get_packet_checksum_rx_stat( + pkt, support_rx_tunnel=support_rx_tunnel + ) + stats[k_bad_oip] += _bad_oip + stats[k_bad_ol4] += _bad_ol4 + stats[k_bad_ip] += _bad_ip + stats[k_bad_l4] += _bad_l4 + stats[k_rx] += 1 + return stats + + +def get_packets_tso_seg_tx_flag_list( + pkts: list, + seg_len: int = 800, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> list: + """ + Get the list of TSO TX offload flags of a seq of packet. + """ + flag_list = [] + for pkt in pkts: + flag_list.append( + get_packet_tso_seg_tx_flag( + pkt, + seg_len=seg_len, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + ) + return flag_list + + +def get_packets_checksum_rx_olflag_list( + pkts: list, support_rx_tunnel: bool = True +) -> list: + """ + Get the checksum RX offload flags of a seq of packets. + """ + flag_list = [] + for pkt in pkts: + flag_list.append( + get_packet_checksum_rx_olflag(pkt, support_rx_tunnel=support_rx_tunnel) + ) + return flag_list + + +def check_packet_segment( + pkt, + segments: list, + seg_len: int, + support_rx_tunnel: bool = True, + support_seg_non_tunnel: bool = True, + support_seg_tunnel: bool = True, + support_tso: bool = True, + support_ufo: bool = True, +) -> bool: + """ + Check the packet segmentation. + """ + payload_expected = get_packet_payload_pre_tso_seg( + pkt, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + segments_expected = get_packet_payload_post_tso_seg( + pkt, + seg_len=seg_len, + support_rx_tunnel=support_rx_tunnel, + support_seg_non_tunnel=support_seg_non_tunnel, + support_seg_tunnel=support_seg_tunnel, + support_tso=support_tso, + support_ufo=support_ufo, + ) + if not segments: + return False + if segments == segments_expected: + return True + else: + payload_segmented = b"" + for _i, _seg in enumerate(segments, 1): + if len(segments_expected) > 1 and len(_seg) > seg_len: + return False + payload_segmented += _seg + return payload_expected == payload_segmented + + +def check_packet_checksum_of(pkt: Packet, layer: int, checksum_ref: int = None) -> bool: + """ + Check the packet checksum of a specific layer indicated by `layer`. + """ + if checksum_ref is None: + checksum_ref = get_packet_checksum_of(pkt, layer, good_checksum=True) + return get_packet_checksum_of(pkt, layer) == checksum_ref + + +def check_packet_checksums( + pkt, + checksum_ref_list: list = None, + support_rx_tunnel: bool = True, +) -> bool: + """ + Check the packet checksum. + pkt: checked packet; + checksum_ref_list: refering checksum list, recalculate the whole packet for correct checksum in default; + return: if this packet is in good checksum. + """ + if checksum_ref_list is None: + checksum_ref_list = get_packet_checksums( + pkt, support_rx_tunnel=support_rx_tunnel, good_checksum=True + ) + return ( + get_packet_checksums(pkt, support_rx_tunnel=support_rx_tunnel) + == checksum_ref_list + ) + + +def check_packet_checksum_of_each( + pkt, + checksum_ref_list: list = None, + support_rx_tunnel: bool = True, + allow_zero_outer_ip: bool = False, + allow_zero_outer_udp: bool = False, + allow_zero_inner_ip: bool = False, + allow_zero_inner_udp: bool = False, + allow_zero_inner_tcp: bool = False, + allow_zero_inner_sctp: bool = False, +) -> tuple: + """ + Check the packet checksum of each layer. + pkt: checked packet; + checksum_ref_list: refering checksum list, recalculate each layer seperately for correct checksum in default; + allow_zero_*: allowing zero checksum validated passed on specific layer; + return: if this packet is validated passed on outer-ip, outer-l4, inner-ip, inner-l4 layer checksum. + """ + ( + _index_oip, + _index_ol4, + _index_tunnel, + _index_ip, + _index_l4, + ) = get_packet_layer_index_oip_ol4_tun_ip_l4( + pkt, support_rx_tunnel=support_rx_tunnel + ) + if checksum_ref_list and len(checksum_ref_list) == 2: + (_checksum_ref_ip, _checksum_ref_l4) = ( + checksum_ref_list[0], + checksum_ref_list[1], + ) + return ( + True, + True, + check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip), + check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4), + ) + elif checksum_ref_list and len(checksum_ref_list) == 4: + (_checksum_ref_oip, _checksum_ref_ol4, _checksum_ref_ip, _checksum_ref_l4) = ( + checksum_ref_list[0], + checksum_ref_list[1], + checksum_ref_list[2], + checksum_ref_list[3], + ) + return ( + check_packet_checksum_of(pkt, _index_oip, _checksum_ref_oip), + check_packet_checksum_of(pkt, _index_ol4, _checksum_ref_ol4), + check_packet_checksum_of(pkt, _index_ip, _checksum_ref_ip), + check_packet_checksum_of(pkt, _index_l4, _checksum_ref_l4), + ) + elif checksum_ref_list: + raise ValueError("Wrong checksum ref list length.") + else: + _verified_l4 = ( + _index_l4 is None + or check_packet_checksum_of(pkt, _index_l4) + or ( + not _index_l4 is None + and allow_zero_inner_udp + and isinstance(pkt[_index_l4], UDP) + and check_packet_checksum_of(pkt, _index_l4, 0x00) + ) + or ( + not _index_l4 is None + and allow_zero_inner_tcp + and isinstance(pkt[_index_l4], TCP) + and check_packet_checksum_of(pkt, _index_l4, 0x00) + ) + or ( + not _index_l4 is None + and allow_zero_inner_sctp + and isinstance(pkt[_index_l4], SCTP) + and check_packet_checksum_of(pkt, _index_l4, 0x0000) + ) + ) + _verified_ip = ( + _index_ip is None + or check_packet_checksum_of(pkt, _index_ip) + or ( + not _index_ip is None + and allow_zero_inner_ip + and isinstance(pkt[_index_ip], IP) + and check_packet_checksum_of(pkt, _index_ip, 0x00) + ) + ) + _verified_ol4 = ( + _index_ol4 is None + or check_packet_checksum_of(pkt, _index_ol4) + or ( + not _index_ol4 is None + and allow_zero_outer_udp + and isinstance(pkt[_index_ol4], UDP) + and get_packet_checksum_of(pkt, _index_ol4) == 0x00 + ) + ) + _verified_oip = ( + _index_oip is None + or check_packet_checksum_of(pkt, _index_oip) + or ( + not _index_oip is None + and allow_zero_outer_ip + and isinstance(pkt[_index_oip], IP) + and get_packet_checksum_of(pkt, _index_oip) == 0x00 + ) + ) + return _verified_oip, _verified_ol4, _verified_ip, _verified_l4 + + +def check_packet_verbose(verbose: str, verbose_ref_list: list) -> bool: + """ + Check the verbose with the reference list. + """ + for _ref in verbose_ref_list: + if not _ref in verbose: + return False + return True + + ############################################################################### ############################################################################### if __name__ == "__main__":