Skip to content

Commit

Permalink
Add new KVStorage parser based on py-datastruct
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba2k2 committed Nov 18, 2023
1 parent 907d19c commit ba8755f
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 126 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Install the package from PyPI:
pip install bk7231tools[cli]
```

The `[cli]` extras will include PyCryptodomex, required for Tuya Storage extraction.
The `[cli]` extras will include PyCryptodome, required for Tuya Storage extraction.

## ⚠️ WARNING⚠️
Please be aware that this software is provided without any guarantees from the authors. If you will still use it, then please be aware that:
Expand Down
67 changes: 41 additions & 26 deletions bk7231tools/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import json
import os
import re
import sys
import traceback
from contextlib import closing
Expand Down Expand Up @@ -278,62 +279,76 @@ def dissect_dump_file(args):
app_code = code

try:
import Cryptodome
from Crypto.Cipher import AES
except (ImportError, ModuleNotFoundError):
print(
"NOTE: skipping storage decryption because of missing PyCryptodomex dependency."
"NOTE: skipping storage decryption because of missing PyCryptodome dependency."
)
print(
" Install using 'pip install bk7231tools[cli]' to add the dependency."
)
return

from bk7231tools.analysis.storage import TuyaStorage
from bk7231tools.analysis.kvstorage import KEY_MASTER, KVStorage

keys = []
kvs = None
while True:
storage = TuyaStorage()
print("Storage partition:")
pos = storage.load(dumpfile)
if pos is None:
with open(dumpfile, "rb") as f:
dump_data = f.read()
result = KVStorage.find_storage(dump_data)
if result is None:
print("\t- not found!")
break
if not storage.decrypt():
print("\t- failed to decrypt!")
break
keys = storage.find_all_keys()
print(f"\t{pos:#06x}: {storage.length // 1024:d} KiB - {len(keys)} keys")
pos, storage_data = result
aes = AES.new(key=KEY_MASTER, mode=AES.MODE_ECB)
kvs = KVStorage.unpack(storage_data, aes=aes)
keys = list(kvs.indexes.keys())
print(f"\t{pos:#06x}: {kvs.length // 1024:d} KiB - {len(keys)} keys")
print("\n".join(f"\t- '{key}'" for key in keys))
if args.extract:
storage.extract_all(output_directory, separate_keys=args.storage)
filepath = __generate_payload_output_file_path(
dumpfile=dumpfile,
payload_name="storage",
output_directory=output_directory,
extra_tag="decrypted",
)
storage.save(filepath)
dumpfile_name = Path(dumpfile).stem

out_name = os.path.join(output_directory, f"{dumpfile_name}_storage.json")
kvs_data = kvs.read_all_values_parsed()
with open(out_name, "w") as f:
print(f"\t\textracted all keys to {out_name}")
json.dump(kvs_data, f, indent="\t")
if not args.storage:
break

kvs_data = kvs.read_all_values()
for key, value in kvs_data.items():
key_file = re.sub(r"[^A-Za-z0-9-_.]", "_", key).strip("_")
out_name = os.path.join(
output_directory,
f"{dumpfile_name}_storage_{key_file}.bin",
)
with open(out_name, "wb") as f:
print(f"\t\textracted '{key}' to {out_name}")
f.write(value)
break

if not args.extract:
# no more work to do, upk can't be searched without app code
return

upk = None
while "user_param_key" in keys:
if kvs and "user_param_key" in keys:
print("Storage area `user_param_key`:")
kv = storage.read_all_keys()
upk = kv["user_param_key"]
break
upk = kvs.read_value_parsed(kvs.indexes["user_param_key"])

while "user_param_key" not in keys:
print("App code `user_param_key`:")
if not app_code:
print("\t- no app code, aborting!")
break
upk = TuyaStorage.find_user_param_key(app_code)
if not upk:
result = KVStorage.find_user_param_key(app_code)
if not result:
print("\t- not found!")
_, upk = result
upk = KVStorage.parse_user_param_key(upk)
break

if upk is not None:
Expand Down Expand Up @@ -580,7 +595,7 @@ def parse_args():
"--storage",
action="store_true",
default=False,
help="Extract storage keys into separate files (default: False)",
help="Extract raw storage values into separate files (default: False)",
)
parser_dissect_dump.set_defaults(handler=dissect_dump_file)
parser_dissect_dump.set_defaults(device_required=False)
Expand Down
Loading

0 comments on commit ba8755f

Please sign in to comment.