Skip to content

Commit

Permalink
put JSON w/ offsets in v1signature.zip & refactor (fixes #88)
Browse files Browse the repository at this point in the history
  • Loading branch information
obfusk committed Dec 23, 2024
1 parent fd142b9 commit 40a2d72
Showing 1 changed file with 125 additions and 69 deletions.
194 changes: 125 additions & 69 deletions apksigcopier/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
import zlib

from collections import namedtuple
from typing import Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from typing import (Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional,
Set, Tuple, Union)

__version__ = "1.1.1"
NAME = "apksigcopier"
Expand All @@ -84,10 +85,6 @@
V1SIGZIP = "v1signature.zip"
DIFF_JSON = "differences.json"

ANDROID_MANIFEST = "AndroidManifest.xml"
AFTER_MANIFEST = b"after_manifest"
ZFE_SIZE = re.compile(rb"^zfe_size=(\d+)$")

NOAUTOYES: Tuple[NoAutoYes, NoAutoYes, NoAutoYes] = ("no", "auto", "yes")
NO, AUTO, YES = NOAUTOYES

Expand Down Expand Up @@ -464,19 +461,16 @@ def copy_apk(unsigned_apk: str, output_apk: str, *,
realign = not skip_realignment
if v1_sig:
v1_sig_fhi = io.BytesIO(v1_sig)
with zipfile.ZipFile(v1_sig_fhi) as zf:
v1_infos = zf.infolist()
v1_options = zf.comment.split(b",")
after_manifest = AFTER_MANIFEST in v1_options
for opt in v1_options:
if match := ZFE_SIZE.fullmatch(opt):
zfe_size = int(match.group(1))
if zfe_size and not (30 <= zfe_size <= 4096):
raise APKSigCopierError(f"Unsupported virtual entry size: {zfe_size}")
v1_infos, v1_datas, v1_comment_data = extract_v1_sig_data(v1_sig_fhi)
v1_indices = {idx: i for i, (idx, _) in enumerate(v1_comment_data["offsets"])}
v1_offsets = {off: i for i, (_, off) in enumerate(v1_comment_data["offsets"])}
if "zfe_size" in v1_comment_data and not zfe_size:
zfe_size = int(v1_comment_data["zfe_size"])
v1_cd_offset = _zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset
with zipfile.ZipFile(unsigned_apk, "r") as zf:
infos = zf.infolist()
zdata = zip_data(unsigned_apk)
offsets = {}
offsets: Dict[str, int] = {}
with open(unsigned_apk, "rb") as fhi, open(output_apk, "w+b") as fho:
if zfe_size:
zfe = zipflinger_virtual_entry(zfe_size)
Expand All @@ -488,47 +482,62 @@ def copy_apk(unsigned_apk: str, output_apk: str, *,
if info.header_offset > off_i:
# copy extra bytes
fho.write(fhi.read(info.header_offset - off_i))
off_o = fho.tell()
while v1_sig and off_o in v1_offsets:
# FIXME: can these be STORED and need alignment?
# try to match original header offsets (for entries not at the end)
copy_v1_sig_entries(v1_sig_fhi, fho, [v1_infos[v1_offsets[off_o]]],
v1_datas, offsets)
del v1_offsets[off_o]
off_o = fho.tell()
hdr, n, m = _read_lfh(fhi)
if skip := exclude(info.filename):
fhi.seek(info.compress_size, os.SEEK_CUR)
else:
if info.filename in offsets:
raise ZipError(f"Duplicate ZIP entry: {info.filename!r}")
offsets[info.filename] = off_o = fho.tell()
offsets[info.filename] = off_o
if realign and info.compress_type == 0 and off_o != info.header_offset:
hdr = _realign_zip_entry(info, hdr, n, m, off_o,
pad_like_apksigner=not zfe_size)
fho.write(hdr)
_copy_bytes(fhi, fho, info.compress_size)
if (data_descriptor := _read_data_descriptor(fhi, info)) and not skip:
fho.write(data_descriptor)
if v1_sig and after_manifest and info.filename == ANDROID_MANIFEST:
copy_v1_sig_entries(v1_sig_fhi, fho, v1_infos, offsets)
date_time = max(info.date_time for info in infos if info.filename in offsets)
if v1_sig and not after_manifest:
copy_v1_sig_entries(v1_sig_fhi, fho, v1_infos, offsets)
if v1_sig and v1_offsets:
# copy (remaining) entries at the end
copy_v1_sig_entries(v1_sig_fhi, fho, [v1_infos[i] for i in v1_offsets.values()],
v1_datas, offsets)
extra_bytes = zdata.cd_offset - fhi.tell()
if copy_extra:
_copy_bytes(fhi, fho, extra_bytes)
else:
fhi.seek(extra_bytes, os.SEEK_CUR)
cd_offset = fho.tell()
idx_o = 0
for info in infos:
while v1_sig and idx_o in v1_indices:
# try to match original CD position (for entries not at the end)
v1_sig_fhi.seek(v1_cd_offset)
copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, v1_datas, offsets,
only={v1_indices[idx_o]})
del v1_indices[idx_o]
idx_o += 1
hdr, n, m, k = _read_cdfh(fhi)
if not exclude(info.filename):
fho.write(_adjust_offset(hdr, offsets[info.filename]))
if v1_sig and after_manifest and info.filename == ANDROID_MANIFEST:
v1_sig_fhi.seek(_zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset)
copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, offsets)
if v1_sig and not after_manifest:
v1_sig_fhi.seek(_zip_data(v1_sig_fhi, count=min(65536, len(v1_sig))).cd_offset)
copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, offsets)
idx_o += 1
if v1_sig and v1_indices:
# copy (remaining) entries at the end
v1_sig_fhi.seek(v1_cd_offset)
copy_v1_sig_cd_entries(v1_sig_fhi, fho, v1_infos, v1_datas, offsets,
only=set(v1_indices.values()))
eocd_offset = fho.tell()
fho.write(zdata.cd_and_eocd[zdata.eocd_offset - zdata.cd_offset:])
fho.seek(eocd_offset + 8)
fho.write(struct.pack("<HHLL", len(offsets), len(offsets),
eocd_offset - cd_offset, cd_offset))
return date_time
return max(info.date_time for info in infos if not exclude(info.filename))


