From patchwork Mon Jul 1 12:01:24 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Fan Zhang X-Patchwork-Id: 55762 X-Patchwork-Delegate: gakhil@marvell.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 4BAB61B964; Mon, 1 Jul 2019 14:07:23 +0200 (CEST) Received: from mga04.intel.com (mga04.intel.com [192.55.52.120]) by dpdk.org (Postfix) with ESMTP id AFAB35B34 for ; Mon, 1 Jul 2019 14:07:20 +0200 (CEST) X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from orsmga004.jf.intel.com ([10.7.209.38]) by fmsmga104.fm.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 01 Jul 2019 05:07:20 -0700 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.63,439,1557212400"; d="scan'208";a="314864227" Received: from silpixa00398673.ir.intel.com (HELO silpixa00398673.ger.corp.intel.com) ([10.237.223.136]) by orsmga004.jf.intel.com with ESMTP; 01 Jul 2019 05:07:18 -0700 From: Fan Zhang To: dev@dpdk.org Cc: akhil.goyal@nxp.com, pablo.de.lara.guarch@intel.com, Fan Zhang Date: Mon, 1 Jul 2019 13:01:24 +0100 Message-Id: <20190701120124.70418-3-roy.fan.zhang@intel.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20190701120124.70418-1-roy.fan.zhang@intel.com> References: <20190626150509.17442-1-roy.fan.zhang@intel.com> <20190701120124.70418-1-roy.fan.zhang@intel.com> Subject: [dpdk-dev] [PATCH v6 2/2] examples/ipsec-secgw: support header reconstruction X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" This patch updates the ipsec-secgw application to support header reconstruction. In addition a series of tests have been added to prove the implementation's correctness. Signed-off-by: Fan Zhang Acked-by: Konstantin Ananyev Tested-by: Konstantin Ananyev --- doc/guides/rel_notes/release_19_08.rst | 6 + examples/ipsec-secgw/sa.c | 2 + examples/ipsec-secgw/test/run_test.sh | 3 +- .../test/tun_null_header_reconstruct.py | 477 +++++++++++++++++++++ 4 files changed, 487 insertions(+), 1 deletion(-) create mode 100755 examples/ipsec-secgw/test/tun_null_header_reconstruct.py diff --git a/doc/guides/rel_notes/release_19_08.rst b/doc/guides/rel_notes/release_19_08.rst index 7c0435a43..d949dbcfb 100644 --- a/doc/guides/rel_notes/release_19_08.rst +++ b/doc/guides/rel_notes/release_19_08.rst @@ -99,6 +99,12 @@ New Features Updated ``librte_telemetry`` to fetch the global metrics from the ``librte_metrics`` library. +* **Updated IPSec library Header Reconstruction.** + + Updated the IPSec library with ECN and DSCP field header reconstruction + feature followed by RFC4301. The IPSec-secgw sample application is also + updated to support this feature by default. + Removed Items ------------- diff --git a/examples/ipsec-secgw/sa.c b/examples/ipsec-secgw/sa.c index 7262ccee8..447f9dbb4 100644 --- a/examples/ipsec-secgw/sa.c +++ b/examples/ipsec-secgw/sa.c @@ -991,6 +991,8 @@ fill_ipsec_sa_prm(struct rte_ipsec_sa_prm *prm, const struct ipsec_sa *ss, prm->ipsec_xform.mode = (ss->flags == TRANSPORT) ? RTE_SECURITY_IPSEC_SA_MODE_TRANSPORT : RTE_SECURITY_IPSEC_SA_MODE_TUNNEL; + prm->ipsec_xform.options.ecn = 1; + prm->ipsec_xform.options.copy_dscp = 1; if (ss->flags == IP4_TUNNEL) { prm->ipsec_xform.tunnel.type = RTE_SECURITY_IPSEC_TUNNEL_IPV4; diff --git a/examples/ipsec-secgw/test/run_test.sh b/examples/ipsec-secgw/test/run_test.sh index 4969effdb..3f73545c9 100755 --- a/examples/ipsec-secgw/test/run_test.sh +++ b/examples/ipsec-secgw/test/run_test.sh @@ -61,7 +61,8 @@ trs_3descbc_sha1_old \ trs_3descbc_sha1_esn \ trs_3descbc_sha1_esn_atom" -PKT_TESTS="trs_ipv6opts" +PKT_TESTS="trs_ipv6opts \ +tun_null_header_reconstruct" DIR=$(dirname $0) diff --git a/examples/ipsec-secgw/test/tun_null_header_reconstruct.py b/examples/ipsec-secgw/test/tun_null_header_reconstruct.py new file mode 100755 index 000000000..f2653b351 --- /dev/null +++ b/examples/ipsec-secgw/test/tun_null_header_reconstruct.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 + +from scapy.all import * +import unittest +import pkttest + +#{ipv4{ipv4}} test +SRC_ADDR_IPV4_1 = "192.168.1.1" +DST_ADDR_IPV4_1 = "192.168.2.1" + +#{ipv6{ipv6}} test +SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001" +DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001" + +#{ipv4{ipv6}} test +SRC_ADDR_IPV4_2 = "192.168.11.1" +DST_ADDR_IPV4_2 = "192.168.12.1" +SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001" +DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001" + +#{ipv6{ipv4}} test +SRC_ADDR_IPV4_3 = "192.168.21.1" +DST_ADDR_IPV4_3 = "192.168.22.1" +SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001" +DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001" + +def config(): + return """ +#outter-ipv4 inner-ipv4 tunnel mode test +sp ipv4 out esp protect 5 pri 1 \\ +src {0}/32 \\ +dst {1}/32 \\ +sport 0:65535 dport 0:65535 + +sp ipv4 in esp protect 6 pri 1 \\ +src {1}/32 \\ +dst {0}/32 \\ +sport 0:65535 dport 0:65535 + +sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\ +src {0} dst {1} +sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\ +src {1} dst {0} + +rt ipv4 dst {0}/32 port 1 +rt ipv4 dst {1}/32 port 0 + +#outter-ipv6 inner-ipv6 tunnel mode test +sp ipv6 out esp protect 7 pri 1 \\ +src {2}/128 \\ +dst {3}/128 \\ +sport 0:65535 dport 0:65535 + +sp ipv6 in esp protect 8 pri 1 \\ +src {3}/128 \\ +dst {2}/128 \\ +sport 0:65535 dport 0:65535 + +sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\ +src {2} dst {3} +sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\ +src {3} dst {2} + +rt ipv6 dst {2}/128 port 1 +rt ipv6 dst {3}/128 port 0 + +#outter-ipv4 inner-ipv6 tunnel mode test +sp ipv6 out esp protect 9 pri 1 \\ +src {4}/128 \\ +dst {5}/128 \\ +sport 0:65535 dport 0:65535 + +sp ipv6 in esp protect 10 pri 1 \\ +src {5}/128 \\ +dst {4}/128 \\ +sport 0:65535 dport 0:65535 + +sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\ +src {6} dst {7} +sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\ +src {7} dst {6} + +rt ipv6 dst {4}/128 port 1 +rt ipv4 dst {7}/32 port 0 + +#outter-ipv6 inner-ipv4 tunnel mode test +sp ipv4 out esp protect 11 pri 1 \\ +src {8}/32 \\ +dst {9}/32 \\ +sport 0:65535 dport 0:65535 + +sp ipv4 in esp protect 12 pri 1 \\ +src {9}/32 \\ +dst {8}/32 \\ +sport 0:65535 dport 0:65535 + +sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\ +src {10} dst {11} +sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\ +src {11} dst {10} + +rt ipv4 dst {8}/32 port 1 +rt ipv6 dst {11}/128 port 0 +""".format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2, + SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3) + +ECN_ECT0 = 0x02 +ECN_ECT1 = 0x01 +ECN_CE = 0x03 +DSCP_1 = 0x04 +DSCP_3F = 0xFC + +class TestTunnelHeaderReconstruct(unittest.TestCase): + def setUp(self): + self.px = pkttest.PacketXfer() + th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1) + self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header = th) + + th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1) + self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header = th) + + th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2) + self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header = th) + + th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3) + self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header = th) + + def gen_pkt_plain_ipv4(self, src, dst, tos): + pkt = IP(src=src, dst=dst, tos=tos) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + return pkt + + def gen_pkt_plain_ipv6(self, src, dst, tc): + pkt = IPv6(src=src, dst=dst, tc=tc) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + return pkt + + def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner): + pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1, + tos_inner) + pkt = self.sa_ipv4v4.encrypt(pkt) + self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(pkt[ESP].spi, 6) + pkt[IP].tos = tos_outter + return pkt + + def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner): + pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1, + tc_inner) + pkt = self.sa_ipv6v6.encrypt(pkt) + self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(pkt[ESP].spi, 8) + pkt[IPv6].tc = tc_outter + return pkt + + def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner): + pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2, + tc_inner) + pkt = self.sa_ipv4v6.encrypt(pkt) + self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(pkt[ESP].spi, 10) + pkt[IP].tos = tos_outter + return pkt + + def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner): + pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3, + tos_inner) + pkt = self.sa_ipv6v4.encrypt(pkt) + self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(pkt[ESP].spi, 12) + pkt[IPv6].tc = tc_outter + return pkt + +#RFC4301 5.1.2.1 & 5.1.2.2, outbound packets shall be copied ECN field + def test_outb_ipv4v4_ecn(self): + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + ECN_ECT1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + self.assertEqual(resp[IP].tos, ECN_ECT1) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + ECN_ECT0) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + self.assertEqual(resp[IP].tos, ECN_ECT0) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + ECN_CE) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + self.assertEqual(resp[IP].tos, ECN_CE) + + def test_outb_ipv6v6_ecn(self): + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + ECN_ECT1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[IPv6].tc, ECN_ECT1) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + ECN_ECT0) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 7) + self.assertEqual(resp[IPv6].tc, ECN_ECT0) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + ECN_CE) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 7) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + def test_outb_ipv4v6_ecn(self): + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, + ECN_ECT1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[IP].tos, ECN_ECT1) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, + ECN_ECT0) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[IP].tos, ECN_ECT0) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, + ECN_CE) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[IP].tos, ECN_CE) + + def test_outb_ipv6v4_ecn(self): + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, + ECN_ECT1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[IPv6].tc, ECN_ECT1) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, + ECN_ECT0) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[IPv6].tc, ECN_ECT0) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, + ECN_CE) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + +#RFC4301 5.1.2.1 & 5.1.2.2, if outbound packets ECN is CE (0x3), inbound packets +#ECN is overwritten to CE, otherwise no change + +#Outter header not CE, Inner header should be no change + def test_inb_ipv4v4_ecn_inner_no_change(self): + pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_ECT0) + + pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT0, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_ECT1) + + pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_CE) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + + def test_inb_ipv6v6_ecn_inner_no_change(self): + pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_ECT0) + + pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT0, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_ECT1) + + pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_CE) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + def test_inb_ipv4v6_ecn_inner_no_change(self): + pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_ECT0) + + pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT0, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_ECT1) + + pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_CE) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + def test_inb_ipv6v4_ecn_inner_no_change(self): + pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_ECT0) + + pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT0, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_ECT1) + + pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_CE) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + +#Outter header CE, Inner header should be changed to CE + def test_inb_ipv4v4_ecn_inner_change(self): + pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + + pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + + def test_inb_ipv6v6_ecn_inner_change(self): + pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + def test_inb_ipv4v6_ecn_inner_change(self): + pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, ECN_CE) + + def test_inb_ipv6v4_ecn_inner_change(self): + pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT0) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + + pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, ECN_CE) + +#RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field + def test_outb_ipv4v4_dscp(self): + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + DSCP_1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + self.assertEqual(resp[IP].tos, DSCP_1) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, + DSCP_3F) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + self.assertEqual(resp[IP].tos, DSCP_3F) + + def test_outb_ipv6v6_dscp(self): + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + DSCP_1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 7) + self.assertEqual(resp[IPv6].tc, DSCP_1) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, + DSCP_3F) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 7) + self.assertEqual(resp[IPv6].tc, DSCP_3F) + + def test_outb_ipv4v6_dscp(self): + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, + DSCP_1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 9) + self.assertEqual(resp[IP].tos, DSCP_1) + + pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, + DSCP_3F) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 9) + self.assertEqual(resp[IP].tos, DSCP_3F) + + def test_outb_ipv6v4_dscp(self): + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, + DSCP_1) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 11) + self.assertEqual(resp[IPv6].tc, DSCP_1) + + pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, + DSCP_3F) + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 11) + self.assertEqual(resp[IPv6].tc, DSCP_3F) + +#RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field + def test_inb_ipv4v4_dscp(self): + pkt = self.gen_pkt_tun_ipv4v4(DSCP_3F, DSCP_1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, DSCP_1) + + pkt = self.gen_pkt_tun_ipv4v4(DSCP_1, DSCP_3F) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, DSCP_3F) + + def test_inb_ipv6v6_dscp(self): + pkt = self.gen_pkt_tun_ipv6v6(DSCP_3F, DSCP_1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, DSCP_1) + + pkt = self.gen_pkt_tun_ipv6v6(DSCP_1, DSCP_3F) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, DSCP_3F) + + def test_inb_ipv4v6_dscp(self): + pkt = self.gen_pkt_tun_ipv4v6(DSCP_3F, DSCP_1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, DSCP_1) + + pkt = self.gen_pkt_tun_ipv4v6(DSCP_1, DSCP_3F) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(resp[IPv6].tc, DSCP_3F) + + def test_inb_ipv6v4_dscp(self): + pkt = self.gen_pkt_tun_ipv6v4(DSCP_3F, DSCP_1) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, DSCP_1) + + pkt = self.gen_pkt_tun_ipv6v4(DSCP_1, DSCP_3F) + resp = self.px.xfer_protected(pkt) + self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) + self.assertEqual(resp[IP].tos, DSCP_3F) + +pkttest.pkttest()