From 8c554b2a0c05e85ce49bea867f9d7fef56220198 Mon Sep 17 00:00:00 2001 From: yinyajiang <32933356+yinyajiang@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:21:01 +0800 Subject: [PATCH] restore: fix blocking bug in windows (#1172) (#1160) --- pymobiledevice3/restore/restore.py | 141 +++++++++++++++-------------- 1 file changed, 73 insertions(+), 68 deletions(-) diff --git a/pymobiledevice3/restore/restore.py b/pymobiledevice3/restore/restore.py index 8f8aeee3a..a8bd7b894 100644 --- a/pymobiledevice3/restore/restore.py +++ b/pymobiledevice3/restore/restore.py @@ -307,7 +307,7 @@ def get_recovery_os_local_policy_tss_response(self, args, build_identity=None): def get_build_identity(self, is_recovery_os: bool): if is_recovery_os: variant = RESTORE_VARIANT_MACOS_RECOVERY_OS - elif self.build_identity.restore_behavior == Behavior.Erase: + elif self.build_identity.restore_behavior == Behavior.Erase.value: variant = RESTORE_VARIANT_ERASE_INSTALL else: variant = RESTORE_VARIANT_UPGRADE_INSTALL @@ -319,7 +319,7 @@ async def send_restore_local_policy(self, message: Mapping) -> None: service = await self._get_service_for_data_request(message) # The Update mode does not have a specific build identity for the recovery os. - build_identity = self.get_build_identity(self.build_identity.restore_behavior == Behavior.Erase) + build_identity = self.get_build_identity(self.build_identity.restore_behavior == Behavior.Erase.value) tss_localpolicy = self.get_recovery_os_local_policy_tss_response(message['Arguments'], build_identity=build_identity) @@ -480,76 +480,81 @@ def sign_bbfw(self, bbfw_orig, bbtss, bb_nonce): is_fls = False signed_file = [] - with tempfile.NamedTemporaryFile() as tmp_zip_read: - with tempfile.NamedTemporaryFile() as tmp_zip_write: - bbfw_patched = zipfile.ZipFile(tmp_zip_write, 'w') - - tmp_zip_read.write(bbfw_orig) - bbfw_orig = zipfile.ZipFile(tmp_zip_read.name, 'r') - - for key, blob in bbfw_dict.items(): - if key.endswith('-Blob') and isinstance(blob, bytes): - key = key.split('-', 1)[0] - signfn = self.get_bbfw_fn_for_element(key) - - if signfn is None: - raise PyMobileDevice3Exception( - f'can\'t match element name \'{key}\' to baseband firmware file name.') - - if signfn.endswith('.fls'): - is_fls = True - - buffer = bbfw_orig.read(signfn) + with tempfile.NamedTemporaryFile(delete=False) as tmp_zip_read: + tmp_zip_read.write(bbfw_orig) + tmp_zip_read_name = tmp_zip_read.name + try: + with zipfile.ZipFile(tmp_zip_read_name, 'r') as bbfw_orig: + with tempfile.NamedTemporaryFile() as tmp_zip_write: + bbfw_patched = zipfile.ZipFile(tmp_zip_write, 'w') + + for key, blob in bbfw_dict.items(): + if key.endswith('-Blob') and isinstance(blob, bytes): + key = key.split('-', 1)[0] + signfn = self.get_bbfw_fn_for_element(key) + + if signfn is None: + raise PyMobileDevice3Exception( + f'can\'t match element name \'{key}\' to baseband firmware file name.') + + if signfn.endswith('.fls'): + is_fls = True + + buffer = bbfw_orig.read(signfn) + + if is_fls: + fls = self.fls_parse(buffer) + data = self.fls_update_sig_blob(fls, blob) + else: + parsed_sig_offset = len(buffer) - len(blob) + data = buffer[:parsed_sig_offset] + blob + + bbfw_patched.writestr(bbfw_orig.getinfo(signfn), data) + + if is_fls and (bb_nonce is None): + if key == 'RamPSI': + signed_file.append(signfn) + else: + signed_file.append(signfn) + + # remove everything but required files + for entry in bbfw_orig.filelist: + keep = False + filename = entry.filename + + if filename in signed_file: + keep = True + + # check for anything but .mbn and .fls if bb_nonce is set + if bb_nonce and not keep: + ext = os.path.splitext(filename)[1] + keep |= ext in ('.fls', '.mbn', '.elf', '.bin') + + if keep and (filename not in signed_file): + bbfw_patched.writestr(bbfw_orig.getinfo(filename), bbfw_orig.read(filename)) + + if bb_nonce: if is_fls: + # add BBTicket to file ebl.fls + buffer = bbfw_orig.read('ebl.fls') fls = self.fls_parse(buffer) - data = self.fls_update_sig_blob(fls, blob) - else: - parsed_sig_offset = len(buffer) - len(blob) - data = buffer[:parsed_sig_offset] + blob - - bbfw_patched.writestr(bbfw_orig.getinfo(signfn), data) - - if is_fls and (bb_nonce is None): - if key == 'RamPSI': - signed_file.append(signfn) + data = self.fls_insert_ticket(fls, bbticket) + bbfw_patched.writestr('ebl.fls', data) else: - signed_file.append(signfn) - - # remove everything but required files - for entry in bbfw_orig.filelist: - keep = False - filename = entry.filename - - if filename in signed_file: - keep = True - - # check for anything but .mbn and .fls if bb_nonce is set - if bb_nonce and not keep: - ext = os.path.splitext(filename)[1] - keep |= ext in ('.fls', '.mbn', '.elf', '.bin') - - if keep and (filename not in signed_file): - bbfw_patched.writestr(bbfw_orig.getinfo(filename), bbfw_orig.read(filename)) - - if bb_nonce: - if is_fls: - # add BBTicket to file ebl.fls - buffer = bbfw_orig.read('ebl.fls') - fls = self.fls_parse(buffer) - data = self.fls_insert_ticket(fls, bbticket) - bbfw_patched.writestr('ebl.fls', data) - else: - # add BBTicket as bbticket.der - zname = zipfile.ZipInfo('bbticket.der') - zname.filename = 'bbticket.der' - ZIP_EXT_ATTR_FILE = 0o100000 - zname.external_attr = (0o644 | ZIP_EXT_ATTR_FILE) << 16 - bbfw_patched.writestr(zname, bbticket) - - bbfw_patched.close() - tmp_zip_write.seek(0) - return tmp_zip_write.read() + # add BBTicket as bbticket.der + zname = zipfile.ZipInfo('bbticket.der') + zname.filename = 'bbticket.der' + ZIP_EXT_ATTR_FILE = 0o100000 + zname.external_attr = (0o644 | ZIP_EXT_ATTR_FILE) << 16 + bbfw_patched.writestr(zname, bbticket) + + bbfw_patched.close() + tmp_zip_write.seek(0) + return tmp_zip_write.read() + finally: + if tmp_zip_read_name: + os.remove(tmp_zip_read_name) async def send_baseband_data(self, message: Mapping): self.logger.info(f'About to send BasebandData: {message}')