Skip to content

Commit

Permalink
remote: add sniffer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
doronz88 committed Jul 16, 2023
1 parent 6b660a6 commit 6a1d1fc
Showing 1 changed file with 247 additions and 0 deletions.
247 changes: 247 additions & 0 deletions pymobiledevice3/remote/sniffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import logging
import struct
from pprint import pformat
from typing import List, Optional

import click
import coloredlogs
from construct import ConstError, StreamError
from hexdump import hexdump
from remotexpc import HTTP2_MAGIC
from scapy.contrib.http2 import H2Frame
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6
from scapy.packet import Packet
from scapy.sendrecv import sniff

from pymobiledevice3.remote.core_device_tunnel_service import PairingDataComponentTLVBuf
from pymobiledevice3.remote.xpc_message import get_object_from_xpc_wrapper

logger = logging.getLogger()

coloredlogs.install(level=logging.DEBUG)


class TCPStream:
def __init__(self, key):
self.key = key
self.data = bytearray()
self.seq: Optional[int] = None # so we know seq hasn't been initialized yet
self.segments = {} # data segments to add later

def __repr__(self) -> str:
return f'Stream<{self.key}>'

def __len__(self) -> int:
return len(self.data)

def src(self) -> str:
return self.key.split('/')[0]

def dst(self) -> str:
return self.key.split('/')[3]

def sport(self) -> int:
return int(self.key.split('/')[1])

def dport(self) -> int:
return int(self.key.split('/')[4])

def add(self, tcp_pkt: TCP) -> bool:
"""
Returns True if we added an in-order segment, False if not
"""
# if this is a new stream
if self.seq is None:
# set initial seq
self.seq = tcp_pkt.seq
# grab payload bytes
data = bytes(tcp_pkt.payload)
data_len = len(data)
seq_idx = tcp_pkt.seq - self.seq
if len(self.data) < seq_idx:
# if this data is out of order and needs to be inserted later
self.segments[seq_idx] = data
return False
else:
# if this data is in order (has a place to be inserted)
self.data[seq_idx:seq_idx + data_len] = data
# check if there are any waiting data segments to add
for seq_i in sorted(self.segments.keys()):
if seq_i <= len(self.data): # if we can add this segment to the stream
pl = self.segments[seq_i]
self.data[seq_i:seq_i + len(pl)] = pl
self.segments.pop(seq_i)
else:
break # short circuit because list is sorted
return True

def pop_magic(self) -> bool:
""" If self.data starts with the http/2 magic bytes, pop them off """
magic = HTTP2_MAGIC
magic_len = len(magic)
if self.data[:magic_len] == magic:
self.data = self.data[magic_len:]
self.seq += magic_len
return True
return False

def pop_frames(self) -> List[H2Frame]:
""" Pop all available H2Frames """
# iterate over self.data and attempt to form HTTP/2 frames
frame_size = len(H2Frame())
frames = []
while len(self.data) >= frame_size:
try:
frame_len = H2Frame(self.data).len
except AssertionError: # when not enough data
break
# if we've got a frame, but don't have all the data for it yet
if frame_len > len(self.data):
break # without adding this frame
# if we pop this frame, remove its data from self.data
# and push self.seq up by len(frame)
current_frame_size = frame_size + frame_len
frame = H2Frame(self.data[:current_frame_size])
self.data = self.data[current_frame_size:]
self.seq += current_frame_size
frames.append(frame)
return frames


class RemoteXPCSniffer:
def __init__(self):
self._tcp_streams = {}
self._previous_frame_data = b''

def process_packet(self, packet: Packet) -> None:
if packet.haslayer(TCP) and packet[TCP].payload:
self._process_tcp(packet)

def _process_tcp(self, pkt: Packet) -> None:
# we are going to separate TCP packets into TCP streams between unique
# endpoints (ip/port) then, for each stream, we will create a new TCPStream
# object and pass TCP packets into it TCPStream objects will take the bytes
# from each TCP packet and add them to the stream. No error correction /
# checksum checking will be done. The stream will just overwrite its bytes
# with whatever is presented in the packets. If the stream receives packets
# out of order, it will add the bytes at the proper index.
if pkt.haslayer(IP):
net_pkt = pkt[IP]
elif pkt.haslayer(IPv6):
net_pkt = pkt[IPv6]
else:
return
# we assume the parent function already checked to make sure this packet has a TCP layer
tcp_pkt = pkt[TCP]
stream_id = self._create_stream_id(net_pkt.src, net_pkt.dst, tcp_pkt.sport, tcp_pkt.dport)
tcp_stream = self._tcp_streams.setdefault(stream_id, TCPStream(stream_id))
stream_finished_assembling = tcp_stream.add(tcp_pkt)
if stream_finished_assembling: # if we just added something in order
self._process_stream(tcp_stream)

