Skip to content

Commit

Permalink
Fix new global assets (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
thesadru authored Nov 6, 2023
1 parent e0e60e7 commit a6ad022
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ jobs:
with:
python-version: ${{ matrix.python-version }}

- name: install flatbuffers
run: sudo apt-get install -y flatbuffers-compiler

- name: install nox
run: python -m pip install nox

Expand Down
99 changes: 70 additions & 29 deletions arkprts/assets/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def load_unity_file(stream: io.BytesIO | bytes) -> bytes:
return asset_file


def decrypt_global_text(data: bytes, *, rsa: bool = True) -> bytes:
"""Decrypt global text."""
def decrypt_aes_text(data: bytes, *, rsa: bool = True) -> bytes:
"""Decrypt aes text."""
from Crypto.Cipher import AES

mask = bytes.fromhex("554954704169383270484157776e7a7148524d4377506f6e4a4c49423357436c")
Expand Down Expand Up @@ -100,31 +100,36 @@ def run_flatbuffers(
"--defaults-json",
"--unknown-json",
"--raw-binary",
"--no-warnings",
# unfortunately not in older versions
# "--no-warnings",
"--force-empty",
],
stderr=subprocess.DEVNULL,
)
if code != 0:
raise ValueError(f"flatc failed with code {code}")

return pathlib.Path(output_directory) / (pathlib.Path(fbs_path).stem + ".json")


def resolve_fbs_schema_directory() -> pathlib.Path:
def resolve_fbs_schema_directory(server: typing.Literal["cn", "yostar"]) -> pathlib.Path:
"""Resolve the flatbuffers schema directory."""
path = os.environ.get("FLATBUFFERS_SCHEMA_DIR")
path = os.environ.get(f"FLATBUFFERS_SCHEMA_DIR_{server.upper()}")
if path:
return pathlib.Path(path)

path = pathlib.Path(tempfile.gettempdir()) / "OpenArknightsFBS" / "FBS"
os.environ["FLATBUFFERS_SCHEMA_DIR"] = str(path)
core_path = pathlib.Path(tempfile.gettempdir()) / "ArknightsFBS"
core_path.mkdir(parents=True, exist_ok=True)
path = core_path / server / "OpenArknightsFBS" / "FBS"
os.environ[f"FLATBUFFERS_SCHEMA_DIR_{server.upper()}"] = str(path)
return path


async def update_fbs_schema(*, force: bool = False) -> None:
"""Download or otherwise update FBS files."""
directory = resolve_fbs_schema_directory().parent
await git.update_repository("MooncellWiki/OpenArknightsFBS", directory, force=force)
for server, branch in [("cn", "main"), ("yostar", "YoStar")]:
directory = resolve_fbs_schema_directory(server).parent
await git.update_repository("MooncellWiki/OpenArknightsFBS", directory, branch=branch, force=force)


def recursively_collapse_keys(obj: typing.Any) -> typing.Any:
Expand All @@ -143,8 +148,15 @@ def recursively_collapse_keys(obj: typing.Any) -> typing.Any:
return obj


def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes:
"""Decrypt chinese fbs json file."""
def decrypt_fbs_file(
data: bytes,
table_name: str,
server: netn.ArknightsServer,
*,
rsa: bool = True,
normalize: bool = False,
) -> bytes:
"""Decrypt fbs json file."""
if rsa:
data = data[128:]

Expand All @@ -153,7 +165,9 @@ def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes

fbs_path = tempdir / (table_name + ".bytes")
fbs_path.write_bytes(data)
fbs_schema_path = resolve_fbs_schema_directory() / (table_name + ".fbs")
fbs_schema_path = resolve_fbs_schema_directory(server="cn" if server in ("cn", "bili") else "yostar") / (
table_name + ".fbs"
)
output_directory = tempdir / "output"

output_path = run_flatbuffers(fbs_path, fbs_schema_path, output_directory)
Expand All @@ -163,15 +177,22 @@ def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes
if len(parsed_data) == 1:
parsed_data, *_ = parsed_data.values()

return json.dumps(parsed_data, indent=4, ensure_ascii=False).encode("utf-8")
return json.dumps(parsed_data, indent=4 if normalize else None, ensure_ascii=False).encode("utf-8")


def decrypt_arknights_text(data: bytes, name: str, *, rsa: bool = True) -> bytes:
def decrypt_arknights_text(
data: bytes,
name: str,
server: netn.ArknightsServer,
*,
rsa: bool = True,
normalize: bool = False,
) -> bytes:
"""Decrypt arbitrary arknights data."""
if match := re.search(r"(\w+_(?:table|data|const|database))[0-9a-fA-F]{6}", name):
return decrypt_fbs_file(data, match[1], rsa=rsa)
return decrypt_fbs_file(data, match[1], rsa=rsa, server=server, normalize=normalize)

return decrypt_global_text(data, rsa=rsa)
return decrypt_aes_text(data, rsa=rsa)


