[3/3] examples/ipsec-secgw: add support for ipv6 options

Message ID 20190508104717.13448-3-marcinx.smoczynski@intel.com (mailing list archive)
State Superseded, archived
Delegated to: akhil goyal
Headers
Series [1/3] net: new ipv6 header extension parsing function |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail Compilation issues

Commit Message

Marcin Smoczynski May 8, 2019, 10:47 a.m. UTC
  Using transport with IPv6 and header extensions requires calculating
total header length including extensions up to ESP header which is
achieved with iteratively parsing extensions when preparing traffic
for processing. Calculated l3_len is later used to determine SPI
field offset for an inbound traffic and to reconstruct L3 header by
librte_ipsec.

A simple unittest script is provided to test various headers for the
IPv6 transport mode. Within each test case a test packet is crafted
with Scapy and sent as an inbound or outbound traffic. Application
response is then checked with a set of assertions.

Signed-off-by: Marcin Smoczynski <marcinx.smoczynski@intel.com>
---
 examples/ipsec-secgw/ipsec-secgw.c      |  33 +++-
 examples/ipsec-secgw/sa.c               |   5 +-
 examples/ipsec-secgw/test/test-scapy.py | 231 ++++++++++++++++++++++++
 3 files changed, 260 insertions(+), 9 deletions(-)
 create mode 100755 examples/ipsec-secgw/test/test-scapy.py
  

Comments

Ananyev, Konstantin May 14, 2019, 12:51 p.m. UTC | #1
> 
> Using transport with IPv6 and header extensions requires calculating
> total header length including extensions up to ESP header which is
> achieved with iteratively parsing extensions when preparing traffic
> for processing. Calculated l3_len is later used to determine SPI
> field offset for an inbound traffic and to reconstruct L3 header by
> librte_ipsec.
> 
> A simple unittest script is provided to test various headers for the
> IPv6 transport mode. Within each test case a test packet is crafted
> with Scapy and sent as an inbound or outbound traffic. Application
> response is then checked with a set of assertions.

ipsec-secgw changes itself looks good to me.
One comment I have - would be good to incorporate your test script
into run_test.sh, so user can run all tests in one go.
Konstantin 