def _read_lfh(fh: BinaryIO) -> Tuple[bytes, int, int]:
Expand Down Expand Up @@ -602,15 +611,16 @@ def _copy_bytes(fhi: BinaryIO, fho: BinaryIO, size: int, blocksize: int = 4096)
raise ZipError("Unexpected EOF")


def extract_v1_sig(apkfile: str, check_at_end: bool = True) -> Optional[bytes]:
def extract_v1_sig(apkfile: str) -> Optional[bytes]:
"""
Extract v1 signature data as ZIP file data.
>>> import io
>>> from apksigcopier import extract_v1_sig
>>> apk = "test/apks/apks/golden-aligned-v1v2v3-out.apk"
>>> v1_sig = extract_v1_sig(apk)
>>> zf = zipfile.ZipFile(io.BytesIO(v1_sig))
>>> fh = io.BytesIO(v1_sig)
>>> zf = zipfile.ZipFile(fh, "r")
>>> [ x.filename for x in zf.infolist() ]
['META-INF/RSA-2048.SF', 'META-INF/RSA-2048.RSA', 'META-INF/MANIFEST.MF']
>>> for line in zf.read("META-INF/RSA-2048.SF").splitlines()[:4]:
Expand All @@ -623,23 +633,34 @@ def extract_v1_sig(apkfile: str, check_at_end: bool = True) -> Optional[bytes]:
... print(line.decode())
Manifest-Version: 1.0
Created-By: 1.8.0_45-internal (Oracle Corporation)
>>> zf.comment
b'{"offsets":[[6,5109],[7,5595],[8,6706]]}'
>>> v1_infos, v1_datas, v1_comment_data = extract_v1_sig_data(fh)
>>> [ x.filename for x in v1_infos ]
['META-INF/RSA-2048.SF', 'META-INF/RSA-2048.RSA', 'META-INF/MANIFEST.MF']
>>> [ (k, len(v)) for k, v in v1_datas.items() ]
[('META-INF/RSA-2048.SF', 664), ('META-INF/RSA-2048.RSA', 1160), ('META-INF/MANIFEST.MF', 589)]
>>> v1_comment_data
{'offsets': [[6, 5109], [7, 5595], [8, 6706]]}
"""
with zipfile.ZipFile(apkfile, "r") as zf:
infos = zf.infolist()
comments = []
datas = {info.filename: zf.read(info.filename) for info in infos if is_meta(info.filename)}
coffs = [[i, info.header_offset] for i, info in enumerate(infos) if is_meta(info.filename)]
comment_data: Dict[str, Any] = dict(offsets=coffs)
if zfe_size := detect_zfe(apkfile):
comment_data["zfe_size"] = zfe_size
comment = json.dumps(comment_data, separators=(",", ":"), sort_keys=True).encode()
offsets: Dict[str, int] = {}
fho = io.BytesIO()
if zfe_size := detect_zfe(apkfile):
comments.append(f"zfe_size={zfe_size}".encode())
with open(apkfile, "rb") as fhi:
comments.extend(copy_v1_sig_entries(fhi, fho, infos, offsets, check_at_end=check_at_end))
copy_v1_sig_entries(fhi, fho, infos, datas, offsets)
cd_offset = fho.tell()
fhi.seek(_zip_data(fhi).cd_offset)
copy_v1_sig_cd_entries(
fhi, fho, infos, offsets, check_at_end=check_at_end and AFTER_MANIFEST not in comments)
copy_v1_sig_cd_entries(fhi, fho, infos, datas, offsets)
eocd_offset = fho.tell()
fho.write(_eocd(len(offsets), eocd_offset, cd_offset, b",".join(comments)))
fho.write(_eocd(len(offsets), eocd_offset, cd_offset, comment))
return fho.getvalue() if offsets else None


