Skip to content


feat: added CLI script for importing/exporting FW
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Oct 31, 2024
1 parent c67d34c commit 355762e
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ docs/.buildinfo/

# pytest
Expand Down
16 changes: 16 additions & 0 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ FACT provides an optional basic authentication, role and user management. More i
* [FACT Search and Download](
* [PDF Report Generator](

### Import/Export of Results

The script `src/firmware_import_export.py` can be used to export unpacked files and analysis results and import them
into another FACT instance. The data is stored as a ZIP archive, and this is also the format the script expects during
import. To export files and analysis data of analyzed firmware images simply run

    python3 firmware_import_export.py export FW_UID [FW_UID_2 ...] [-o OUTPUT_DIR]

After this, you can import the exported files with

    python3 firmware_import_export.py import FW.zip [FW_2.zip ...]

## Vagrant
We provide monthly and ready-to-use vagrant boxes of our master branch. [Vagrant]( is an easy and convenient way to get started with FACT without having to install it on your machine. Just setup vagrant and import our provided box into VirtualBox. Our boxes can be found [here](!

Expand Down
207 changes: 207 additions & 0 deletions src/
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import logging
import sys
from io import BytesIO
from pathlib import Path
from zipfile import ZIP_DEFLATED, BadZipFile, ZipFile

from rich.logging import RichHandler
from rich.progress import MofNCompleteColumn, Progress, SpinnerColumn, TimeElapsedColumn

from config import load
from helperFunctions.database import get_shared_session
from objects.file import FileObject
from objects.firmware import Firmware
from storage.db_interface_backend import BackendDbInterface
from storage.fsorganizer import FSOrganizer
from storage.migration import get_current_revision

# All log records are emitted through rich's logging handler.
logging.basicConfig(level='NOTSET', format='%(message)s', datefmt='[%X]', handlers=[RichHandler(rich_tracebacks=True)])
logger = logging.getLogger('rich')
# Column layout shared by every progress display created in this script.
COLUMNS = [SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn(), MofNCompleteColumn()]
# Top-level keys that the data.json of an import archive must contain.
EXPECTED_KEYS = ['db_revision', 'files', 'firmware', 'uid']
# Hint that is appended to error messages about malformed import archives.
ERROR_MESSAGE = (
    'The import feature only works with archives exported by FACT and '
    'is not intended to be used to import arbitrary firmware!'
)

class FwExporter:
def __init__(self, output_dir: str):
    """
    Create an exporter that writes its archives below `output_dir`.

    :param output_dir: Path of the directory the export archives are written to.
    """
    # kept as a Path so that archive names can be joined onto it
    self.target_dir = Path(output_dir)
    # DB access for the analysis data
    self.db_interface = BackendDbInterface()
    # resolves UIDs to the paths of the stored binaries
    self.fs_organizer = FSOrganizer()

def export_files(self, uid_list: list[str]):
    """
    Export every firmware from `uid_list`, one after another, using a single shared DB session.

    :param uid_list: The UIDs of the firmware images that should be exported.
    """
    with get_shared_session(self.db_interface) as db_session, Progress(*COLUMNS) as progress:
        export_task = progress.add_task('Firmware export', total=len(uid_list))
        for uid in uid_list:
            self._export_single_file(db_session, uid, progress)
            # the task is created with total=len(uid_list); without this the bar never moves
            progress.advance(export_task)

def _export_single_file(self, db, fw_uid: str, progress: Progress):
    """
    Export one firmware to `<target_dir>/FACT_export_<fw_uid>.zip`.

    The archive contains the binary of the firmware and of every file included in it as `files/<uid>`
    and the corresponding DB entries as `data.json`.

    :param db: The DB interface/session used for fetching the file list and the DB entries.
    :param fw_uid: The UID of the firmware that should be exported.
    :param progress: The progress display to which the per-file task is added.
    """
    included_files = db.get_all_files_in_fw(fw_uid)
    with BytesIO() as buffer:
        with ZipFile(buffer, 'w', ZIP_DEFLATED) as zip_file:
            # "+ 1" accounts for the firmware file itself
            file_task = progress.add_task('Fetching files', total=len(included_files) + 1)
            for fo_uid in included_files.union({fw_uid}):
                file_path = self.fs_organizer.generate_path_from_uid(fo_uid)
                zip_file.writestr(f'files/{fo_uid}', Path(file_path).read_bytes())
                progress.advance(file_task)
            zip_file.writestr(
                'data.json',
                json.dumps(self._fetch_db_data(fw_uid, included_files, db, progress)),
            )
        # the archive is only complete after the ZipFile was closed, so the buffer is read afterwards
        self.target_dir.mkdir(parents=True, exist_ok=True)
        target_path = self.target_dir / f'FACT_export_{fw_uid}.zip'
        target_path.write_bytes(buffer.getvalue())
        logging.info(f'Exported firmware {fw_uid} to {target_path}')

def _fetch_db_data(uid: str, all_files: set[str], db, progress: Progress) -> dict:
db_data = {
'db_revision': get_current_revision(),
'files': [],
'firmware': db.get_object(uid).to_json(),
'uid': uid,
db_task = progress.add_task('Fetching DB entries', total=len(all_files))
for fo in db.get_objects_by_uid_list(all_files):
return db_data

class FwImporter:
def __init__(self, force: bool):
    """
    Create an importer.

    :param force: If set, an archive is imported even if its DB revision differs from the current one.
    """
    # DB access for inserting the imported entries
    self.db_interface = BackendDbInterface()
    # file storage access
    self.fs_organizer = FSOrganizer()
    self.force = force
    # no progress display exists yet; `import_files` assigns one
    self.progress: Progress | None = None

def import_files(self, file_list: list[str]):
with Progress(*COLUMNS) as progress:
self.progress = progress
import_task = progress.add_task('Importing files', total=len(file_list))
for file in file_list:
path = Path(file)
if not path.is_file():
logging.error(f'File {path} does not exist')
if self._import_file(path):
self.progress = None

def _import_file(self, path: Path) -> bool:  # noqa: PLR0911
    """
    Import a single export archive.

    The archive is rejected (with a log message) if it is not a ZIP file, has no valid `data.json` with all
    keys from `EXPECTED_KEYS`, contains a firmware that is already in the DB, or was created with a different
    DB revision (unless `self.force` is set).

    :param path: The path of the export archive.
    :return: `True` if the archive was imported and `False` otherwise.
    """
    try:
        with ZipFile(path, 'r') as zip_file:
            if 'data.json' not in zip_file.namelist():
                logging.error(f'Error: data.json not found in uploaded import file. {ERROR_MESSAGE}')
                return False
            try:
                data = json.loads(zip_file.read('data.json'))
            except (json.JSONDecodeError, UnicodeDecodeError) as error:
                # json.loads() on bytes raises UnicodeDecodeError if the content is not valid UTF-8/16/32
                logging.error(f'Error: data.json is not a valid JSON file: {error}')
                return False
            # a top-level JSON array/scalar would pass or crash the key check below in confusing ways
            if not isinstance(data, dict) or not all(k in data for k in EXPECTED_KEYS):
                logging.error(f'Error: data.json is missing mandatory keys (expected: {EXPECTED_KEYS})')
                return False
            if self.db_interface.is_firmware(data['uid']):
                logging.warning(f'Skipping firmware {data["uid"]}. Reason: is already in the DB')
                return False
            current_revision = get_current_revision()
            if not self.force and data['db_revision'] != current_revision:
                logging.error(
                    f'Error: import file was created with a different DB revision: '
                    f'{data["db_revision"]} (current revision is {current_revision}). '
                    f'Please upgrade/downgrade to a compatible revision.',
                )
                return False

            imported_objects = self._import_objects(data)
            imported_files = self._import_files(zip_file)
            logging.info(
                f'Successfully imported {imported_files} files and {imported_objects} DB entries from {path}'
            )
            return True
    except BadZipFile:
        logging.error(f'Error: File {path} is not a ZIP file. {ERROR_MESSAGE}')
        return False

def _import_files(self, zip_file) -> int:
files = [f for f in zip_file.namelist() if f != 'data.json']
file_task = self.progress.add_task('Importing files', total=len(files))
for file in files:
return len(files)

def _import_objects(self, data: dict) -> int:
    """
    Recreate the objects described by `data` and insert them into the DB.

    :param data: The parsed content of `data.json` (the `firmware` and `files` entries are used).
    :return: The count returned by `_insert_objects_hierarchically`.
    """
    firmware = Firmware.from_json(data['firmware'])
    # map UID -> FileObject; every file object gets the firmware as its root
    file_objects = {}
    for fo_data in data['files']:
        fo_uid = fo_data['uid']
        file_objects[fo_uid] = FileObject.from_json(fo_data, firmware.uid)
    with get_shared_session(self.db_interface) as db_session:
        inserted_count = self._insert_objects_hierarchically(file_objects, firmware.uid, db_session)
        return inserted_count

def _insert_objects_hierarchically(self, fo_dict: dict[str, FileObject], root_uid: str, db) -> int:
already_added = {root_uid}
all_uids = already_added.union(fo_dict)
orphans = {uid for uid, fo in fo_dict.items() if any(parent not in all_uids for parent in fo.parents)}
for uid in orphans:
logging.warning(f'FW import contains orphaned object {uid} (ignored)')
db_task = self.progress.add_task('Importing DB entries', total=len(fo_dict))
while fo_dict:
addable_uids = set()
for fo in fo_dict.values():
if all(parent in already_added for parent in fo.parents):
for uid in addable_uids:
return len(already_added)

def _parse_args(args=None):
if args is None:
args = sys.argv[1:]
parser = argparse.ArgumentParser(description='Script to import and export firmware analyses')
subparsers = parser.add_subparsers(
description='valid subcommands',
help='additional help',

parser_export = subparsers.add_parser('export')
parser_export.add_argument('uid_list', nargs='+', help='The UIDs of the firmware(s) to export')
'-o', '--output', help='The output directory (default: (cwd)/FACT_export)', type=str, default='FACT_export'

parser_import = subparsers.add_parser('import')
parser_import.add_argument('files', nargs='+', help='The FACT export archive(s) to import')
parser_import.add_argument('-f', '--force', action='store_true', help='ignore DB revision check')
return parser.parse_args(args)

def main():
args = _parse_args()
if args.command == 'export':

if __name__ == '__main__':
9 changes: 9 additions & 0 deletions src/helperFunctions/
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@ def get_some_vfp(vfp_dict: dict[str, list[str]]) -> str | None:
for vfp_list in vfp_dict.values():
return vfp_list[0]
return None

def filter_vpf_dict(vfp_dict: dict[str, list[str]], parent_uids: set[str]) -> dict[str, list[str]]:
    """
    Get only VFPs from parent files that are contained in `parent_uids`.

    :param vfp_dict: A virtual file path dict
    :param parent_uids: A set of allowed parent UIDs (VFPs from other parent files are filtered out)
    :return: A new dict with only the entries of `vfp_dict` whose key is in `parent_uids`
    """
    return {k: v for k, v in vfp_dict.items() if k in parent_uids}
38 changes: 37 additions & 1 deletion src/objects/
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from helperFunctions.data_conversion import make_bytes, make_unicode_string
from helperFunctions.hash import get_sha256
from helperFunctions.uid import create_uid
from helperFunctions.virtual_file_path import get_some_vfp
from helperFunctions.virtual_file_path import filter_vpf_dict, get_some_vfp

class FileObject:
Expand Down Expand Up @@ -213,3 +213,39 @@ def __str__(self) -> str:

def __repr__(self) -> str:
return self.__str__()

def to_json(self, vfp_parent_filter: set[str] | None = None) -> dict:
    """
    Get a FileObject as JSON. `vfp_parent_filter` can be used to filter the entries with a UID whitelist.

    :param vfp_parent_filter: If not `None`, only virtual file paths whose parent UID is in this set are included.
    :return: A dict with the attributes of the object.
    """
    return {
        'comments': self.comments,
        'depth': self.depth,
        'file_name': self.file_name,
        'files_included': list(self.files_included),
        'processed_analysis': self.processed_analysis,
        'sha256': self.sha256,
        'size': self.size,
        'uid': self.uid,
        'virtual_file_path': (
            filter_vpf_dict(self.virtual_file_path, vfp_parent_filter)
            if vfp_parent_filter is not None
            else self.virtual_file_path
        ),
    }

@classmethod
def from_json(cls, json_dict: dict, root_uid: str | None = None) -> FileObject:
    """
    Create an object from a dict as generated by `to_json`.

    :param json_dict: The dict with the attributes of the object (`file_name` and `virtual_file_path`
        are required; `sha256` falls back to the part of `uid` before the first underscore).
    :param root_uid: The UID of the firmware this file belongs to (optional).
    :return: The new object.
    """
    fo = cls(file_name=json_dict['file_name'])
    fo.comments = json_dict.get('comments')
    fo.depth = json_dict.get('depth')
    fo.files_included = json_dict.get('files_included')
    fo.processed_analysis = json_dict.get('processed_analysis')
    fo.sha256 = json_dict.get('sha256') or json_dict.get('uid').split('_')[0]
    fo.size = json_dict.get('size')
    fo.uid = json_dict.get('uid')
    fo.virtual_file_path = json_dict.get('virtual_file_path')
    # these entries are necessary for correctly filling the included_files_table and fw_files_table
    fo.parent_firmware_uids = [root_uid] if root_uid else []
    # the keys of the VFP dict are the UIDs of the parent files
    fo.parents = list(fo.virtual_file_path)
    return fo
27 changes: 27 additions & 0 deletions src/objects/
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,30 @@ def __str__(self) -> str:

def __repr__(self) -> str:
return self.__str__()

def to_json(self, vfp_parent_filter: set[str] | None = None) -> dict:
    """
    Get the firmware as JSON: the entries of the parent class' `to_json` plus the firmware meta data.

    :param vfp_parent_filter: Passed on unchanged to the parent class' `to_json`.
    :return: A dict with the attributes of the firmware.
    """
    json = super().to_json(vfp_parent_filter)
    json.update(
        {
            'device_class': self.device_class,
            'device_name': self.device_name,
            'part': self.part,
            'release_date': self.release_date,
            'tags': self.tags,
            'vendor': self.vendor,
            'version': self.version,
        }
    )
    return json

@classmethod
def from_json(cls, json: dict, root_uid: str | None = None):
    """
    Create a firmware from a dict as generated by `to_json`.

    :param json: The dict with the attributes of the firmware.
    :param root_uid: Passed on unchanged to the parent class' `from_json`.
    :return: The new firmware object.
    """
    # the parent class fills all generic file attributes; only the firmware meta data is added here
    instance = super().from_json(json, root_uid)
    instance.device_class = json.get('device_class')
    instance.device_name = json.get('device_name')
    instance.part = json.get('part')
    instance.release_date = json.get('release_date')
    instance.tags = json.get('tags')
    instance.vendor = json.get('vendor')
    instance.version = json.get('version')
    return instance
20 changes: 15 additions & 5 deletions src/storage/migration/
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@ def alembic_table_exists():
return inspect(connection).has_table('alembic_version', None)

def get_current_revision():
    """
    Get the alembic revision that is currently stored in the database.

    :return: The result of alembic's `MigrationContext.get_current_revision()`.
    """
    # alembic must be executed from src for paths to line up
    with OperateInDirectory(get_src_dir()), AdminConnection().engine.connect().engine.begin() as connection:
        logging.getLogger('alembic.runtime.migration').setLevel(logging.WARNING)  # hide alembic log messages
        context = migration.MigrationContext.configure(connection)
        return context.get_current_revision()

def _get_current_head():
    """
    Get the head revision of the alembic migration scripts configured in `ALEMBIC_CFG`.
    """
    # alembic must be executed from src for paths to line up
    with OperateInDirectory(get_src_dir()):
        script_directory = script.ScriptDirectory.from_config(ALEMBIC_CFG)
        return script_directory.get_current_head()

def db_needs_migration():
    """
    Check if the revision of the database differs from the head revision of the migration scripts.

    :return: `True` if the two revisions are different and `False` otherwise.
    """
    current_revision = get_current_revision()
    current_head = _get_current_head()
    logging.info(f'Alembic DB revision: head: {current_head}, current: {current_revision}')
    return current_revision != current_head

def create_alembic_table():
Expand Down
Empty file.

0 comments on commit 355762e

Please sign in to comment.