> 
> Signed-off-by: Marcin Smoczynski <marcinx.smoczynski@intel.com>
> ---
>  examples/ipsec-secgw/ipsec-secgw.c      |  33 +++-
>  examples/ipsec-secgw/sa.c               |   5 +-
>  examples/ipsec-secgw/test/test-scapy.py | 231 ++++++++++++++++++++++++
>  3 files changed, 260 insertions(+), 9 deletions(-)
>  create mode 100755 examples/ipsec-secgw/test/test-scapy.py
> 
> diff --git a/examples/ipsec-secgw/ipsec-secgw.c b/examples/ipsec-secgw/ipsec-secgw.c
> index 478dd80c2..1c49aa22c 100644
> --- a/examples/ipsec-secgw/ipsec-secgw.c
> +++ b/examples/ipsec-secgw/ipsec-secgw.c
> @@ -41,6 +41,7 @@
>  #include <rte_jhash.h>
>  #include <rte_cryptodev.h>
>  #include <rte_security.h>
> +#include <rte_ip.h>
> 
>  #include "ipsec.h"
>  #include "parser.h"
> @@ -248,16 +249,38 @@ prepare_one_packet(struct rte_mbuf *pkt, struct ipsec_traffic *t)
>  		pkt->l2_len = 0;
>  		pkt->l3_len = sizeof(struct ip);
>  	} else if (eth->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv6)) {
> -		nlp = (uint8_t *)rte_pktmbuf_adj(pkt, ETHER_HDR_LEN);
> -		nlp = RTE_PTR_ADD(nlp, offsetof(struct ip6_hdr, ip6_nxt));
> -		if (*nlp == IPPROTO_ESP)
> +		int next_proto;
> +		size_t l3len, ext_len;
> +		struct ipv6_hdr *v6h;
> +		uint8_t *p;
> +
> +		/* get protocol type */
> +		v6h = (struct ipv6_hdr *)rte_pktmbuf_adj(pkt, ETHER_HDR_LEN);
> +		next_proto = v6h->proto;
> +
> +		/* determine l3 header size up to ESP extension */
> +		l3len = sizeof(struct ip6_hdr);
> +		p = rte_pktmbuf_mtod(pkt, uint8_t *);
> +		while (next_proto != IPPROTO_ESP && l3len < pkt->data_len &&
> +			(next_proto = rte_ipv6_get_next_ext(p + l3len,
> +						next_proto, &ext_len)) >= 0)
> +			l3len += ext_len;
> +
> +		/* drop packet when IPv6 header exceeds first segment length */
> +		if (unlikely(l3len > pkt->data_len)) {
> +			rte_pktmbuf_free(pkt);
> +			return;
> +		}
> +
> +		if (next_proto == IPPROTO_ESP)
>  			t->ipsec.pkts[(t->ipsec.num)++] = pkt;
>  		else {
> -			t->ip6.data[t->ip6.num] = nlp;
> +			t->ip6.data[t->ip6.num] = rte_pktmbuf_mtod_offset(pkt,
> +				uint8_t *, offsetof(struct ipv6_hdr, proto));
>  			t->ip6.pkts[(t->ip6.num)++] = pkt;
>  		}
>  		pkt->l2_len = 0;
> -		pkt->l3_len = sizeof(struct ip6_hdr);
> +		pkt->l3_len = l3len;
>  	} else {
>  		/* Unknown/Unsupported type, drop the packet */
>  		RTE_LOG(ERR, IPSEC, "Unsupported packet type 0x%x\n",
> diff --git a/examples/ipsec-secgw/sa.c b/examples/ipsec-secgw/sa.c
> index b850e9839..607527d08 100644
> --- a/examples/ipsec-secgw/sa.c
> +++ b/examples/ipsec-secgw/sa.c
> @@ -1228,10 +1228,7 @@ single_inbound_lookup(struct ipsec_sa *sadb, struct rte_mbuf *pkt,
>  	*sa_ret = NULL;
> 
>  	ip = rte_pktmbuf_mtod(pkt, struct ip *);
> -	if (ip->ip_v == IPVERSION)
> -		esp = (struct esp_hdr *)(ip + 1);
> -	else
> -		esp = (struct esp_hdr *)(((struct ip6_hdr *)ip) + 1);
> +	esp = rte_pktmbuf_mtod_offset(pkt, struct esp_hdr *, pkt->l3_len);
> 
>  	if (esp->spi == INVALID_SPI)
>  		return;
> diff --git a/examples/ipsec-secgw/test/test-scapy.py b/examples/ipsec-secgw/test/test-scapy.py
> new file mode 100755
> index 000000000..d7f66b734
> --- /dev/null
> +++ b/examples/ipsec-secgw/test/test-scapy.py
> @@ -0,0 +1,231 @@
> +#!/usr/bin/env python3
> +
> +# Run DPDK IPsec example with following arguments:
> +# ./dpdk-ipsec-secgw --log-level=31 -l 0 --vdev=crypto_openssl --vdev=net_tap0 --vdev=net_tap1 -- -P -p 0x3 -u 0x1 --config
> "(0,0,0),(1,0,0)" -f test-transport.cfg -l
> +# Two tap ports are expected: 0: unprotected (remote), 1: protected (local)
> +
> +# sample configuration:
> +#	sp ipv6 out esp protect 5 pri 1 \
> +#	src 1111:0000:0000:0000:0000:0000:0000:0000/64 \
> +#	dst 2222:0000:0000:0000:0000:0000:0000:0000/64 \
> +#	sport 0:65535 dport 0:65535
> +#
> +#	sp ipv6 in esp protect 6 pri 1 \
> +#	src 2222:0000:0000:0000:0000:0000:0000:0000/64 \
> +#	dst 1111:0000:0000:0000:0000:0000:0000:0000/64 \
> +#	sport 0:65535 dport 0:65535
> +#
> +#	sa out 5 cipher_algo aes-128-cbc cipher_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
> +#	auth_algo sha1-hmac auth_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
> +#	mode transport
> +#
> +#	sa in 6 cipher_algo aes-128-cbc cipher_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
> +#	auth_algo sha1-hmac auth_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
> +#	mode transport
> +#
> +#	rt ipv6 dst 1111:0000:0000:0000:0000:0000:0000:0000/64 port 1
> +#	rt ipv6 dst 2222:0000:0000:0000:0000:0000:0000:0000/64 port 0
> +#
> +# run tests with:
> +# python3 -m unittest test-scapy
> +
> +
> +import socket
> +import sys
> +import unittest
> +from scapy.all import *
> +
> +
> +SRC_ETHER = "52:54:00:00:00:01"
> +DST_ETHER = "52:54:00:00:00:02"
> +SRC_ADDR = "1111::1"
> +DST_ADDR = "2222::1"
> +LOCAL_IFACE = "dtap1"
> +REMOTE_IFACE = "dtap0"
> +
> +
> +class Interface(object):
> +    ETH_P_ALL = 3
> +    MAX_PACKET_SIZE = 1280
> +    def __init__(self, ifname):
> +        self.name = ifname
> +        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
> +        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
> +
> +    def __del__(self):
> +        self.s.close()
> +
> +    def send_packet(self, pkt):
> +        self.send_bytes(bytes(pkt))
> +
> +    def send_bytes(self, bytedata):
> +        self.s.send(bytedata)
> +
> +    def recv_packet(self):
> +        return Ether(self.recv_bytes())
> +
> +    def recv_bytes(self):
> +        return self.s.recv(Interface.MAX_PACKET_SIZE)
> +
> +
> +class TestTransportMode(unittest.TestCase):
> +    # There is a bug in the IPsec Scapy implementation
> +    # which causes invalid packet reconstruction after
> +    # successful decryption. This method is a workaround.
> +    @staticmethod
> +    def decrypt(pkt, sa):
> +        esp = pkt[ESP]
> +
> +        # decrypt dummy packet with no extensions
> +        d = sa.decrypt(IPv6()/esp)
> +
> +        # fix 'next header' in the preceding header of the original
> +        # packet and remove ESP
> +        pkt[ESP].underlayer.nh = d[IPv6].nh
> +        pkt[ESP].underlayer.remove_payload()
> +
> +        # combine L3 header with decrypted payload
> +        npkt = pkt/d[IPv6].payload
> +
> +        # fix length
> +        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
> +
> +        return npkt
> +
> +    def setUp(self):
> +        self.ilocal = Interface(LOCAL_IFACE)
> +        self.iremote = Interface(REMOTE_IFACE)
> +        self.outb_sa = SecurityAssociation(ESP, spi=5, crypt_algo='AES-CBC', crypt_key='\x00'*16, auth_algo='HMAC-SHA1-96',
> auth_key='\x00'*20)
> +        self.inb_sa = SecurityAssociation(ESP, spi=6, crypt_algo='AES-CBC', crypt_key='\x00'*16, auth_algo='HMAC-SHA1-96',
> auth_key='\x00'*20)
> +
> +    def test_outb_ipv6_noopt(self):
> +        pkt = Ether(src=SRC_ETHER, dst=DST_ETHER)
> +        pkt /= IPv6(src=SRC_ADDR, dst=DST_ADDR)
> +        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
> +        self.ilocal.send_packet(pkt)
> +
> +        # check response
> +        resp = self.iremote.recv_packet()
> +        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
> +        self.assertEqual(resp[ESP].spi, 5)
> +
> +        # decrypt response, check packet after decryption
> +        d = TestTransportMode.decrypt(resp[IPv6], self.outb_sa)
> +        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
> +        self.assertEqual(d[UDP].sport, 123)
> +        self.assertEqual(d[UDP].dport, 456)
> +        self.assertEqual(bytes(d[UDP].payload), b'abc')
> +
> +    def test_outb_ipv6_opt(self):
> +        hoptions = []
> +        hoptions.append(RouterAlert(value=2))
> +        hoptions.append(Jumbo(jumboplen=5000))
> +        hoptions.append(Pad1())
> +
> +        doptions = []
> +        doptions.append(HAO(hoa="1234::4321"))
> +
> +        pkt = Ether(src=SRC_ETHER, dst=DST_ETHER)
> +        pkt /= IPv6(src=SRC_ADDR, dst=DST_ADDR)
> +        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
> +        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
> +        pkt /= IPv6ExtHdrDestOpt(options=doptions)
> +        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
> +        self.ilocal.send_packet(pkt)
> +
> +        # check response
> +        resp = self.iremote.recv_packet()
> +        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
> +
> +        # check extensions
> +        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
> +        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
> +        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
> +
> +        # check ESP
> +        self.assertEqual(resp[ESP].spi, 5)
> +
> +        # decrypt response, check packet after decryption
> +        d = TestTransportMode.decrypt(resp[IPv6], self.outb_sa)
> +        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
> +        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
> +        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
> +        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
> +
> +        # check UDP
> +        self.assertEqual(d[UDP].sport, 123)
> +        self.assertEqual(d[UDP].dport, 456)
> +        self.assertEqual(bytes(d[UDP].payload), b'abc')
> +
> +    def test_inb_ipv6_noopt(self):
> +        # encrypt and send raw UDP packet
> +        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
> +        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
> +        e = self.inb_sa.encrypt(pkt)
> +        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/e
> +        self.iremote.send_packet(e)
> +
> +        # check response
> +        resp = self.ilocal.recv_packet()
> +        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
> +
> +        # check UDP packet
> +        self.assertEqual(resp[UDP].sport, 123)
> +        self.assertEqual(resp[UDP].dport, 456)
> +        self.assertEqual(bytes(resp[UDP].payload), b'abc')
> +
> +    def test_inb_ipv6_opt(self):
> +        hoptions = []
> +        hoptions.append(RouterAlert(value=2))
> +        hoptions.append(Jumbo(jumboplen=5000))
> +        hoptions.append(Pad1())
> +
> +        doptions = []
> +        doptions.append(HAO(hoa="1234::4321"))
> +
> +        # prepare packet with options
> +        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
> +        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
> +        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
> +        pkt /= IPv6ExtHdrDestOpt(options=doptions)
> +        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
> +
> +        # encrypt and send packet
> +        e = self.inb_sa.encrypt(pkt)
> +        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/e
> +        self.iremote.send_packet(e)
> +
> +        # check response
> +        resp = self.ilocal.recv_packet()
> +        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
> +        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
> +        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
> +        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
> +
> +        # check UDP
> +        self.assertEqual(resp[UDP].sport, 123)
> +        self.assertEqual(resp[UDP].dport, 456)
> +        self.assertEqual(bytes(resp[UDP].payload), b'abc')
> +
> +    def test_inb_ipv6_frag(self):
> +        # prepare ESP payload
> +        pkt = UDP(sport=123,dport=456)/Raw(load="abc")
> +        e = self.inb_sa.encrypt(IPv6()/pkt)
> +
> +        # craft and send inbound packet
> +        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
> +        self.iremote.send_packet(e)
> +
> +        # check response
> +        resp = self.ilocal.recv_packet()
> +        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
> +        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
> +
> +        # check UDP
> +        self.assertEqual(resp[UDP].sport, 123)
> +        self.assertEqual(resp[UDP].dport, 456)
> +        self.assertEqual(bytes(resp[UDP].payload), b'abc')
> +
> +
> +if __name__ == "__main__":
> +    unittest.main()
> --
> 2.21.0.windows.1
  

Patch

diff --git a/examples/ipsec-secgw/ipsec-secgw.c b/examples/ipsec-secgw/ipsec-secgw.c
index 478dd80c2..1c49aa22c 100644
--- a/examples/ipsec-secgw/ipsec-secgw.c
+++ b/examples/ipsec-secgw/ipsec-secgw.c
@@ -41,6 +41,7 @@ 
 #include <rte_jhash.h>
 #include <rte_cryptodev.h>
 #include <rte_security.h>
+#include <rte_ip.h>
 
 #include "ipsec.h"
 #include "parser.h"
@@ -248,16 +249,38 @@  prepare_one_packet(struct rte_mbuf *pkt, struct ipsec_traffic *t)
 		pkt->l2_len = 0;
 		pkt->l3_len = sizeof(struct ip);
 	} else if (eth->ether_type == rte_cpu_to_be_16(ETHER_TYPE_IPv6)) {
-		nlp = (uint8_t *)rte_pktmbuf_adj(pkt, ETHER_HDR_LEN);
-		nlp = RTE_PTR_ADD(nlp, offsetof(struct ip6_hdr, ip6_nxt));
-		if (*nlp == IPPROTO_ESP)
+		int next_proto;
+		size_t l3len, ext_len;
+		struct ipv6_hdr *v6h;
+		uint8_t *p;
+
+		/* get protocol type */
+		v6h = (struct ipv6_hdr *)rte_pktmbuf_adj(pkt, ETHER_HDR_LEN);
+		next_proto = v6h->proto;
+
+		/* determine l3 header size up to ESP extension */
+		l3len = sizeof(struct ip6_hdr);
+		p = rte_pktmbuf_mtod(pkt, uint8_t *);
+		while (next_proto != IPPROTO_ESP && l3len < pkt->data_len &&
+			(next_proto = rte_ipv6_get_next_ext(p + l3len,
+						next_proto, &ext_len)) >= 0)
+			l3len += ext_len;
+
+		/* drop packet when IPv6 header exceeds first segment length */
+		if (unlikely(l3len > pkt->data_len)) {
+			rte_pktmbuf_free(pkt);
+			return;
+		}
+
+		if (next_proto == IPPROTO_ESP)
 			t->ipsec.pkts[(t->ipsec.num)++] = pkt;
 		else {
-			t->ip6.data[t->ip6.num] = nlp;
+			t->ip6.data[t->ip6.num] = rte_pktmbuf_mtod_offset(pkt,
+				uint8_t *, offsetof(struct ipv6_hdr, proto));
 			t->ip6.pkts[(t->ip6.num)++] = pkt;
 		}
 		pkt->l2_len = 0;
-		pkt->l3_len = sizeof(struct ip6_hdr);
+		pkt->l3_len = l3len;
 	} else {
 		/* Unknown/Unsupported type, drop the packet */
 		RTE_LOG(ERR, IPSEC, "Unsupported packet type 0x%x\n",
diff --git a/examples/ipsec-secgw/sa.c b/examples/ipsec-secgw/sa.c
index b850e9839..607527d08 100644
--- a/examples/ipsec-secgw/sa.c
+++ b/examples/ipsec-secgw/sa.c
@@ -1228,10 +1228,7 @@  single_inbound_lookup(struct ipsec_sa *sadb, struct rte_mbuf *pkt,
 	*sa_ret = NULL;
 
 	ip = rte_pktmbuf_mtod(pkt, struct ip *);
-	if (ip->ip_v == IPVERSION)
-		esp = (struct esp_hdr *)(ip + 1);
-	else
-		esp = (struct esp_hdr *)(((struct ip6_hdr *)ip) + 1);
+	esp = rte_pktmbuf_mtod_offset(pkt, struct esp_hdr *, pkt->l3_len);
 
 	if (esp->spi == INVALID_SPI)
 		return;
diff --git a/examples/ipsec-secgw/test/test-scapy.py b/examples/ipsec-secgw/test/test-scapy.py
new file mode 100755
index 000000000..d7f66b734
--- /dev/null
+++ b/examples/ipsec-secgw/test/test-scapy.py
@@ -0,0 +1,231 @@ 
+#!/usr/bin/env python3
+
+# Run DPDK IPsec example with following arguments:
+# ./dpdk-ipsec-secgw --log-level=31 -l 0 --vdev=crypto_openssl --vdev=net_tap0 --vdev=net_tap1 -- -P -p 0x3 -u 0x1 --config "(0,0,0),(1,0,0)" -f test-transport.cfg -l
+# Two tap ports are expected: 0: unprotected (remote), 1: protected (local)
+
+# sample configuration:
+#	sp ipv6 out esp protect 5 pri 1 \
+#	src 1111:0000:0000:0000:0000:0000:0000:0000/64 \
+#	dst 2222:0000:0000:0000:0000:0000:0000:0000/64 \
+#	sport 0:65535 dport 0:65535
+#
+#	sp ipv6 in esp protect 6 pri 1 \
+#	src 2222:0000:0000:0000:0000:0000:0000:0000/64 \
+#	dst 1111:0000:0000:0000:0000:0000:0000:0000/64 \
+#	sport 0:65535 dport 0:65535
+#
+#	sa out 5 cipher_algo aes-128-cbc cipher_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
+#	auth_algo sha1-hmac auth_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
+#	mode transport
+#
+#	sa in 6 cipher_algo aes-128-cbc cipher_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
+#	auth_algo sha1-hmac auth_key 0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0 \
+#	mode transport
+#
+#	rt ipv6 dst 1111:0000:0000:0000:0000:0000:0000:0000/64 port 1
+#	rt ipv6 dst 2222:0000:0000:0000:0000:0000:0000:0000/64 port 0
+#
+# run tests with:
+# python3 -m unittest test-scapy
+
+
+import socket
+import sys
+import unittest
+from scapy.all import *
+
+
+SRC_ETHER = "52:54:00:00:00:01"
+DST_ETHER = "52:54:00:00:00:02"
+SRC_ADDR = "1111::1"
+DST_ADDR = "2222::1"
+LOCAL_IFACE = "dtap1"
+REMOTE_IFACE = "dtap0"
+
+
+class Interface(object):
+    ETH_P_ALL = 3
+    MAX_PACKET_SIZE = 1280
+    def __init__(self, ifname):
+        self.name = ifname
+        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
+        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
+
+    def __del__(self):
+        self.s.close()
+
+    def send_packet(self, pkt):
+        self.send_bytes(bytes(pkt))
+
+    def send_bytes(self, bytedata):
+        self.s.send(bytedata)
+
+    def recv_packet(self):
+        return Ether(self.recv_bytes())
+
+    def recv_bytes(self):
+        return self.s.recv(Interface.MAX_PACKET_SIZE)
+
+
+class TestTransportMode(unittest.TestCase):
+    # There is a bug in the IPsec Scapy implementation
+    # which causes invalid packet reconstruction after
+    # successful decryption. This method is a workaround.
+    @staticmethod
+    def decrypt(pkt, sa):
+        esp = pkt[ESP]
+
+        # decrypt dummy packet with no extensions
+        d = sa.decrypt(IPv6()/esp)
+
+        # fix 'next header' in the preceding header of the original
+        # packet and remove ESP
+        pkt[ESP].underlayer.nh = d[IPv6].nh
+        pkt[ESP].underlayer.remove_payload()
+
+        # combine L3 header with decrypted payload
+        npkt = pkt/d[IPv6].payload
+
+        # fix length
+        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
+
+        return npkt
+
+    def setUp(self):
+        self.ilocal = Interface(LOCAL_IFACE)
+        self.iremote = Interface(REMOTE_IFACE)
+        self.outb_sa = SecurityAssociation(ESP, spi=5, crypt_algo='AES-CBC', crypt_key='\x00'*16, auth_algo='HMAC-SHA1-96', auth_key='\x00'*20)
+        self.inb_sa = SecurityAssociation(ESP, spi=6, crypt_algo='AES-CBC', crypt_key='\x00'*16, auth_algo='HMAC-SHA1-96', auth_key='\x00'*20)
+
+    def test_outb_ipv6_noopt(self):
+        pkt = Ether(src=SRC_ETHER, dst=DST_ETHER)
+        pkt /= IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        self.ilocal.send_packet(pkt)
+
+        # check response
+        resp = self.iremote.recv_packet()
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportMode.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_outb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        pkt = Ether(src=SRC_ETHER, dst=DST_ETHER)
+        pkt /= IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        self.ilocal.send_packet(pkt)
+
+        # check response
+        resp = self.iremote.recv_packet()
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+
+        # check extensions
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
+
+        # check ESP
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportMode.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_inb_ipv6_noopt(self):
+        # encrypt and send raw UDP packet
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/e
+        self.iremote.send_packet(e)
+
+        # check response
+        resp = self.ilocal.recv_packet()
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
+
+        # check UDP packet
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        # prepare packet with options
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+        # encrypt and send packet
+        e = self.inb_sa.encrypt(pkt)
+        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/e
+        self.iremote.send_packet(e)
+
+        # check response
+        resp = self.ilocal.recv_packet()
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_frag(self):
+        # prepare ESP payload
+        pkt = UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(IPv6()/pkt)
+
+        # craft and send inbound packet
+        e = Ether(src=DST_ETHER, dst=SRC_ETHER)/IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
+        self.iremote.send_packet(e)
+
+        # check response
+        resp = self.ilocal.recv_packet()
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
+        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+
+if __name__ == "__main__":
+    unittest.main()