diff --git a/pymobiledevice3/remote/sniffer.py b/pymobiledevice3/remote/sniffer.py new file mode 100644 index 000000000..0ae893023 --- /dev/null +++ b/pymobiledevice3/remote/sniffer.py @@ -0,0 +1,208 @@ +import logging +from pprint import pformat +from typing import List, Mapping, MutableMapping, Optional + +import click +import coloredlogs +from construct import ConstError, StreamError +from hexdump import hexdump +from hyperframe.frame import DataFrame, Frame, GoAwayFrame, HeadersFrame +from remotexpc import HTTP2_MAGIC +from scapy.layers.inet import IP, TCP +from scapy.layers.inet6 import IPv6 +from scapy.packet import Packet +from scapy.sendrecv import sniff + +from pymobiledevice3.remote.core_device_tunnel_service import PairingDataComponentTLVBuf +from pymobiledevice3.remote.xpc_message import get_object_from_xpc_wrapper + +logger = logging.getLogger() + +coloredlogs.install(level=logging.DEBUG) + +FRAME_HEADER_SIZE = 9 + + +class TCPStream: + def __init__(self, key: str): + self.key = key + self.data = bytearray() + self.seq: Optional[int] = None # so we know seq hasn't been initialized yet + self.segments = {} # data segments to add later + + def __repr__(self) -> str: + return f'Stream<{self.key}>' + + def __len__(self) -> int: + return len(self.data) + + def src(self) -> str: + return self.key.split('/')[0] + + def dst(self) -> str: + return self.key.split('/')[3] + + def sport(self) -> int: + return int(self.key.split('/')[1]) + + def dport(self) -> int: + return int(self.key.split('/')[4]) + + def add(self, tcp_pkt: TCP) -> bool: + """ + Returns True if we added an in-order segment, False if not + """ + if self.seq is None: + # set initial seq + self.seq = tcp_pkt.seq + data = bytes(tcp_pkt.payload) + data_len = len(data) + seq_offset = tcp_pkt.seq - self.seq + if len(self.data) < seq_offset: + # if this data is out of order and needs to be inserted later + self.segments[seq_offset] = data + return False + else: + # if this data is in order (has a place to be inserted) + self.data[seq_offset:seq_offset + data_len] = data + # check if there are any waiting data segments to add + for seq_offset in sorted(self.segments.keys()): + if seq_offset <= len(self.data): # if we can add this segment to the stream + segment_payload = self.segments[seq_offset] + self.data[seq_offset:seq_offset + len(segment_payload)] = segment_payload + self.segments.pop(seq_offset) + else: + break # short circuit because list is sorted + return True + + +class H2Stream(TCPStream): + def pop_magic(self) -> None: + """ If self.data starts with the http/2 magic bytes, pop them off """ + if self.data.startswith(HTTP2_MAGIC): + logger.debug('HTTP/2 magic bytes') + self.data = self.data[len(HTTP2_MAGIC):] + self.seq += len(HTTP2_MAGIC) + + def pop_frames(self) -> List[Frame]: + """ Pop all available H2Frames """ + frames = [] + while len(self.data) >= FRAME_HEADER_SIZE: + frame, additional_size = Frame.parse_frame_header(memoryview(self.data[:FRAME_HEADER_SIZE])) + if len(self.data) - FRAME_HEADER_SIZE < additional_size: + # the frame has an incomplete body + break + self.data = self.data[FRAME_HEADER_SIZE:] + frame.parse_body(memoryview(self.data[:additional_size])) + self.data = self.data[additional_size:] + self.seq += FRAME_HEADER_SIZE + additional_size + frames.append(frame) + return frames + + +class RemoteXPCSniffer: + def __init__(self): + self._h2_streams: Mapping[str, H2Stream] = {} + self._previous_frame_data: MutableMapping[str, bytes] = {} + + def process_packet(self, packet: Packet) -> None: + if packet.haslayer(TCP) and packet[TCP].payload: + self._process_tcp(packet) + + def _process_tcp(self, pkt: Packet) -> None: + # we are going to separate TCP packets into TCP streams between unique + # endpoints (ip/port) then, for each stream, we will create a new H2Stream + # object and pass TCP packets into it H2Stream objects will take the bytes + # from each TCP packet and add them to the stream. No error correction / + # checksum checking will be done. The stream will just overwrite its bytes + # with whatever is presented in the packets. If the stream receives packets + # out of order, it will add the bytes at the proper index. + if pkt.haslayer(IP): + net_pkt = pkt[IP] + elif pkt.haslayer(IPv6): + net_pkt = pkt[IPv6] + else: + return + tcp_pkt = pkt[TCP] + stream_id = f'{net_pkt.src}/{tcp_pkt.sport}//{net_pkt.dst}/{tcp_pkt.dport}' + stream = self._h2_streams.setdefault(stream_id, H2Stream(stream_id)) + stream_finished_assembling = stream.add(tcp_pkt) + if stream_finished_assembling: # if we just added something in order + self._process_stream(stream) + + def _handle_data_frame(self, stream: H2Stream, frame: DataFrame) -> None: + previous_frame_data = self._previous_frame_data.get(stream.key, b'') + try: + xpc_message = get_object_from_xpc_wrapper(previous_frame_data + frame.data) + except ConstError: # if we don't know what this payload is + logger.debug( + f'New Data frame {stream.src()}->{stream.dst()} on ' + f'HTTP/2 stream {frame.stream_id} TCP port {stream.dport()}') + hexdump(frame.data[:64]) + if len(frame.data) > 64: + logger.debug(f'... {len(frame.data)} bytes') + return + except StreamError: + self._previous_frame_data[stream.key] = previous_frame_data + frame.data + return + + if stream.key in self._previous_frame_data: + self._previous_frame_data.pop(stream.key) + + if xpc_message is None: + return + + logger.info(f'As Python Object: {pformat(xpc_message)}') + if 'value' not in xpc_message: + return + message = xpc_message['value']['message'] + if 'plain' not in message: + return + plain = message['plain']['_0'] + if 'event' not in plain: + return + pairing_data = plain['event']['_0']['pairingData']['_0']['data'] + logger.info(PairingDataComponentTLVBuf.parse(pairing_data)) + + def _handle_single_frame(self, stream: H2Stream, frame: Frame) -> None: + logger.debug(f'New HTTP/2 frame: {stream.key} ({frame})') + if isinstance(frame, HeadersFrame): + logger.debug( + f'{stream.src()} opening stream {frame.stream_id} for communication on port {stream.dport()}') + elif isinstance(frame, GoAwayFrame): + logger.debug(f'{stream.src()} closing stream {frame.stream_id} on port {stream.sport()}') + elif isinstance(frame, DataFrame): + self._handle_data_frame(stream, frame) + + def _process_stream(self, stream: H2Stream) -> None: + stream.pop_magic() + for frame in stream.pop_frames(): + self._handle_single_frame(stream, frame) + + +@click.group() +def cli(): + """ Parse RemoteXPC traffic """ + pass + + +@cli.command() +@click.argument('file', type=click.Path(exists=True, file_okay=True, dir_okay=False)) +def offline(file: str): + """ Parse RemoteXPC traffic from a .pcap file """ + sniffer = RemoteXPCSniffer() + for p in sniff(offline=file): + sniffer.process_packet(p) + + +@cli.command() +@click.argument('iface') +def live(iface: str): + """ Parse RemoteXPC live from a given network interface """ + sniffer = RemoteXPCSniffer() + for p in sniff(iface=iface): + sniffer.process_packet(p) + + +if __name__ == '__main__': + cli()