@staticmethod
def _handle_no_frames(tcp_stream: TCPStream) -> None:
"""
this might be because we just got a HUGE frame and have to wait for
it to be reassembled, so check the first three bytes as a length
field and see if tcp_stream is shorter than this
"""
if len(tcp_stream) >= 3:
len_bytes = struct.unpack('BBB', tcp_stream.data[:3])
potential_len = (len_bytes[0] << 16) + (
len_bytes[1] << 8) + len_bytes[2]
# ^^^ this is big-endian for some reason
if potential_len > len(tcp_stream):
logger.debug(f'Received {len(tcp_stream)} bytes of a {potential_len}-byte http/2 frame')
return
logger.warning(f'{tcp_stream} doesn\'t appear to have an http/2 frame')
hexdump(tcp_stream.data)

def _handle_data_frame(self, tcp_stream: TCPStream, frame: H2Frame) -> None:
try:
xpc_message = get_object_from_xpc_wrapper(self._previous_frame_data + frame.data)
self._previous_frame_data = b''

if xpc_message is not None:
logger.info(f'As Python Object: {pformat(xpc_message)}')
if 'value' not in xpc_message:
return
message = xpc_message['value']['message']
if 'plain' not in message:
return
plain = message['plain']['_0']
if 'event' not in plain:
return
pairing_data = plain['event']['_0']['pairingData']['_0']['data']
logger.info(PairingDataComponentTLVBuf.parse(pairing_data))
except ConstError: # if we don't know what this payload is
logger.debug(
f'New Data frame {tcp_stream.src()}->{tcp_stream.dst()} on '
f'HTTP/2 stream {frame.stream_id} TCP port {tcp_stream.dport()}')
hexdump(frame.data[:64])
if len(frame.data) > 64:
logger.debug(f'... {len(frame.data)} bytes')
except StreamError:
self._previous_frame_data += frame.data

def _handle_single_frame(self, tcp_stream: TCPStream, frame: H2Frame) -> None:
logger.debug(f'New HTTP/2 frame: {tcp_stream.key}')
if frame.fields.get('type', None) == 1: # Header Frame
logger.debug(
f'{tcp_stream.src()} opening stream {frame.stream_id} for communication on port {tcp_stream.dport()}')
elif frame.fields.get('type', None) == 3: # Reset Frame
logger.debug(f'{tcp_stream.src()} closing stream {frame.stream_id} on port '
f'{tcp_stream.sport()}')
elif frame.fields.get('type', None) == 0: # Data Frame
self._handle_data_frame(tcp_stream, frame)

def _process_stream(self, tcp_stream: TCPStream) -> None:
if tcp_stream.pop_magic():
logger.debug('HTTP/2 magic bytes')
# Does this tcp_stream contain an HTTP/2 frame?
frames = tcp_stream.pop_frames()
# if we get back an empty list, then the stream may have something else on
# it, but I don't know what that would be right now
if len(frames) == 0:
self._handle_no_frames(tcp_stream)
return

# each packet can store multiple frames -- we only care about data frames
for frame in frames:
self._handle_single_frame(tcp_stream, frame)

@staticmethod
def _create_stream_id(src: str, dst: str, sport: int, dport: int) -> str:
""" Create a stream identifier for stream segmentation """
s = f'{src}/{sport}'
d = f'{dst}/{dport}'
return '//'.join([s, d]) # we use this for directional streams


@click.group()
def cli():
""" Parse RemoteXPC traffic """
pass


@cli.command()
@click.argument('file', type=click.Path(exists=True, file_okay=True, dir_okay=False))
def offline(file: str):
""" Parse RemoteXPC traffic from a .pcap file """
sniffer = RemoteXPCSniffer()
for p in sniff(offline=file):
sniffer.process_packet(p)


@cli.command()
@click.argument('iface')
def live(iface: str):
""" Parse RemoteXPC live from a given network interface """
sniffer = RemoteXPCSniffer()
for p in sniff(iface=iface):
sniffer.process_packet(p)


if __name__ == '__main__':
cli()

0 comments on commit 6a1d1fc

Please sign in to comment.