Expand All @@ -650,59 +671,39 @@ def _eocd(entries: int, eocd_offset: int, cd_offset: int, comment: bytes = b"")


def copy_v1_sig_entries(fhi: BinaryIO, fho: BinaryIO, infos: List[zipfile.ZipInfo],
offsets: Dict[str, int], check_at_end: bool = False) -> List[bytes]:
datas: Dict[str, bytes], offsets: Dict[str, int]) -> None:
"""Copy v1 signature entries."""
comments = []
meta_header_offsets = {}
last_non_meta_header_offset = -1
sorted_infos = sorted(infos, key=lambda info: info.header_offset)
for info in sorted_infos:
for info in sorted(infos, key=lambda info: info.header_offset):
if not is_meta(info.filename):
last_non_meta_header_offset = info.header_offset
continue
meta_header_offsets[info.filename] = info.header_offset
if info.filename in offsets:
raise ZipError(f"Duplicate ZIP entry: {info.filename!r}")
fhi.seek(info.header_offset)
hdr, n, m = _read_lfh(fhi)
if error := validate_zip_header(hdr):
raise ZipError(f"Unsupported LFH for {info.filename!r}: {error}")
offsets[info.filename] = fho.tell()
fho.write(hdr)
_copy_bytes(fhi, fho, info.compress_size)
if data_descriptor := _read_data_descriptor(fhi, info):
fho.write(data_descriptor)
if check_at_end and any(i < last_non_meta_header_offset for i in meta_header_offsets.values()):
for i, info in enumerate(sorted_infos):
if info.filename == ANDROID_MANIFEST:
after = sorted_infos[i + 1:i + 1 + len(meta_header_offsets)]
if [info.filename for info in after] == list(meta_header_offsets):
comments.append(AFTER_MANIFEST)
break
if AFTER_MANIFEST not in comments:
raise APKSigCopierError("Expected v1 signature entries at end of archive")
return comments
if error := validate_zip_header(hdr, info, datas, data_descriptor):
raise ZipError(f"Unsupported LFH for {info.filename!r}: {error}")


def copy_v1_sig_cd_entries(fhi: BinaryIO, fho: BinaryIO, infos: List[zipfile.ZipInfo],
offsets: Dict[str, int], check_at_end: bool = False) -> None:
datas: Dict[str, bytes], offsets: Dict[str, int],
*, only: Optional[Set[int]] = None) -> None:
"""Copy v1 signature CD entries."""
meta_indices = set()
last_non_meta_index = -1
for i, info in enumerate(infos):
hdr, n, m, k = _read_cdfh(fhi)
if not is_meta(info.filename):
last_non_meta_index = i
if not is_meta(info.filename) or (only is not None and i not in only):
continue
meta_indices.add(i)
if error := validate_zip_header(hdr):
if error := validate_zip_header(hdr, info, datas):
raise ZipError(f"Unsupported CDFH for {info.filename!r}: {error}")
fho.write(_adjust_offset(hdr, offsets[info.filename]))
if check_at_end and any(i < last_non_meta_index for i in meta_indices):
raise APKSigCopierError("Expected v1 signature CD entries at end of archive")