def load_json_or_bson(data: bytes) -> typing.Any:
Expand All @@ -184,8 +205,11 @@ def load_json_or_bson(data: bytes) -> typing.Any:
return json.loads(data)


def normalize_json(data: bytes, *, indent: int = 4) -> bytes:
def normalize_json(data: bytes, *, indent: int = 4, lenient: bool = True) -> bytes:
"""Normalize a json format."""
if lenient and b"\x00" not in data[:256]:
return data

json_data = load_json_or_bson(data)
return json.dumps(json_data, indent=indent, ensure_ascii=False).encode("utf-8")

Expand All @@ -197,44 +221,57 @@ def unpack_assets(
asset: UnityPyAsset,
target_container: str | None = None,
# target_path: str | None = None,
*,
server: netn.ArknightsServer | None = None,
normalize: bool = False,
) -> typing.Iterable[tuple[str, bytes]]:
"""Yield relative paths and data for a unity asset."""
for container, obj in asset.container.items():
if target_container and container != target_container:
continue

if obj.type.name == "TextAsset":
if server is None:
raise TypeError("Server required for text decryption")

if match := re.match(DYNP + r"(.+\.txt)", container):
data = obj.read()
yield (match[1], data.script)
continue

if match := re.match(DYNP + r"(gamedata/.+?\.json)", container):
data = obj.read()
yield (match[1], normalize_json(bytes(data.script)))
yield (match[1], normalize_json(bytes(data.script), lenient=not normalize))
continue

if match := re.match(DYNP + r"(gamedata/.+?)\.lua\.bytes", container):
data = obj.read()
text = decrypt_arknights_text(data.script, name=data.name)
text = decrypt_aes_text(data.script)
yield (match[1] + ".lua", text)
continue

if match := re.match(DYNP + r"(gamedata/levels/(?:obt|activities)/.+?)\.bytes", container):
data = obj.read()
try:
text = normalize_json(bytes(data.script)[128:])
text = normalize_json(bytes(data.script)[128:], lenient=not normalize)
except UnboundLocalError: # effectively bson's "type not recognized" error
text = decrypt_fbs_file(data.script, "prts___levels")
text = decrypt_fbs_file(data.script, "prts___levels", server=server)

yield (match[1] + ".json", text)
continue

if match := re.match(DYNP + r"(gamedata/.+?)(?:[a-fA-F0-9]{6})?\.bytes", container):
data = obj.read()
# the only rsa-less file is global's enemy_database
text = decrypt_arknights_text(data.script, name=data.name, rsa=data.name != "enemy_database")
yield (match[1] + ".json", normalize_json(text))
# the only rsa-less file is ~~global~~ tw's enemy_database

text = decrypt_arknights_text(
data.script,
name=data.name,
rsa=data.name != "enemy_database",
server=server,
normalize=normalize,
)
yield (match[1] + ".json", normalize_json(text, lenient=not normalize))
continue


Expand Down Expand Up @@ -332,14 +369,18 @@ async def _download_and_save(
*,
target_container: str | None = None,
server: netn.ArknightsServer | None = None,
normalize: bool = False,
) -> typing.AsyncIterable[tuple[str, bytes]]:
"""Download and extract an asset."""
server = server or self.default_server

asset = await self._download_unity_asset(path, server=server)

fetched_any = False
for fetched_any, (path, data) in enumerate(unpack_assets(asset, target_container), 1):
for fetched_any, (path, data) in enumerate(
unpack_assets(asset, target_container, server=server, normalize=normalize),
1,
):
savepath = self.directory / server / path
savepath.parent.mkdir(exist_ok=True, parents=True)
savepath.write_bytes(data)
Expand Down Expand Up @@ -367,9 +408,6 @@ async def update_assets(

return

if server in ("cn", "bili"):
await update_fbs_schema()

hot_update_list = await self._get_hot_update_list(server)
requested_names = [info["name"] for info in hot_update_list["abInfos"] if fnmatch.fnmatch(info["name"], allow)]

Expand All @@ -378,6 +416,9 @@ async def update_assets(
outdated_names = set(get_outdated_hashes(hot_update_list, old_hot_update_list))
requested_names = [name for name in requested_names if name in outdated_names]

if any("gamedata" in name for name in requested_names):
await update_fbs_schema()

# sequential doesn't matter since most of the time is spent unpacking
# Fix this once images come into play (threadpoolexecutor and such)
# first download all .ab files in a temporary directory then start extracting them.
Expand Down Expand Up @@ -409,7 +450,7 @@ async def aget_file(self, path: str, *, server: netn.ArknightsServer | None = No

for potential_asset_path in asset_paths:
asset = await self._download_unity_asset(potential_asset_path, server=server)
for output_path, data in unpack_assets(asset, path):
for output_path, data in unpack_assets(asset, path, server=server):
if save:
savepath = self.directory / server / output_path
savepath.parent.mkdir(exist_ok=True, parents=True)
Expand Down
Loading

0 comments on commit a6ad022

Please sign in to comment.