diff --git a/adb/adb_commands.py b/adb/adb_commands.py index 734e31c..6b6e028 100644 --- a/adb/adb_commands.py +++ b/adb/adb_commands.py @@ -256,9 +256,9 @@ def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progres device_filename: Destination on the device to write to. mtime: Optional, modification time to set on the file. timeout_ms: Expected timeout for any part of the push. - st_mode: stat mode for filename progress_callback: callback method that accepts filename, bytes_written and total_bytes, total_bytes will be -1 for file-like objects + st_mode: stat mode for filename """ if isinstance(source_file, str): diff --git a/adb/adb_protocol.py b/adb/adb_protocol.py index 4ff28c7..79b439f 100644 --- a/adb/adb_protocol.py +++ b/adb/adb_protocol.py @@ -19,6 +19,7 @@ import struct import time +import re from io import BytesIO from adb import usb_exceptions @@ -323,7 +324,7 @@ def Connect(cls, usb, banner=b'notadb', rsa_keys=None, auth_timeout_ms=100): 'Unknown AUTH response: %s %s %s' % (arg0, arg1, banner)) # Do not mangle the banner property here by converting it to a string - signed_token = rsa_key.Sign(banner) + signed_token = rsa_key.Sign(banner) + b'\0' msg = cls( command=b'AUTH', arg0=AUTH_SIGNATURE, arg1=0, data=signed_token) msg.Send(usb) @@ -551,8 +552,11 @@ def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, str stdout = stdout.split(b'\r\r\n')[1] # Strip delim if requested - # TODO: Handling stripping partial delims here - not a deal breaker the way we're handling it now if delim and strip_delim: + prefix_exp = re.compile(r'(?P\d{1,3}\|)'+delim.decode('utf-8', errors='ignore')) + match = re.match(prefix_exp, stdout.decode('utf-8', errors='ignore')) + if match: + stdout = stdout.replace(str(match.group('prefix') + delim).encode('utf-8'), b'') stdout = stdout.replace(delim, b'') stdout = stdout.rstrip() diff --git a/adb/common_cli.py b/adb/common_cli.py index b4ab5e8..ccecfda 100644 --- a/adb/common_cli.py +++ b/adb/common_cli.py @@ -27,6 +27,7 @@ import re import sys import types +import traceback from adb import usb_exceptions @@ -158,7 +159,7 @@ def StartCli(args, adb_commands, extra=None, **device_kwargs): try: return _RunMethod(dev, args, extra or {}) except Exception as e: # pylint: disable=broad-except - sys.stdout.write(str(e)) + sys.stdout.write(traceback.format_exc()) return 1 finally: dev.Close() diff --git a/adb/fastboot.py b/adb/fastboot.py index 1507494..a83684d 100644 --- a/adb/fastboot.py +++ b/adb/fastboot.py @@ -19,6 +19,7 @@ import logging import os import struct +from io import BytesIO, StringIO from adb import common from adb import usb_exceptions @@ -99,9 +100,9 @@ def HandleSimpleResponses( info_cb: Optional callback for text sent from the bootloader. Returns: - OKAY packet's message. + OKAY packet's message """ - return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms) + return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms)[0] def HandleDataSending(self, source_file, source_len, info_cb=DEFAULT_MESSAGE_CALLBACK, @@ -123,9 +124,9 @@ def HandleDataSending(self, source_file, source_len, FastbootInvalidResponse: Fastboot responded with an unknown packet type. Returns: - OKAY packet's message. + tuple (OKAY packet's message, List of preceding Fastboot Messages) """ - accepted_size = self._AcceptResponses( + accepted_size, _msgs = self._AcceptResponses( b'DATA', info_cb, timeout_ms=timeout_ms) accepted_size = binascii.unhexlify(accepted_size[:8]) @@ -151,24 +152,33 @@ def _AcceptResponses(self, expected_header, info_cb, timeout_ms=None): FastbootInvalidResponse: Fastboot responded with an unknown packet type. Returns: - OKAY packet's message. + tuple (OKAY packet's message, List of preceding Fastboot Messages) """ + + messages = [] + while True: response = self.usb.BulkRead(64, timeout_ms=timeout_ms) header = bytes(response[:4]) remaining = bytes(response[4:]) if header == b'INFO': - info_cb(FastbootMessage(remaining, header)) + fbm = FastbootMessage(remaining, header) + messages.append(fbm) + info_cb(fbm) elif header in self.FINAL_HEADERS: if header != expected_header: raise FastbootStateMismatch( 'Expected %s, got %s', expected_header, header) if header == b'OKAY': - info_cb(FastbootMessage(remaining, header)) - return remaining + fbm = FastbootMessage(remaining, header) + messages.append(fbm) + info_cb(fbm) + return remaining, messages elif header == b'FAIL': - info_cb(FastbootMessage(remaining, header)) + fbm = FastbootMessage(remaining, header) + messages.append(fbm) + info_cb(fbm) raise FastbootRemoteFailure('FAIL: %s', remaining) else: raise FastbootInvalidResponse( @@ -188,6 +198,7 @@ def _HandleProgress(self, total, progress_callback): def _Write(self, data, length, progress_callback=None): """Sends the data to the device, tracking progress with the callback.""" + progress = None if progress_callback: progress = self._HandleProgress(length, progress_callback) next(progress) @@ -310,20 +321,24 @@ def Download(self, source_file, source_len=0, Returns: Response to a download request, normally nothing. """ + if isinstance(source_file, str): + source_file_path = str(source_file) source_len = os.stat(source_file).st_size - source_file = open(source_file) - - with source_file: - if source_len == 0: - # Fall back to storing it all in memory :( - data = source_file.read() - source_file = io.BytesIO(data.encode('utf8')) - source_len = len(data) + with open(source_file_path, 'rb') as fh: + source_file = BytesIO(fh.read()) + + if not source_len: + if isinstance(source_file, StringIO): + source_file.seek(0, os.SEEK_END) + source_len = source_file.tell() + source_file.seek(0) + else: + source_len = len(source_file) - self._protocol.SendCommand(b'download', b'%08x' % source_len) - return self._protocol.HandleDataSending( - source_file, source_len, info_cb, progress_callback=progress_callback) + self._protocol.SendCommand(b'download', b'%08x' % source_len) + return self._protocol.HandleDataSending( + source_file, source_len, info_cb, progress_callback=progress_callback)[0] def Flash(self, partition, timeout_ms=0, info_cb=DEFAULT_MESSAGE_CALLBACK): """Flashes the last downloaded file to the given partition. @@ -396,3 +411,19 @@ def Reboot(self, target_mode=b'', timeout_ms=None): def RebootBootloader(self, timeout_ms=None): """Reboots into the bootloader, usually equiv to Reboot('bootloader').""" return self._SimpleCommand(b'reboot-bootloader', timeout_ms=timeout_ms) + + def Boot(self, source_file): + """Fastboot boot image by sending image from local file system then issuing the boot command + + Args: + source_file: String file path to the image to send and boot + + Returns: + None + """ + + if not os.path.exists(source_file): + raise ValueError("source_file must exist") + + self.Download(source_file) + self._SimpleCommand(b'boot') diff --git a/adb/fastboot_debug.py b/adb/fastboot_debug.py index e168f69..7d83c68 100755 --- a/adb/fastboot_debug.py +++ b/adb/fastboot_debug.py @@ -86,6 +86,9 @@ def main(): subparsers, parents, fastboot.FastbootCommands.Oem) common_cli.MakeSubparser( subparsers, parents, fastboot.FastbootCommands.Reboot) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Boot, + {'source_file': 'Image file on the host to push and boot'}) if len(sys.argv) == 1: parser.print_help() diff --git a/adb/sign_pycryptodome.py b/adb/sign_pycryptodome.py index 6a61ce9..9f68006 100644 --- a/adb/sign_pycryptodome.py +++ b/adb/sign_pycryptodome.py @@ -1,8 +1,8 @@ from adb import adb_protocol -from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 +from Crypto.Util import number class PycryptodomeAuthSigner(adb_protocol.AuthSigner): @@ -18,8 +18,48 @@ def __init__(self, rsa_key_path=None): self.rsa_key = RSA.import_key(rsa_priv_file.read()) def Sign(self, data): - h = SHA256.new(data) - return pkcs1_15.new(self.rsa_key).sign(h) + # Prepend precomputed ASN1 hash code for SHA1 + data = b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14' + data + pkcs = pkcs1_15.new(self.rsa_key) + + # See 8.2.1 in RFC3447 + modBits = number.size(pkcs._key.n) + k = pkcs1_15.ceil_div(modBits,8) # Convert from bits to bytes + + # Step 2a (OS2IP) + em_int = pkcs1_15.bytes_to_long(PycryptodomeAuthSigner._pad_for_signing(data, k)) + # Step 2b (RSASP1) + m_int = pkcs._key._decrypt(em_int) + # Step 2c (I2OSP) + signature = pkcs1_15.long_to_bytes(m_int, k) + + return signature def GetPublicKey(self): return self.public_key + + @staticmethod + def _pad_for_signing(message, target_length): + """Pads the message for signing, returning the padded message. + + The padding is always a repetition of FF bytes. + + Function from python-rsa to replace _EMSA_PKCS1_V1_5_ENCODE's for our use case + + :return: 00 01 PADDING 00 MESSAGE + + """ + + max_msglength = target_length - 11 + msglength = len(message) + + if msglength > max_msglength: + raise OverflowError('%i bytes needed for message, but there is only' + ' space for %i' % (msglength, max_msglength)) + + padding_length = target_length - msglength - 3 + + return b''.join([b'\x00\x01', + padding_length * b'\xff', + b'\x00', + message])