Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into task-server
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Nov 23, 2022
2 parents 7d5ff06 + eb9fa78 commit cfc8167
Show file tree
Hide file tree
Showing 27 changed files with 286 additions and 122 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/build_ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Build CI
run-name: Build CI
on:
pull_request:
branches: [ master ]
schedule:
- cron: "0 2 * * *"
workflow_dispatch:

jobs:
build-ci:
runs-on: [ self-hosted, linux, x64, focal ]
timeout-minutes: 45
steps:
- name: Add Masks
run: |
echo "::add-mask::${{ secrets.NETWORK_MASK_1 }}"
echo "::add-mask::${{ secrets.NETWORK_MASK_2 }}"
echo "::add-mask::${{ secrets.NETWORK_MASK_3 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_1 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_2 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_3 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_4 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_5 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_6 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_7 }}"
echo "::add-mask::${{ secrets.INTERNAL_NODE_8 }}"
echo "::add-mask::${{ secrets.SECRET_STRING_1 }}"
echo "::add-mask::${{ secrets.SECRET_STRING_2 }}"
echo "::add-mask::${{ secrets.SECRET_STRING_3 }}"
- name: Checkout Branch
uses: actions/checkout@v3
with:
ref: ${{ github.ref }}
- name: Set ulimit
run: ulimit -n 9999
- name: Pre-Installation
shell: 'script -q -e -c "bash {0}"'
run: ./fact_extractor/install/pre_install.sh
- name: Install fact_extractor
shell: 'script -q -e -c "bash {0}"'
run: ./fact_extractor/install.py
- name: Unit Tests
shell: 'script -q -e -c "bash {0}"'
run: pytest
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ fact_extractor/install/pre_install.sh
fact_extractor/install.py
```

:warning: **We no longer support Ubuntu 16.04 and Python 3.5**
:warning: **We no longer support Ubuntu 16.04 and Python <3.7**
(It may still work with a bit of tinkering, though)

:warning: For the `generic_fs` unpacker plugin to work with all file system types, you may need to install extra kernel modules

```sh
sudo apt install linux-modules-extra-$(uname -r)
```

The tool can then be run with

```bash
Expand Down Expand Up @@ -92,7 +98,7 @@ This project is partly financed by [German Federal Office for Information Securi
## License
```
Firmware Analysis and Comparison Tool (FACT) extractor
Copyright (C) 2015-2019 Fraunhofer FKIE
Copyright (C) 2015-2022 Fraunhofer FKIE
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down
12 changes: 12 additions & 0 deletions fact_extractor/helperFunctions/file_system.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import lzma
from contextlib import contextmanager
from pathlib import Path
from re import match
from tempfile import TemporaryDirectory

from common_helper_process import execute_shell_command_get_return_code

Expand Down Expand Up @@ -54,3 +57,12 @@ def change_owner_of_output_files(files_dir: Path, owner: str) -> int:
_, return_code_chown = execute_shell_command_get_return_code(f'sudo chown -R {owner} {files_dir}')
_, return_code_chmod = execute_shell_command_get_return_code(f'sudo chmod -R u+rw {files_dir}')
return return_code_chmod | return_code_chown


@contextmanager
def decompress_test_file(test_file: Path) -> Path:
with TemporaryDirectory() as tmp_dir:
target_file = Path(tmp_dir) / 'fs.img'
with lzma.open(test_file) as decompressed_file:
target_file.write_bytes(decompressed_file.read())
yield target_file
4 changes: 3 additions & 1 deletion fact_extractor/install/unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@
'gcc-multilib',
'lib32stdc++6',
'gawk',
'pkg-config'
'pkg-config',
# android sparse image
'simg2img',
],
'pip3': [
'pluginbase',
Expand Down
Empty file.
Empty file.
26 changes: 26 additions & 0 deletions fact_extractor/plugins/unpacking/android_simg/code/android_simg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'''
This plugin "unpacks" Android sparse image by converting them to regular filesystem images using the simg2img tool
'''
import logging
from pathlib import Path

from common_helper_process import execute_shell_command

NAME = 'Android-sparse-image'
MIME_PATTERNS = ['filesystem/android-simg']
VERSION = '0.1'


def unpack_function(file_path, tmp_dir):
extract_dir = Path(tmp_dir)
file_to_unpack = Path(file_path)
output = execute_shell_command(f'simg2img {file_path} {extract_dir / file_to_unpack.name}.raw') + '\n'
meta_data = {'output': output}
logging.debug(output)
return meta_data


# ----> Do not edit below this line <----
def setup(unpack_tool):
for item in MIME_PATTERNS:
unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION))
Empty file.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path


import sys
root_dir = Path(__file__).parent.parent.parent.parent.parent
print(root_dir, [str(d) for d in root_dir.iterdir() if d.is_dir()])
sys.path.append(str(root_dir))

from test.unit.unpacker.test_unpacker import TestUnpackerBase

TEST_DATA_DIR = Path(__file__).parent / 'data'


class TestAndroidSimgUnpacker(TestUnpackerBase):

def test_unpacker_selection_generic(self):
self.check_unpacker_selection('filesystem/android-simg', 'Android-sparse-image')

def test_extraction_simg(self):
in_file = TEST_DATA_DIR / 'simg.img'
files, _ = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name)
assert len(files) == 1
assert files[0].endswith('raw')
54 changes: 31 additions & 23 deletions fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,21 @@
This plugin mounts filesystem images and extracts their content
'''
import re
from shlex import split
from subprocess import run, PIPE, STDOUT
from tempfile import TemporaryDirectory
from time import sleep

from common_helper_process import (
execute_shell_command, execute_shell_command_get_return_code
)
from fact_helper_file import get_file_type_from_path

NAME = 'genericFS'
MIME_PATTERNS = [
'filesystem/btrfs', 'filesystem/cramfs', 'filesystem/dosmbr', 'filesystem/ext2', 'filesystem/ext3',
'filesystem/ext4', 'filesystem/f2fs', 'filesystem/fat', 'filesystem/hfs', 'filesystem/jfs', 'filesystem/minix',
'filesystem/ntfs', 'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs',
'filesystem/btrfs', 'filesystem/dosmbr', 'filesystem/f2fs', 'filesystem/jfs', 'filesystem/minix',
'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs',
]
VERSION = '0.6'
VERSION = '0.6.1'
TYPES = {
'filesystem/btrfs': 'btrfs',
'filesystem/cramfs': 'cramfs',
'filesystem/f2fs': 'f2fs',
'filesystem/jfs': 'jfs',
'filesystem/minix': 'minix',
Expand All @@ -41,17 +38,28 @@ def unpack_function(file_path, tmp_dir):


def _mount_single_filesystem(file_path, mime_type, tmp_dir):
type_parameter = '-t {}'.format(TYPES[mime_type]) if mime_type in TYPES else ''
mount_dir = TemporaryDirectory()
output = execute_shell_command(f'sudo mount {type_parameter} -v -o ro,loop {file_path} {mount_dir.name}')
output += execute_shell_command(f'sudo cp -av {mount_dir.name}/* {tmp_dir}/')
output += execute_shell_command(f'sudo umount -v {mount_dir.name}')
mount_dir.cleanup()
type_parameter = f'-t {TYPES[mime_type]}' if mime_type in TYPES else ''
with TemporaryDirectory() as mount_dir:
output = _get_output(f'sudo mount {type_parameter} -v -o ro,loop {file_path} {mount_dir}')
output += _get_output(f'sudo cp -av {mount_dir}/* {tmp_dir}/')
output += _get_output(f'sudo umount -v {mount_dir}')

if 'unknown filesystem type' in output:
output += '\nwarning: you may need to install additional kernel modules'
return output


def _get_output(command: str) -> str:
environment = {'LANG': 'en_US.UTF-8'} # use LANG env variable to get unified localization output
return run(command, shell=True, env=environment, check=False, text=True, stdout=PIPE, stderr=STDOUT).stdout


def _run(command: str):
run(split(command), check=False)


def _mount_from_boot_record(file_path, tmp_dir):
output, return_code = execute_shell_command_get_return_code('sudo kpartx -a -v {}'.format(file_path))
output = _get_output(f'sudo kpartx -a -v {file_path}')
sleep(1) # Necessary since initialization of special devices seem to take some time
# kpartx may return an error on one partition but others are still loaded correctly.
loop_devices = _extract_loop_devices(output)
Expand All @@ -63,29 +71,29 @@ def _mount_from_boot_record(file_path, tmp_dir):
if loop_devices:
# Occasionally device mapping isn't removed correctly and results in losetup -d to fail, so remove explicitly
for loop_dev in loop_devices:
execute_shell_command(f'sudo dmsetup remove /dev/mapper/{loop_dev}')
_run(f'sudo dmsetup remove /dev/mapper/{loop_dev}')

# Bug in kpartx doesn't allow -d to work on long file names (as in /storage/path/<prefix>/<sha_hash>_<length>)
# thus "host" loop device is used instead of filename
k_output, return_code = execute_shell_command_get_return_code(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}')
execute_shell_command(f'sudo losetup -d {_get_host_loop(loop_devices)}')
return output + k_output
output += _get_output(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}')
_run(f'sudo losetup -d {_get_host_loop(loop_devices)}')

return output


def _process_loop_device(loop_device, mount_point, target_directory, index):
output = execute_shell_command(f'sudo mount -o ro -v /dev/mapper/{loop_device} {mount_point}')
output += execute_shell_command(f'sudo cp -av {mount_point}/ {target_directory}/partition_{index}/')
return output + execute_shell_command(f'sudo umount -v {mount_point}')
output = _get_output(f'sudo mount -o ro -v /dev/mapper/{loop_device} {mount_point}')
output += _get_output(f'sudo cp -av {mount_point}/ {target_directory}/partition_{index}/')
return output + _get_output(f'sudo umount -v {mount_point}')


def _extract_loop_devices(kpartx_output):
return re.findall(r'.*map (loop\d{1,2}p\d{1,2})\s.*', kpartx_output)


def _get_host_loop(devices):
return '/dev/{}'.format(re.findall(r'(loop\d{1,2})', devices[0])[0])
device = re.findall(r'(loop\d{1,2})', devices[0])[0]
return f'/dev/{device}'


# ----> Do not edit below this line <----
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import lzma
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory

from helperFunctions.file_system import decompress_test_file
from test.unit.unpacker.test_unpacker import TestUnpackerBase
from ..code.generic_fs import _extract_loop_devices
from ..code.generic_fs import _extract_loop_devices, _mount_single_filesystem, TYPES

TEST_DATA_DIR = Path(__file__).parent / 'data'
KPARTX_OUTPUT = '''
Expand All @@ -17,54 +16,22 @@ class TestGenericFsUnpacker(TestUnpackerBase):

def test_unpacker_selection_generic(self):
self.check_unpacker_selection('filesystem/btrfs', 'genericFS')
self.check_unpacker_selection('filesystem/cramfs', 'genericFS')
self.check_unpacker_selection('filesystem/dosmbr', 'genericFS')
self.check_unpacker_selection('filesystem/ext2', 'genericFS')
self.check_unpacker_selection('filesystem/ext3', 'genericFS')
self.check_unpacker_selection('filesystem/ext4', 'genericFS')
self.check_unpacker_selection('filesystem/f2fs', 'genericFS')
self.check_unpacker_selection('filesystem/hfs', 'genericFS')
self.check_unpacker_selection('filesystem/jfs', 'genericFS')
self.check_unpacker_selection('filesystem/minix', 'genericFS')
self.check_unpacker_selection('filesystem/reiserfs', 'genericFS')
self.check_unpacker_selection('filesystem/romfs', 'genericFS')
self.check_unpacker_selection('filesystem/udf', 'genericFS')
self.check_unpacker_selection('filesystem/xfs', 'genericFS')

def test_extraction_cramfs(self):
self.check_unpacking_of_standard_unpack_set(TEST_DATA_DIR / 'cramfs.img')

def test_extraction_romfs(self):
self.check_unpacking_of_standard_unpack_set(TEST_DATA_DIR / 'romfs.img')

def test_extraction_btrfs(self):
with decompress_test_file(TEST_DATA_DIR / 'btrfs.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_ext2(self):
with decompress_test_file(TEST_DATA_DIR / 'ext2.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_ext3(self):
with decompress_test_file(TEST_DATA_DIR / 'ext3.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_ext4(self):
with decompress_test_file(TEST_DATA_DIR / 'ext4.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_fat(self):
with decompress_test_file(TEST_DATA_DIR / 'fat.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_ntfs(self):
with decompress_test_file(TEST_DATA_DIR / 'ntfs.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_hfs(self):
with decompress_test_file(TEST_DATA_DIR / 'hfs.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')

def test_extraction_jfs(self):
with decompress_test_file(TEST_DATA_DIR / 'jfs.img.xz') as test_file:
self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test')
Expand Down Expand Up @@ -111,10 +78,11 @@ def test_extract_loop_devices():
assert loop_devices == ['loop7p1', 'loop7p2']


@contextmanager
def decompress_test_file(test_file: Path) -> Path:
with TemporaryDirectory() as tmp_dir:
target_file = Path(tmp_dir) / 'fs.img'
with lzma.open(test_file) as decompressed_file:
target_file.write_bytes(decompressed_file.read())
yield target_file
def test_unknown_filesystem():
try:
TYPES['foobar'] = 'foobar'
with TemporaryDirectory() as tmp_dir:
output = _mount_single_filesystem(TEST_DATA_DIR / 'romfs.img', 'foobar', tmp_dir)
assert 'you may need to install additional kernel modules' in output
finally:
TYPES.pop('foobar')
26 changes: 20 additions & 6 deletions fact_extractor/plugins/unpacking/sevenz/code/sevenz.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,24 @@
from helperFunctions.file_system import get_src_dir

NAME = '7z'
MIME_PATTERNS = ['application/x-lzma', 'application/x-7z-compressed', 'application/zip', 'application/x-zip-compressed']
VERSION = '0.7'

UNPACKER_EXECUTEABLE = '7z'
MIME_PATTERNS = [
# compressed archives
'application/x-lzma',
'application/x-7z-compressed',
'application/zip',
'application/x-zip-compressed',
# file systems
'filesystem/cramfs',
'filesystem/ext2',
'filesystem/ext3',
'filesystem/ext4',
'filesystem/fat',
'filesystem/hfs',
'filesystem/ntfs',
]
VERSION = '0.8'

UNPACKER_EXECUTABLE = '7z'
PW_LIST = get_merged_password_set(os.path.join(get_src_dir(), 'unpacker/passwords'))


Expand All @@ -23,7 +37,7 @@ def unpack_function(file_path, tmp_dir):
'''
meta = {}
for password in PW_LIST:
execution_string = 'fakeroot {} x -y -p{} -o{} {}'.format(UNPACKER_EXECUTEABLE, password, tmp_dir, file_path)
execution_string = f'fakeroot {UNPACKER_EXECUTABLE} x -y -p{password} -o{tmp_dir} {file_path}'
output = execute_shell_command(execution_string)

meta['output'] = output
Expand All @@ -34,7 +48,7 @@ def unpack_function(file_path, tmp_dir):

# Inform the user if not correct password was found
if 'Wrong password' in meta['output']:
logging.warn('Password for {} not found in fact_extractor/unpacker/passwords directory'.format(file_path))
logging.warning(f'Password for {file_path} not found in fact_extractor/unpacker/passwords directory')

return meta

Expand Down
Loading

0 comments on commit cfc8167

Please sign in to comment.