From 40a2d7284787b13ef371a612f582cd192b6e354b Mon Sep 17 00:00:00 2001 From: "FC (Fay) Stegerman" Date: Tue, 24 Dec 2024 00:18:03 +0100 Subject: [PATCH] put JSON w/ offsets in v1signature.zip & refactor (fixes #88) --- apksigcopier/__init__.py | 194 +++++++++++++++++++++++++-------------- 1 file changed, 125 insertions(+), 69 deletions(-) diff --git a/apksigcopier/__init__.py b/apksigcopier/__init__.py index 51e7ef9..72e8632 100755 --- a/apksigcopier/__init__.py +++ b/apksigcopier/__init__.py @@ -65,7 +65,8 @@ import zlib from collections import namedtuple -from typing import Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import (Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional, + Set, Tuple, Union) __version__ = "1.1.1" NAME = "apksigcopier" @@ -84,10 +85,6 @@ V1SIGZIP = "v1signature.zip" DIFF_JSON = "differences.json" -ANDROID_MANIFEST = "AndroidManifest.xml" -AFTER_MANIFEST = b"after_manifest" -ZFE_SIZE = re.compile(rb"^zfe_size=(\d+)$") - NOAUTOYES: Tuple[NoAutoYes, NoAutoYes, NoAutoYes] = ("no", "auto", "yes") NO, AUTO, YES = NOAUTOYES @@ -464,19 +461,16 @@ def copy_apk(unsigned_apk: str, output_apk: str, *, realign = not skip_realignment if v1_sig: v1_sig_fhi = io.BytesIO(v1_sig) - with zipfile.ZipFile(v1_sig_fhi) as zf: - v1_infos = zf.infolist() - v1_options = zf.comment.split(b",") - after_manifest = AFTER_MANIFEST in v1_options - for opt in v1_options: - if match := ZFE_SIZE.fullmatch(opt): - zfe_size = int(match.group(1)) - if zfe_size and not (30 <= zfe_size <= 4096): - raise APKSigCopierError(f"Unsupported virtual entry size: {zfe_size}") + v1_infos, v1_datas, v1_comment_data = extract_v1_sig_data(v1_sig_fhi) + v1_indices = {idx: i for i, (idx, _) in enumerate(v1_comment_data["offsets"])} + v1_offsets = {off: i for i, (_, off) in enumerate(v1_comment_data["offsets"])} + if "zfe_size" in v1_comment_data and not zfe_size: + zfe_size = int(v1_comment_data["zfe_size"]) + v1_cd_offset = _zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset with zipfile.ZipFile(unsigned_apk, "r") as zf: infos = zf.infolist() zdata = zip_data(unsigned_apk) - offsets = {} + offsets: Dict[str, int] = {} with open(unsigned_apk, "rb") as fhi, open(output_apk, "w+b") as fho: if zfe_size: zfe = zipflinger_virtual_entry(zfe_size) @@ -488,13 +482,21 @@ def copy_apk(unsigned_apk: str, output_apk: str, *, if info.header_offset > off_i: # copy extra bytes fho.write(fhi.read(info.header_offset - off_i)) + off_o = fho.tell() + while v1_sig and off_o in v1_offsets: + # FIXME: can these be STORED and need alignment? + # try to match original header offsets (for entries not at the end) + copy_v1_sig_entries(v1_sig_fhi, fho, [v1_infos[v1_offsets[off_o]]], + v1_datas, offsets) + del v1_offsets[off_o] + off_o = fho.tell() hdr, n, m = _read_lfh(fhi) if skip := exclude(info.filename): fhi.seek(info.compress_size, os.SEEK_CUR) else: if info.filename in offsets: raise ZipError(f"Duplicate ZIP entry: {info.filename!r}") - offsets[info.filename] = off_o = fho.tell() + offsets[info.filename] = off_o if realign and info.compress_type == 0 and off_o != info.header_offset: hdr = _realign_zip_entry(info, hdr, n, m, off_o, pad_like_apksigner=not zfe_size) @@ -502,33 +504,40 @@ def copy_apk(unsigned_apk: str, output_apk: str, *, _copy_bytes(fhi, fho, info.compress_size) if (data_descriptor := _read_data_descriptor(fhi, info)) and not skip: fho.write(data_descriptor) - if v1_sig and after_manifest and info.filename == ANDROID_MANIFEST: - copy_v1_sig_entries(v1_sig_fhi, fho, v1_infos, offsets) - date_time = max(info.date_time for info in infos if info.filename in offsets) - if v1_sig and not after_manifest: - copy_v1_sig_entries(v1_sig_fhi, fho, v1_infos, offsets) + if v1_sig and v1_offsets: + # copy (remaining) entries at the end + copy_v1_sig_entries(v1_sig_fhi, fho, [v1_infos[i] for i in v1_offsets.values()], + v1_datas, offsets) extra_bytes = zdata.cd_offset - fhi.tell() if copy_extra: _copy_bytes(fhi, fho, extra_bytes) else: fhi.seek(extra_bytes, os.SEEK_CUR) cd_offset = fho.tell() + idx_o = 0 for info in infos: + while v1_sig and idx_o in v1_indices: + # try to match original CD position (for entries not at the end) + v1_sig_fhi.seek(v1_cd_offset) + copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, v1_datas, offsets, + only={v1_indices[idx_o]}) + del v1_indices[idx_o] + idx_o += 1 hdr, n, m, k = _read_cdfh(fhi) if not exclude(info.filename): fho.write(_adjust_offset(hdr, offsets[info.filename])) - if v1_sig and after_manifest and info.filename == ANDROID_MANIFEST: - v1_sig_fhi.seek(_zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset) - copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, offsets) - if v1_sig and not after_manifest: - v1_sig_fhi.seek(_zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset) - copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, offsets) + idx_o += 1 + if v1_sig and v1_indices: + # copy (remaining) entries at the end + v1_sig_fhi.seek(v1_cd_offset) + copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, v1_datas, offsets, + only=set(v1_indices.values())) eocd_offset = fho.tell() fho.write(zdata.cd_and_eocd[zdata.eocd_offset - zdata.cd_offset:]) fho.seek(eocd_offset + 8) fho.write(struct.pack(" Tuple[bytes, int, int]: @@ -602,7 +611,7 @@ def _copy_bytes(fhi: BinaryIO, fho: BinaryIO, size: int, blocksize: int = 4096) raise ZipError("Unexpected EOF") -def extract_v1_sig(apkfile: str, check_at_end: bool = True) -> Optional[bytes]: +def extract_v1_sig(apkfile: str) -> Optional[bytes]: """ Extract v1 signature data as ZIP file data. @@ -610,7 +619,8 @@ def extract_v1_sig(apkfile: str, check_at_end: bool = True) -> Optional[bytes]: >>> from apksigcopier import extract_v1_sig >>> apk = "test/apks/apks/golden-aligned-v1v2v3-out.apk" >>> v1_sig = extract_v1_sig(apk) - >>> zf = zipfile.ZipFile(io.BytesIO(v1_sig)) + >>> fh = io.BytesIO(v1_sig) + >>> zf = zipfile.ZipFile(fh, "r") >>> [ x.filename for x in zf.infolist() ] ['META-INF/RSA-2048.SF', 'META-INF/RSA-2048.RSA', 'META-INF/MANIFEST.MF'] >>> for line in zf.read("META-INF/RSA-2048.SF").splitlines()[:4]: @@ -623,23 +633,34 @@ def extract_v1_sig(apkfile: str, check_at_end: bool = True) -> Optional[bytes]: ... print(line.decode()) Manifest-Version: 1.0 Created-By: 1.8.0_45-internal (Oracle Corporation) + >>> zf.comment + b'{"offsets":[[6,5109],[7,5595],[8,6706]]}' + >>> v1_infos, v1_datas, v1_comment_data = extract_v1_sig_data(fh) + >>> [ x.filename for x in v1_infos ] + ['META-INF/RSA-2048.SF', 'META-INF/RSA-2048.RSA', 'META-INF/MANIFEST.MF'] + >>> [ (k, len(v)) for k, v in v1_datas.items() ] + [('META-INF/RSA-2048.SF', 664), ('META-INF/RSA-2048.RSA', 1160), ('META-INF/MANIFEST.MF', 589)] + >>> v1_comment_data + {'offsets': [[6, 5109], [7, 5595], [8, 6706]]} """ with zipfile.ZipFile(apkfile, "r") as zf: infos = zf.infolist() - comments = [] + datas = {info.filename: zf.read(info.filename) for info in infos if is_meta(info.filename)} + coffs = [[i, info.header_offset] for i, info in enumerate(infos) if is_meta(info.filename)] + comment_data: Dict[str, Any] = dict(offsets=coffs) + if zfe_size := detect_zfe(apkfile): + comment_data["zfe_size"] = zfe_size + comment = json.dumps(comment_data, separators=(",", ":"), sort_keys=True).encode() offsets: Dict[str, int] = {} fho = io.BytesIO() - if zfe_size := detect_zfe(apkfile): - comments.append(f"zfe_size={zfe_size}".encode()) with open(apkfile, "rb") as fhi: - comments.extend(copy_v1_sig_entries(fhi, fho, infos, offsets, check_at_end=check_at_end)) + copy_v1_sig_entries(fhi, fho, infos, datas, offsets) cd_offset = fho.tell() fhi.seek(_zip_data(fhi).cd_offset) - copy_v1_sig_cd_entries( - fhi, fho, infos, offsets, check_at_end=check_at_end and AFTER_MANIFEST not in comments) + copy_v1_sig_cd_entries(fhi, fho, infos, datas, offsets) eocd_offset = fho.tell() - fho.write(_eocd(len(offsets), eocd_offset, cd_offset, b",".join(comments))) + fho.write(_eocd(len(offsets), eocd_offset, cd_offset, comment)) return fho.getvalue() if offsets else None @@ -650,59 +671,39 @@ def _eocd(entries: int, eocd_offset: int, cd_offset: int, comment: bytes = b"") def copy_v1_sig_entries(fhi: BinaryIO, fho: BinaryIO, infos: List[zipfile.ZipInfo], - offsets: Dict[str, int], check_at_end: bool = False) -> List[bytes]: + datas: Dict[str, bytes], offsets: Dict[str, int]) -> None: """Copy v1 signature entries.""" - comments = [] - meta_header_offsets = {} - last_non_meta_header_offset = -1 - sorted_infos = sorted(infos, key=lambda info: info.header_offset) - for info in sorted_infos: + for info in sorted(infos, key=lambda info: info.header_offset): if not is_meta(info.filename): - last_non_meta_header_offset = info.header_offset continue - meta_header_offsets[info.filename] = info.header_offset if info.filename in offsets: raise ZipError(f"Duplicate ZIP entry: {info.filename!r}") fhi.seek(info.header_offset) hdr, n, m = _read_lfh(fhi) - if error := validate_zip_header(hdr): - raise ZipError(f"Unsupported LFH for {info.filename!r}: {error}") offsets[info.filename] = fho.tell() fho.write(hdr) _copy_bytes(fhi, fho, info.compress_size) if data_descriptor := _read_data_descriptor(fhi, info): fho.write(data_descriptor) - if check_at_end and any(i < last_non_meta_header_offset for i in meta_header_offsets.values()): - for i, info in enumerate(sorted_infos): - if info.filename == ANDROID_MANIFEST: - after = sorted_infos[i + 1:i + 1 + len(meta_header_offsets)] - if [info.filename for info in after] == list(meta_header_offsets): - comments.append(AFTER_MANIFEST) - break - if AFTER_MANIFEST not in comments: - raise APKSigCopierError("Expected v1 signature entries at end of archive") - return comments + if error := validate_zip_header(hdr, info, datas, data_descriptor): + raise ZipError(f"Unsupported LFH for {info.filename!r}: {error}") def copy_v1_sig_cd_entries(fhi: BinaryIO, fho: BinaryIO, infos: List[zipfile.ZipInfo], - offsets: Dict[str, int], check_at_end: bool = False) -> None: + datas: Dict[str, bytes], offsets: Dict[str, int], + *, only: Optional[Set[int]] = None) -> None: """Copy v1 signature CD entries.""" - meta_indices = set() - last_non_meta_index = -1 for i, info in enumerate(infos): hdr, n, m, k = _read_cdfh(fhi) - if not is_meta(info.filename): - last_non_meta_index = i + if not is_meta(info.filename) or (only is not None and i not in only): continue - meta_indices.add(i) - if error := validate_zip_header(hdr): + if error := validate_zip_header(hdr, info, datas): raise ZipError(f"Unsupported CDFH for {info.filename!r}: {error}") fho.write(_adjust_offset(hdr, offsets[info.filename])) - if check_at_end and any(i < last_non_meta_index for i in meta_indices): - raise APKSigCopierError("Expected v1 signature CD entries at end of archive") -def validate_zip_header(hdr: bytes) -> Optional[str]: +def validate_zip_header(hdr: bytes, info: zipfile.ZipInfo, datas: Dict[str, bytes], + data_descriptor: Optional[bytes] = None) -> Optional[str]: """ Validate ZIP LHF or CDFH. @@ -713,6 +714,9 @@ def validate_zip_header(hdr: bytes) -> Optional[str]: uncompressed_size, n, m) = struct.unpack(" Optional[str]: return "non-empty extra field" if comment: return "non-empty file comment" + if compressed_size != info.compress_size: + return "compressed size mismatch" + if uncompressed_size != len(datas[info.filename]): + return "uncompressed size mismatch" + if crc32 != info.CRC: + return "crc32 mismatch" + return None + + +def extract_v1_sig_data(fhi: BinaryIO) -> Tuple[List[zipfile.ZipInfo], Dict[str, bytes], + Dict[str, Any]]: + """Extract data from v1_sig comment.""" + with zipfile.ZipFile(fhi, "r") as zf: + infos = zf.infolist() + datas = {info.filename: zf.read(info.filename) for info in infos} + try: + comment_data = json.loads(zf.comment.decode()) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + raise APKSigCopierError(f"Invalid {V1SIGZIP} comment: {e}") # pylint: disable=W0707 + if error := validate_v1_sig_data(comment_data, len(infos)): + raise APKSigCopierError(f"Invalid {V1SIGZIP} comment: {error}") + return infos, datas, comment_data + + +def validate_v1_sig_data(data: Dict[str, Any], n_infos: int) -> Optional[str]: + """ + Validate data from v1_sig comment. + + Returns None if valid, error otherwise. + + >>> validate_v1_sig_data(dict(offsets=[[0, 128], [1, 256]], zfe_size=132), 2) + >>> validate_v1_sig_data(dict(offsets=[[1, 2], [3]]), 2) + '.offsets[1] is not a list of 2 ints' + + """ + if set(data) - {"offsets", "zfe_size"}: + return "contains unknown key(s)" + if "offsets" not in data: + return "missing .offsets" + if not isinstance(data["offsets"], list): + return ".offsets is not a list" + if len(data["offsets"]) != n_infos: + return ".offsets length does not match number of entries" + for i, pair in enumerate(data["offsets"]): + if not isinstance(pair, list) or len(pair) != 2 or \ + not all(type(x) is int for x in pair): + return f".offsets[{i}] is not a list of 2 ints" + if "zfe_size" in data: + if type(data["zfe_size"]) is not int: + return ".zfe_size is not an int" + if not (30 <= data["zfe_size"] <= 4096): + return ".zfe_size is < 30 or > 4096" return None @@ -1150,7 +1206,7 @@ def do_extract(signed_apk: str, output_dir: str, v1_only: NoAutoYesBoolNone = NO with open(os.path.join(output_dir, name), "wb") as fh: fh.write(data) else: - extracted_meta, v1_sig = None, extract_v1_sig(signed_apk, check_at_end=bool(v2_sig)) + extracted_meta, v1_sig = None, extract_v1_sig(signed_apk) if v1_sig: with open(os.path.join(output_dir, V1SIGZIP), "wb") as fh: fh.write(v1_sig) @@ -1253,7 +1309,7 @@ def do_copy(signed_apk: str, unsigned_apk: str, output_apk: str, if legacy: extracted_meta, v1_sig = tuple(extract_meta(signed_apk)), None else: - extracted_meta, v1_sig = None, extract_v1_sig(signed_apk, check_at_end=bool(v2_sig)) + extracted_meta, v1_sig = None, extract_v1_sig(signed_apk) if v2_sig and not (ignore_differences or v1_sig): differences = extract_differences(signed_apk, extracted_meta) else: