Skip to content

Commit

Permalink
remote: add sniffer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
doronz88 committed Jul 16, 2023
1 parent 6b660a6 commit 84bb255
Showing 1 changed file with 208 additions and 0 deletions.
208 changes: 208 additions & 0 deletions pymobiledevice3/remote/sniffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import logging
from pprint import pformat
from typing import List, Optional

import click
import coloredlogs
from construct import ConstError, StreamError, Int24ul
from hexdump import hexdump
from scapy.contrib.http2 import H2Frame
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6
from scapy.packet import Packet
from scapy.sendrecv import sniff

from pymobiledevice3.remote.core_device_tunnel_service import PairingDataComponentTLVBuf
from pymobiledevice3.remote.xpc_message import get_object_from_xpc_wrapper
from remotexpc import HTTP2_MAGIC

logger = logging.getLogger()

coloredlogs.install(level=logging.DEBUG)


class TCPStream:
def __init__(self, key):
self.key = key
self.data = bytearray()
self.seq: Optional[int] = None # so we know seq hasn't been initialized yet
self.segments = {} # data segments to add later

def __repr__(self) -> str:
return f'Stream<{self.key}>'

def __len__(self) -> int:
return len(self.data)

def src(self) -> str:
return self.key.split('/')[0]

def dst(self) -> str:
return self.key.split('/')[3]

def sport(self) -> int:
return int(self.key.split('/')[1])

def dport(self) -> int:
return int(self.key.split('/')[4])

def add(self, tcp_pkt: TCP) -> bool:
"""
Returns True if we added an in-order segment, False if not
"""
# if this is a new stream
if self.seq is None:
# set initial seq
self.seq = tcp_pkt.seq
# grab payload bytes
data = bytes(tcp_pkt.payload)
data_len = len(data)
seq_idx = tcp_pkt.seq - self.seq
if len(self.data) < seq_idx:
# if this data is out of order and needs to be inserted later
self.segments[seq_idx] = data
return False
else:
# if this data is in order (has a place to be inserted)
self.data[seq_idx:seq_idx + data_len] = data
# check if there are any waiting data segments to add
for seq_i in sorted(self.segments.keys()):
if seq_i <= len(self.data): # if we can add this segment to the stream
pl = self.segments[seq_i]
self.data[seq_i:seq_i + len(pl)] = pl
self.segments.pop(seq_i)
else:
break # short circuit because list is sorted
return True

def pop_magic(self) -> bool:
""" If self.data starts with the http/2 magic bytes, pop them off """
if self.data.startswith(HTTP2_MAGIC):
self.data = self.data[len(HTTP2_MAGIC):]
self.seq += len(HTTP2_MAGIC)
return True
return False

def pop_frames(self) -> List[H2Frame]:
""" Pop all available H2Frames """
# iterate over self.data and attempt to form HTTP/2 frames
frame_size = len(H2Frame())
frames = []
while len(self.data) >= frame_size:
try:
frame_len = H2Frame(self.data).len
except AssertionError: # when not enough data
break
# if we've got a frame, but don't have all the data for it yet
if frame_len > len(self.data):
break # without adding this frame
# if we pop this frame, remove its data from self.data
# and push self.seq up by len(frame)
current_frame_size = frame_size + frame_len
frame = H2Frame(self.data[:current_frame_size])
self.data = self.data[current_frame_size:]
self.seq += current_frame_size
frames.append(frame)
return frames


class RemoteXPCSniffer:
def __init__(self):
self._tcp_streams = {}
self._previous_frame_data = b''

def process_packet(self, packet: Packet) -> None:
if packet.haslayer(TCP) and packet[TCP].payload:
self._process_tcp(packet)

def _process_tcp(self, pkt: Packet) -> None:
# we are going to separate TCP packets into TCP streams between unique
# endpoints (ip/port) then, for each stream, we will create a new TCPStream
# object and pass TCP packets into it TCPStream objects will take the bytes
# from each TCP packet and add them to the stream. No error correction /
# checksum checking will be done. The stream will just overwrite its bytes
# with whatever is presented in the packets. If the stream receives packets
# out of order, it will add the bytes at the proper index.
if pkt.haslayer(IP):
net_pkt = pkt[IP]
elif pkt.haslayer(IPv6):
net_pkt = pkt[IPv6]
else:
return
tcp_pkt = pkt[TCP]
stream_id = f'{net_pkt.src}/{tcp_pkt.sport}//{net_pkt.dst}/{tcp_pkt.dport}'
tcp_stream = self._tcp_streams.setdefault(stream_id, TCPStream(stream_id))
stream_finished_assembling = tcp_stream.add(tcp_pkt)
if stream_finished_assembling: # if we just added something in order
self._process_stream(tcp_stream)

def _handle_data_frame(self, tcp_stream: TCPStream, frame: H2Frame) -> None:
try:
xpc_message = get_object_from_xpc_wrapper(self._previous_frame_data + frame.data)
self._previous_frame_data = b''

if xpc_message is not None:
logger.info(f'As Python Object: {pformat(xpc_message)}')
if 'value' not in xpc_message:
return
message = xpc_message['value']['message']
if 'plain' not in message:
return
plain = message['plain']['_0']
if 'event' not in plain:
return
pairing_data = plain['event']['_0']['pairingData']['_0']['data']
logger.info(PairingDataComponentTLVBuf.parse(pairing_data))
except ConstError: # if we don't know what this payload is
logger.debug(
f'New Data frame {tcp_stream.src()}->{tcp_stream.dst()} on '
f'HTTP/2 stream {frame.stream_id} TCP port {tcp_stream.dport()}')
hexdump(frame.data[:64])
if len(frame.data) > 64:
logger.debug(f'... {len(frame.data)} bytes')
except StreamError:
self._previous_frame_data += frame.data

def _handle_single_frame(self, tcp_stream: TCPStream, frame: H2Frame) -> None:
logger.debug(f'New HTTP/2 frame: {tcp_stream.key}')
if frame.fields.get('type', None) == 1: # Header Frame
logger.debug(
f'{tcp_stream.src()} opening stream {frame.stream_id} for communication on port {tcp_stream.dport()}')
elif frame.fields.get('type', None) == 3: # Reset Frame
logger.debug(f'{tcp_stream.src()} closing stream {frame.stream_id} on port {tcp_stream.sport()}')
elif frame.fields.get('type', None) == 0: # Data Frame
self._handle_data_frame(tcp_stream, frame)

def _process_stream(self, tcp_stream: TCPStream) -> None:
if tcp_stream.pop_magic():
logger.debug('HTTP/2 magic bytes')
for frame in tcp_stream.pop_frames():
self._handle_single_frame(tcp_stream, frame)


@click.group()
def cli():
""" Parse RemoteXPC traffic """
pass


@cli.command()
@click.argument('file', type=click.Path(exists=True, file_okay=True, dir_okay=False))
def offline(file: str):
""" Parse RemoteXPC traffic from a .pcap file """
sniffer = RemoteXPCSniffer()
for p in sniff(offline=file):
sniffer.process_packet(p)


@cli.command()
@click.argument('iface')
def live(iface: str):
""" Parse RemoteXPC live from a given network interface """
sniffer = RemoteXPCSniffer()
for p in sniff(iface=iface):
sniffer.process_packet(p)


if __name__ == '__main__':
cli()

0 comments on commit 84bb255

Please sign in to comment.