def validate_zip_header(hdr: bytes) -> Optional[str]:
def validate_zip_header(hdr: bytes, info: zipfile.ZipInfo, datas: Dict[str, bytes],
data_descriptor: Optional[bytes] = None) -> Optional[str]:
"""
Validate ZIP LHF or CDFH.
Expand All @@ -713,6 +714,9 @@ def validate_zip_header(hdr: bytes) -> Optional[str]:
uncompressed_size, n, m) = struct.unpack("<HHHHHIIIHH", hdr[4:30])
filename = hdr[30:30 + n]
extra = hdr[30 + n:30 + n + m]
if data_descriptor:
# FIXME: check old values are zero or equal?
crc32, compressed_size, uncompressed_size = struct.unpack("<III", data_descriptor[-12:])
version_created = start_disk = internal_attrs = external_attrs = 0
comment = b""
else: # CDFH
Expand Down Expand Up @@ -745,6 +749,58 @@ def validate_zip_header(hdr: bytes) -> Optional[str]:
return "non-empty extra field"
if comment:
return "non-empty file comment"
if compressed_size != info.compress_size:
return "compressed size mismatch"
if uncompressed_size != len(datas[info.filename]):
return "uncompressed size mismatch"
if crc32 != info.CRC:
return "crc32 mismatch"
return None


def extract_v1_sig_data(fhi: BinaryIO) -> Tuple[List[zipfile.ZipInfo], Dict[str, bytes],
Dict[str, Any]]:
"""Extract data from v1_sig comment."""
with zipfile.ZipFile(fhi, "r") as zf:
infos = zf.infolist()
datas = {info.filename: zf.read(info.filename) for info in infos}
try:
comment_data = json.loads(zf.comment.decode())
except (UnicodeDecodeError, json.JSONDecodeError) as e:
raise APKSigCopierError(f"Invalid {V1SIGZIP} comment: {e}") # pylint: disable=W0707
if error := validate_v1_sig_data(comment_data, len(infos)):
raise APKSigCopierError(f"Invalid {V1SIGZIP} comment: {error}")
return infos, datas, comment_data


def validate_v1_sig_data(data: Dict[str, Any], n_infos: int) -> Optional[str]:
"""
Validate data from v1_sig comment.
Returns None if valid, error otherwise.
>>> validate_v1_sig_data(dict(offsets=[[0, 128], [1, 256]], zfe_size=132), 2)
>>> validate_v1_sig_data(dict(offsets=[[1, 2], [3]]), 2)
'.offsets[1] is not a list of 2 ints'
"""
if set(data) - {"offsets", "zfe_size"}:
return "contains unknown key(s)"
if "offsets" not in data:
return "missing .offsets"
if not isinstance(data["offsets"], list):
return ".offsets is not a list"
if len(data["offsets"]) != n_infos:
return ".offsets length does not match number of entries"
for i, pair in enumerate(data["offsets"]):
if not isinstance(pair, list) or len(pair) != 2 or \
not all(type(x) is int for x in pair):
return f".offsets[{i}] is not a list of 2 ints"
if "zfe_size" in data:
if type(data["zfe_size"]) is not int:
return ".zfe_size is not an int"
if not (30 <= data["zfe_size"] <= 4096):
return ".zfe_size is < 30 or > 4096"
return None


Expand Down Expand Up @@ -1150,7 +1206,7 @@ def do_extract(signed_apk: str, output_dir: str, v1_only: NoAutoYesBoolNone = NO
with open(os.path.join(output_dir, name), "wb") as fh:
fh.write(data)
else:
extracted_meta, v1_sig = None, extract_v1_sig(signed_apk, check_at_end=bool(v2_sig))
extracted_meta, v1_sig = None, extract_v1_sig(signed_apk)
if v1_sig:
with open(os.path.join(output_dir, V1SIGZIP), "wb") as fh:
fh.write(v1_sig)
Expand Down Expand Up @@ -1253,7 +1309,7 @@ def do_copy(signed_apk: str, unsigned_apk: str, output_apk: str,
if legacy:
extracted_meta, v1_sig = tuple(extract_meta(signed_apk)), None
else:
extracted_meta, v1_sig = None, extract_v1_sig(signed_apk, check_at_end=bool(v2_sig))
extracted_meta, v1_sig = None, extract_v1_sig(signed_apk)
if v2_sig and not (ignore_differences or v1_sig):
differences = extract_differences(signed_apk, extracted_meta)
else:
Expand Down

0 comments on commit 40a2d72

Please sign in to comment.