Scapy utilities#

Overview#

  • ISOTP(Packet): Scapy Packet object with fragmentation and defragmentation support

  • ISOTPHeader and ISOTPHeaderEA: Packet classes for parsing CAN-traffic

  • ISOTPMessageBuilder: Helper class for defragmentation of CAN traffic into ISOTP packets

  • ISOTPSession: Scapy Session class to create ISOTP packets on the fly during a sniff()

ISOTP(Packet)#

  • Create an ISOTP packet

from scapy.all import *
from scapy.contrib.isotp import *

p = ISOTP(b"deadbeef", tx_id=0x123, rx_id=0x321)
p.show()
###[ ISOTP ]###
  data      = b'deadbeef'
  • Fragment ISOTP packet into CAN frames

frames = p.fragment()
frames
[<CAN  identifier=0x321 data=b'\x10\x08deadbe' |>,
 <CAN  identifier=0x321 data=b'!ef' |>]
  • Analyze ISOTP frame

ISOTPHeader(bytes(frames[0])).show()
###[ ISOTPHeader ]###
  flags     = 
  identifier= 0x321
  length    = 8
  reserved  = 0
###[ ISOTPFirstFrame ]###
     type      = first
     message_size= 8
     data      = b'deadbe'
  • Analyze ISOTP frame

ISOTPHeader(bytes(frames[1])).show()
###[ ISOTPHeader ]###
  flags     = 
  identifier= 0x321
  length    = 3
  reserved  = 0
###[ ISOTPConsecutiveFrame ]###
     type      = consecutive
     index     = 1
     data      = b'ef'
  • Defragment CAN frames into ISOTP packet

print(frames)
ISOTP.defragment(frames)
[<CAN  identifier=0x321 data=b'\x10\x08deadbe' |>, <CAN  identifier=0x321 data=b'!ef' |>]
<ISOTP  data=b'deadbeef' |>

ISOTPMessageBuilder#

  • Create ISOTP packet

print(frames)
builder = ISOTPMessageBuilder()
builder.feed(frames)
print(len(builder))
isotp_msg = builder.pop()
print(repr(isotp_msg))
print(len(builder))
[<CAN  identifier=0x321 data=b'\x10\x08deadbe' |>, <CAN  identifier=0x321 data=b'!ef' |>]
1
<ISOTP  data=b'deadbeef' |>
0

ISOTPSession#

  • Create CAN packets

frames = ISOTP(b"This is a long ISOTP message", tx_id=0x123).fragment()
frames += ISOTP(b"short", tx_id=0x123).fragment()
frames += ISOTP(b"And a long message again", tx_id=0x45).fragment()
for f in frames:
    print(repr(f))
<CAN  identifier=None data=b'\x10\x1cThis i' |>
<CAN  identifier=None data=b'!s a lon' |>
<CAN  identifier=None data=b'"g ISOTP' |>
<CAN  identifier=None data=b'# messag' |>
<CAN  identifier=None data=b'$e' |>
<CAN  identifier=None data=b'\x05short' |>
<CAN  identifier=None data=b'\x10\x18And a ' |>
<CAN  identifier=None data=b'!long me' |>
<CAN  identifier=None data=b'"ssage a' |>
<CAN  identifier=None data=b'#gain' |>
  • Serialize CAN packets to pcap

wrpcap("/tmp/can.pcap", frames)
!ls -lah /tmp/can.pcap
-rw-r--r-- 1 root root 333 Dec 19 21:10 /tmp/can.pcap
  • Use PcapReader as socket and extract ISOTP messages from CAN frames

with PcapReader("/tmp/can.pcap") as sock:
    msgs = sniff(session=ISOTPSession,
                 session_kwargs={"use_ext_addr":False, 
                                 "basecls":ISOTP},
                 count=3, opened_socket=sock)

print(msgs)
print(repr(msgs[0]))
print(repr(msgs[1]))
print(repr(msgs[2]))
<Sniffed: TCP:0 UDP:0 ICMP:0 Other:3>
<ISOTP  data=b'This is a long ISOTP message' |>
<ISOTP  data=b'short' |>
<ISOTP  data=b'And a long message again' |>

ISOTP MITM attack#

Communication overview#

../../_images/isotpmitm.svg

Fig. 57 ISO-TP fragmented communication#

Scapy example#

In this example, we use vcan0 and vcan1 as interfaces. Source and Destination communicate over the ISOTP addresses 0x241 and 0x641.

  • Setup two CAN interfaces

    sudo modprobe vcan
    sudo ip link add name vcan0 type vcan
    sudo ip link add name vcan1 type vcan
    sudo ip link set dev vcan0 up
    sudo ip link set dev vcan1 up
    
  • Import necessary modules in Scapy

    import threading
    load_contrib('cansocket')
    conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': True}
    load_contrib('isotp')
    
  • Create a forward function. Attack code need to be placed here.

    def forwarding(pkt):
        pkt.data = b"ATTACKED"
        return pkt
    
  • Create attacker sockets and run MITM attack

    bSocket0 = ISOTPSocket('vcan0', tx_id=0x641, rx_id=0x241)
    bSocket1 = ISOTPSocket('vcan1', tx_id=0x241, rx_id=0x641)
    bridge_and_sniff(if1=bSocket0, if2=bSocket1,
                     xfrm12=forwarding, xfrm21=forwarding,
                     timeout=1)