diff --git a/.github/workflows/build-unblob-image.yml b/.github/workflows/build-unblob-image.yml
new file mode 100644
index 00000000..b5de81af
--- /dev/null
+++ b/.github/workflows/build-unblob-image.yml
@@ -0,0 +1,27 @@
+name: Publish Docker image based on unblob-2 branch
+
+on:
+  push:
+    branches: ['unblob-2']
+  workflow_dispatch:
+
+jobs:
+  build-and-publish-image:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      - name: Login to DockerHub
+        uses: docker/login-action@v2
+        with:
+          username: ${{ secrets.DOCKER_HUB_USER }}
+          password: ${{ secrets.DOCKER_HUB_TOKEN }}
+
+      - name: Build and push Docker image
+        uses: docker/build-push-action@v3
+        with:
+          context: .
+          push: true
+          tags: fkiecad/fact_extractor:unblob
diff --git a/.gitignore b/.gitignore
index 81976e22..a9bef5cf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@
 __pycache__
 bin/
 install.log
+unblob
diff --git a/fact_extractor/install/unpacker.py b/fact_extractor/install/unpacker.py
index 9304ddba..497910e8 100644
--- a/fact_extractor/install/unpacker.py
+++ b/fact_extractor/install/unpacker.py
@@ -16,8 +16,8 @@
     apt_install_packages,
     apt_remove_packages,
     install_github_project,
-    pip_install_packages,
     load_requirements_file,
+    pip_install_packages,
 )
 
 BIN_DIR = Path(__file__).parent.parent / 'bin'
@@ -115,11 +115,13 @@
     'gzip',
     'lhasa',
     'libchm-dev',
+    'liblz4-tool',
     'lrzip',
     'lzip',
     'lzop',
     'ncompress',
     'nomarch',
+    'p7zip-full',
     'rpm2cpio',
     'rzip',
     'sharutils',
@@ -130,6 +132,7 @@
     'unrar',
     'xdms',
     'zpaq',
+    'zstd',
     # Freetz
     'autoconf',
     'automake',
@@ -199,13 +202,21 @@
     ),
 ]
 
 
+def check_mod_kit_installed() -> bool:
+    return all((Path(__file__).parent.parent / 'bin' / tool).exists() for tool in ['tpl-tool', 'untrx', 'unyaffs2'])
+
+
 def install_dependencies(dependencies):
     apt = dependencies.get('apt', [])
     github = dependencies.get('github', [])
     apt_install_packages(*apt)
     pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE))
     for repo in github:
-        install_github_project(*repo)
+        if repo[0].endswith('firmware-mod-kit') and check_mod_kit_installed():
+            logging.info('Skipping firmware-mod-kit since it is already installed')
+        else:
+            install_github_project(*repo)
 
 
 def main(distribution):
@@ -237,23 +248,21 @@ def _edit_sudoers():
     logging.info('add rules to sudo...')
     username = getuser()
     sudoers_content = '\n'.join(
-        (
-            f'{username}\tALL=NOPASSWD: {command}'
-            for command in (
-                '/sbin/kpartx',
-                '/sbin/losetup',
-                '/bin/mount',
-                '/bin/umount',
-                '/bin/mknod',
-                '/usr/bin/sasquatch',
-                '/bin/rm',
-                '/bin/cp',
-                '/bin/dd',
-                '/bin/chown',
-            )
+        f'{username}\tALL=NOPASSWD: {command}'
+        for command in (
+            '/sbin/kpartx',
+            '/sbin/losetup',
+            '/bin/mount',
+            '/bin/umount',
+            '/bin/mknod',
+            '/usr/bin/sasquatch',
+            '/bin/rm',
+            '/bin/cp',
+            '/bin/dd',
+            '/bin/chown',
         )
     )
-    Path('/tmp/fact_overrides').write_text(f'{sudoers_content}\n')  # pylint: disable=unspecified-encoding
+    Path('/tmp/fact_overrides').write_text(f'{sudoers_content}\n', encoding='utf-8')
     _, chown_code = execute_shell_command_get_return_code('sudo chown root:root /tmp/fact_overrides')
     _, mv_code = execute_shell_command_get_return_code('sudo mv /tmp/fact_overrides /etc/sudoers.d/fact_overrides')
     if not chown_code == mv_code == 0:
@@ -261,19 +270,18 @@ def _edit_sudoers():
 
 
 def _install_external_deb_deps():
-    '''
+    """
     install deb packages that aren't available through Debian/Ubuntu package sources
-    '''
-    with TemporaryDirectory(prefix='patool') as build_directory:
-        with OperateInDirectory(build_directory):
-            for file_name, url, sha256 in EXTERNAL_DEB_DEPS:
-                try:
-                    run(split(f'wget {url}/{file_name}'), check=True, env=os.environ)
-                    if not _sha256_hash_file(Path(file_name)) == sha256:
-                        raise InstallationError(f'Wrong file hash: {file_name}')
-                    run(split(f'sudo dpkg -i {file_name}'), capture_output=True, check=True)
-                except CalledProcessError as error:
-                    raise InstallationError(f'Error during {file_name} unpacker installation') from error
+    """
+    with TemporaryDirectory(prefix='patool') as build_directory, OperateInDirectory(build_directory):
+        for file_name, url, sha256 in EXTERNAL_DEB_DEPS:
+            try:
+                run(split(f'wget {url}/{file_name}'), check=True, env=os.environ)
+                if not _sha256_hash_file(Path(file_name)) == sha256:
+                    raise InstallationError(f'Wrong file hash: {file_name}')
+                run(split(f'sudo dpkg -i {file_name}'), capture_output=True, check=True)
+            except CalledProcessError as error:
+                raise InstallationError(f'Error during {file_name} unpacker installation') from error
 
 
 def _sha256_hash_file(file_path: Path) -> str:
@@ -281,29 +289,44 @@ def _sha256_hash_file(file_path: Path) -> str:
 
 
 def _install_freetz():
+    if all(
+        (Path(__file__).parent.parent / 'bin' / tool).exists()
+        for tool in [
+            'find-squashfs',
+            'unpack-kernel',
+            'freetz_bin_functions',
+            'unlzma',
+            'sfk',
+            'unsquashfs4-avm-be',
+            'unsquashfs4-avm-le',
+            'unsquashfs3-multi',
+        ]
+    ):
+        logging.info('Skipping FREETZ as it is already installed')
+        return
+    logging.info('Installing FREETZ')
     current_user = getuser()
     freetz_build_config = Path(__file__).parent / 'freetz.config'
-    with TemporaryDirectory(prefix='fact_freetz') as build_directory:
-        with OperateInDirectory(build_directory):
-            os.umask(0o022)
-            install_github_project(
-                'Freetz-NG/freetz-ng',
-                [
-                    # add user only if it does not exist to fix issues with re-running the installation after an error
-                    'id -u makeuser || sudo useradd -M makeuser',
-                    'sudo mkdir -p /home/makeuser',
-                    'sudo chown -R makeuser /home/makeuser',
-                    f'cp {freetz_build_config} ./.config',
-                    f'sudo chown -R makeuser {build_directory}',
-                    'sudo su makeuser -c "make -j$(nproc) tools"',
-                    f'sudo chmod -R 777 {build_directory}',
-                    f'sudo chown -R {current_user} {build_directory}',
-                    'cp tools/find-squashfs tools/unpack-kernel tools/freetz_bin_functions tools/unlzma tools/sfk '
-                    f'tools/unsquashfs4-avm-be tools/unsquashfs4-avm-le tools/unsquashfs3-multi {BIN_DIR}',
-                    'sudo userdel makeuser',
-                ],
-            )
+    with TemporaryDirectory(prefix='fact_freetz') as build_directory, OperateInDirectory(build_directory):
+        os.umask(0o022)
+        install_github_project(
+            'Freetz-NG/freetz-ng',
+            [
+                # add user only if it does not exist to fix issues with re-running the installation after an error
+                'id -u makeuser || sudo useradd -M makeuser',
+                'sudo mkdir -p /home/makeuser',
+                'sudo chown -R makeuser /home/makeuser',
+                f'cp {freetz_build_config} ./.config',
+                f'sudo chown -R makeuser {build_directory}',
+                'sudo su makeuser -c "make -j$(nproc) tools"',
+                f'sudo chmod -R 777 {build_directory}',
+                f'sudo chown -R {current_user} {build_directory}',
+                'cp tools/find-squashfs tools/unpack-kernel tools/freetz_bin_functions tools/unlzma tools/sfk '
+                f'tools/unsquashfs4-avm-be tools/unsquashfs4-avm-le tools/unsquashfs3-multi {BIN_DIR}',
+                'sudo userdel makeuser',
+            ],
+        )
 
 
 def _install_plugins():
diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
index 2d27322e..5a808e46 100644
--- a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
+++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
@@ -5,145 +5,100 @@
 from __future__ import annotations
 
 import logging
-import re
-import shutil
+import traceback
+from itertools import chain
 from pathlib import Path
-
-from common_helper_process import execute_shell_command
-
-from helperFunctions import magic
+from typing import Iterable
+
+import structlog
+from common_helper_unpacking_classifier import avg_entropy
+from unblob.extractor import carve_unknown_chunk, carve_valid_chunk
+from unblob.file_utils import File
+from unblob.finder import search_chunks
+from unblob.handlers import BUILTIN_HANDLERS
+from unblob.handlers.compression.zlib import ZlibHandler
+from unblob.models import Chunk, HexString, PaddingChunk, TaskResult, UnknownChunk
+from unblob.plugins import hookimpl
+from unblob.processing import Task, calculate_unknown_chunks, remove_inner_chunks
 
 NAME = 'generic_carver'
 MIME_PATTERNS = ['generic/carver']
-VERSION = '0.8'
-
-TAR_MAGIC = b'ustar'
-BZ2_EOF_MAGIC = [  # the magic string is only aligned to half bytes -> two possible strings
-    b'\x17\x72\x45\x38\x50\x90',
-    b'\x77\x24\x53\x85\x09',
-]
-REAL_SIZE_REGEX = re.compile(r'Physical Size = (\d+)')
-
-
-def unpack_function(file_path, tmp_dir):
-    """
-    file_path specifies the input file.
-    tmp_dir should be used to store the extracted files.
-    """
-
-    logging.debug(f'File Type unknown: execute binwalk on {file_path}')
-    output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}')
-
-    drop_underscore_directory(tmp_dir)
-    return {'output': output, 'filter_log': ArchivesFilter(tmp_dir).remove_false_positive_archives()}
-
-
-class ArchivesFilter:
-    def __init__(self, unpack_directory):
-        self.unpack_directory = Path(unpack_directory)
-        self.screening_logs = []
-
-    def remove_false_positive_archives(self) -> str:
-        for file_path in self.unpack_directory.glob('**/*'):
-            if not file_path.is_file():
-                continue
-            file_type = magic.from_file(file_path, mime=True)
-
-            if file_type == 'application/x-tar' or self._is_possible_tar(file_type, file_path):
-                self._remove_invalid_archives(file_path, 'tar -tvf {}', 'does not look like a tar archive')
-
-            elif file_type == 'application/x-xz':
-                self._remove_invalid_archives(file_path, 'xz -c -d {} | wc -c')
-
-            elif file_type == 'application/gzip':
-                self._remove_invalid_archives(file_path, 'gzip -c -d {} | wc -c')
-
-            elif file_path.suffix == '7z' or file_type in [
-                'application/x-7z-compressed',
-                'application/x-lzma',
-                'application/zip',
-                'application/zlib',
-            ]:
-                self._remove_invalid_archives(file_path, '7z l {}', 'ERROR')
-
-            if file_path.is_file():
-                self._remove_trailing_data(file_type, file_path)
-
-        return '\n'.join(self.screening_logs)
-
-    @staticmethod
-    def _is_possible_tar(file_type: str, file_path: Path) -> bool:
-        # broken tar archives may be identified as octet-stream by newer versions of libmagic
-        if file_type == 'application/octet-stream':
-            with file_path.open(mode='rb') as fp:
-                fp.seek(0x101)
-                return fp.read(5) == TAR_MAGIC
-        return False
-
-    def _remove_invalid_archives(self, file_path: Path, command, search_key=None):
-        output = execute_shell_command(command.format(file_path))
-
-        if search_key and search_key in output.replace('\n ', '') or not search_key and _output_is_empty(output):
-            self._remove_file(file_path)
-
-    def _remove_file(self, file_path):
-        file_path.unlink()
-        self.screening_logs.append(f'{file_path.name} was removed (invalid archive)')
-
-    def _remove_trailing_data(self, file_type: str, file_path: Path):
-        trailing_data_index = None
-
-        if file_type in ['application/zip', 'application/zlib']:
-            trailing_data_index = _find_trailing_data_index_zip(file_path)
-
-        elif file_type == 'application/x-bzip2':
-            trailing_data_index = _find_trailing_data_index_bz2(file_path)
-
-        if trailing_data_index:
-            self._resize_file(trailing_data_index, file_path)
-
-    def _resize_file(self, actual_size: int, file_path: Path):
-        with file_path.open('rb') as fp:
-            actual_content = fp.read(actual_size)
-        file_path.write_bytes(actual_content)
-        self.screening_logs.append(f'Removed trailing data at the end of {file_path.name}')
-
-
-def _output_is_empty(output):
-    return int((output.split())[-1]) == 0
-
-
-def _find_trailing_data_index_zip(file_path: Path) -> int | None:
-    """Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size."""
-    output = execute_shell_command(f'7z l {file_path}')
-    if 'There are data after the end of archive' in output:
-        match = REAL_SIZE_REGEX.search(output)
-        if match:
-            return int(match.groups()[0])
-    return None
-
-
-def _find_trailing_data_index_bz2(file_path: Path) -> int | None:
-    output = execute_shell_command(f'bzip2 -t {file_path}')
-    if 'trailing garbage' in output:
-        file_content = file_path.read_bytes()
-        matches = sorted(index for magic in BZ2_EOF_MAGIC if (index := file_content.find(magic)) != -1)
-        # there may be two matches, but we want the first one (but also not -1 == no match)
-        if matches:
-            # 10 is magic string + CRC 32 checksum + padding (see https://en.wikipedia.org/wiki/Bzip2#File_format)
-            return matches[0] + 10
-    return None
-
-
-def drop_underscore_directory(tmp_dir):
-    extracted_contents = list(Path(tmp_dir).iterdir())
-    if not extracted_contents:
-        return
-    if len(extracted_contents) != 1 or not extracted_contents[0].name.endswith('.extracted'):
-        return
-    for result in extracted_contents[0].iterdir():
-        shutil.move(str(result), str(result.parent.parent))
-    shutil.rmtree(str(extracted_contents[0]))
+VERSION = '1.0.0'
+
+MIN_FILE_ENTROPY = 0.01
+
+# deactivate internal logger of unblob because it can slow down searching chunks
+structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.CRITICAL))
+
+
+# register custom zlib handler to allow carving zlib chunks from inside files
+@hookimpl
+def unblob_register_handlers():
+    yield from [ZlibCarvingHandler]
+
+
+class ZlibCarvingHandler(ZlibHandler):
+    NAME = 'zlib_carver'
+
+    PATTERNS = [  # noqa: RUF012
+        HexString('78 01'),  # low compression
+        HexString('78 9c'),  # default compression
+        HexString('78 da'),  # best compression
+        HexString('78 5e'),  # compressed
+    ]
+
+
+def unpack_function(file_path: str, tmp_dir: str) -> dict:
+    extraction_dir = Path(tmp_dir)
+    chunks = []
+    filter_report = ''
+    path = Path(file_path)
+
+    try:
+        with File.from_path(path) as file:
+            for chunk in _find_chunks(path, file):
+                if isinstance(chunk, PaddingChunk):
+                    continue
+                if isinstance(chunk, UnknownChunk):
+                    if _has_low_entropy(file, chunk):
+                        filter_report += (
+                            f'removed chunk {chunk.start_offset}-{chunk.end_offset} (reason: low entropy)\n'
+                        )
+                        continue
+                    carve_unknown_chunk(extraction_dir, file, chunk)
+                else:
+                    carve_valid_chunk(extraction_dir, file, chunk)
+                chunks.append(chunk.as_report(None).asdict())
+
+        report = _create_report(chunks) if chunks else 'No valid chunks found.'
+        if filter_report:
+            report += f'\nFiltered chunks:\n{filter_report}'
+    except Exception as error:
+        report = f'Error {error} during unblob extraction:\n{traceback.format_exc()}'
+    return {'output': report}
+
+
+def _find_chunks(file_path: Path, file: File) -> Iterable[Chunk]:
+    task = Task(path=file_path, depth=0, blob_id='')
+    known_chunks = remove_inner_chunks(search_chunks(file, file.size(), BUILTIN_HANDLERS, TaskResult(task)))
+    unknown_chunks = calculate_unknown_chunks(known_chunks, file.size())
+    yield from chain(known_chunks, unknown_chunks)
+
+
+def _create_report(chunk_list: list[dict]) -> str:
+    report = ['Extracted chunks:']
+    for chunk in sorted(chunk_list, key=lambda c: c['start_offset']):
+        chunk_type = chunk.get('handler_name', 'unknown')
+        report.append(
+            f'start: {chunk["start_offset"]}, end: {chunk["end_offset"]}, size: {chunk["size"]}, type: {chunk_type}'
+        )
+    return '\n'.join(report)
+
+
+def _has_low_entropy(file: File, chunk: UnknownChunk) -> bool:
+    file.seek(chunk.start_offset)
+    content = file.read(chunk.size)
+    return avg_entropy(content) < MIN_FILE_ENTROPY
 
 
 # ----> Do not edit below this line <----
diff --git a/fact_extractor/plugins/unpacking/generic_carver/install.sh b/fact_extractor/plugins/unpacking/generic_carver/install.sh
new file mode 100755
index 00000000..571a4859
--- /dev/null
+++ b/fact_extractor/plugins/unpacking/generic_carver/install.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+cd "$( dirname "${BASH_SOURCE[0]}" )" || exit 1
+
+echo "------------------------------------"
+echo "    install unblob dependencies     "
+echo "------------------------------------"
+
+sudo -EH apt-get install -y e2fsprogs img2simg lziprecover xz-utils libmagic1 libhyperscan5
+
+curl -L -o sasquatch_1.0_amd64.deb https://github.com/onekey-sec/sasquatch/releases/download/sasquatch-v1.0/sasquatch_1.0_amd64.deb
+sudo dpkg -i sasquatch_1.0_amd64.deb
+rm -f sasquatch_1.0_amd64.deb
+
+exit 0
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file b/fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file
new file mode 100644
index 00000000..ede93aef
Binary files /dev/null and b/fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_7z.7z b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_7z.7z
deleted file mode 100644
index 9ebfa302..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_7z.7z and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_gz.gz b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_gz.gz
deleted file mode 100644
index 0d806e7d..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_gz.gz and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_tar.tar b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_tar.tar
deleted file mode 100644
index 92766c47..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_tar.tar and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin
deleted file mode 100644
index 93cea4cb..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.xz b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.xz
deleted file mode 100644
index 7f957cb2..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.xz and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zip.zip b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zip.zip
deleted file mode 100644
index 543e9ad5..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zip.zip and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.bz2 b/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.bz2
deleted file mode 100644
index 0a359d72..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.bz2 and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.zip b/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.zip
deleted file mode 100644
index 9476e0d8..00000000
Binary files a/fact_extractor/plugins/unpacking/generic_carver/test/data/trailing_data.zip and /dev/null differ
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver.py
new file mode 100644
index 00000000..549ea610
--- /dev/null
+++ b/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver.py
@@ -0,0 +1,35 @@
+from pathlib import Path
+
+from helperFunctions.file_system import get_test_data_dir
+from test.unit.unpacker.test_unpacker import TestUnpackerBase
+
+# pylint: disable=protected-access
+
+TEST_DATA_DIR = Path(__file__).parent / 'data'
+
+
+class TestGenericCarver(TestUnpackerBase):
+    def test_unpacker_selection_generic(self):
+        self.check_unpacker_selection('generic/carver', 'generic_carver')
+
+    def test_extraction(self):
+        in_file = f'{get_test_data_dir()}/generic_carver_test'
+        files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
+            in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
+        )
+        files = set(files)
+        assert len(files) == 3, 'file number incorrect'  # noqa: PLR2004
+        assert f'{self.tmp_dir.name}/100-887.zip' in files, 'hidden zip not identified correctly'
+        assert 'output' in meta_data
+
+    def test_filter(self):
+        in_file = TEST_DATA_DIR / 'carving_test_file'
+        assert Path(in_file).is_file()
+        files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
+            str(in_file), self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
+        )
+        files = set(files)
+        assert len(files) == 4, 'file number incorrect'  # noqa: PLR2004
+        assert 'removed chunk 300-428' in meta_data['output']
+        for file in ('0-128.unknown', '128-300.zip', '428-562.sevenzip', '562-626.unknown'):
+            assert f'{self.tmp_dir.name}/{file}' in files
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py b/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py
deleted file mode 100644
index d21bc4b6..00000000
--- a/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import shutil
-from dataclasses import dataclass
-from pathlib import Path
-from tempfile import TemporaryDirectory
-import pytest
-from contextlib import contextmanager
-
-from plugins.unpacking.generic_carver.code.generic_carver import ArchivesFilter
-from test.unit.unpacker.test_unpacker import TestUnpackerBase
-from helperFunctions.file_system import get_test_data_dir
-
-# pylint: disable=protected-access
-
-TEST_DATA_DIR = Path(__file__).parent / 'data'
-
-
-class TestGenericCarver(TestUnpackerBase):
-    def test_unpacker_selection_generic(self):
-        self.check_unpacker_selection('generic/carver', 'generic_carver')
-
-    def test_extraction(self):
-        in_file = f'{get_test_data_dir()}/generic_carver_test'
-        files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
-            in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
-        )
-        files = set(files)
-        assert len(files) == 1, 'file number incorrect'
-        assert files == {f'{self.tmp_dir.name}/64.zip'}, 'not all files found'
-        assert 'output' in meta_data
-        assert 'filter_log' in meta_data
-
-    def test_extraction_of_filtered_files(self):
-        in_file = str(TEST_DATA_DIR / 'fake_xz.bin')
-        files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
-            in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
-        )
-        assert len(files) == 0
-        assert 'was removed' in meta_data['filter_log']
-
-
-@dataclass
-class FilterTest:
-    test_file: Path
-    source_file: Path
-    filter: ArchivesFilter
-
-
-@contextmanager
-def filter_test_setup(filename) -> FilterTest:
-    with TemporaryDirectory() as temp_dir:
-        test_file = Path(temp_dir) / filename
-        source_file = TEST_DATA_DIR / filename
-        shutil.copyfile(source_file, test_file)
-        arch_filter = ArchivesFilter(temp_dir)
-        yield FilterTest(test_file, source_file, arch_filter)
-
-
-@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz'])
-def test_remove_false_positives(filename):
-    with filter_test_setup(filename) as setup:
-        setup.filter.remove_false_positive_archives()
-        assert setup.test_file.is_file() is False
-
-
-@pytest.mark.parametrize('filename', ['trailing_data.zip', 'trailing_data.bz2'])
-def test_remove_trailing_data(filename):
-    with filter_test_setup(filename) as setup:
-        setup.filter.remove_false_positive_archives()
-        assert setup.filter.screening_logs == [f'Removed trailing data at the end of {filename}']
-        assert setup.test_file.stat().st_size < setup.source_file.stat().st_size
diff --git a/fact_extractor/plugins/unpacking/linuxkernel/install.sh b/fact_extractor/plugins/unpacking/linuxkernel/install.sh
deleted file mode 100755
index a12087e5..00000000
--- a/fact_extractor/plugins/unpacking/linuxkernel/install.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env bash
-set -e
-
-echo "------------------------------------"
-echo "    install liblz4-tools, zstd      "
-echo "------------------------------------"
-
-sudo apt-get install -y liblz4-tool zstd
-exit 0
diff --git a/fact_extractor/plugins/unpacking/sevenz/install.sh b/fact_extractor/plugins/unpacking/sevenz/install.sh
deleted file mode 100755
index 6e7d71c7..00000000
--- a/fact_extractor/plugins/unpacking/sevenz/install.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env bash
-set -e
-
-cd "$( dirname "${BASH_SOURCE[0]}" )"
-
-echo "------------------------------------"
-echo "     install p7z from source        "
-echo "------------------------------------"
-
-
-# install newest version of p7zip
-sudo apt-get remove -y p7zip-full
-
-mkdir -p /tmp/fact_build
-cd /tmp/fact_build
-
-wget -O 7zip.tar.bz2 https://sourceforge.net/projects/p7zip/files/latest/download
-# remove possible artifacts from previous installation (: == NOP)
-rm -rf ./p7zip* || :
-tar xvjf 7zip.tar.bz2
-cd p7zip*
-# gcc >= 11 has -Wnarrowing as default flag which leads to an error during compilation
-# g++ will try to use standard C++17 but the code is not compatible -> use C++14
-sed -i 's/CXXFLAGS=-c -I. \\/CXXFLAGS=-c -I. -Wno-narrowing -std=c++14 \\/g' makefile.glb || echo "Warning: Could not apply makefile patch"
-cp makefile.linux_any_cpu makefile.machine
-make -j"$(nproc)" all3
-sudo ./install.sh
-cd ..
-rm -fr p7zip* 7zip.tar.bz2
-
-exit 0
diff --git a/fact_extractor/plugins/unpacking/squashFS/test/test_plugin_squashfs.py b/fact_extractor/plugins/unpacking/squashFS/test/test_plugin_squashfs.py
old mode 100755
new mode 100644
index 111c5cae..0aa3e2cb
--- a/fact_extractor/plugins/unpacking/squashFS/test/test_plugin_squashfs.py
+++ b/fact_extractor/plugins/unpacking/squashFS/test/test_plugin_squashfs.py
@@ -1,17 +1,17 @@
 from pathlib import Path
+from tempfile import TemporaryDirectory
 
 import pytest
-from tempfile import TemporaryDirectory
 
 from test.unit.unpacker.test_unpacker import TestUnpackerBase
 
-from ..code.squash_fs import _unpack_success, unpack_function, SQUASH_UNPACKER
+from ..code.squash_fs import SQUASH_UNPACKER, _unpack_success, unpack_function
 
 TEST_DATA_DIR = Path(__file__).parent / 'data'
 
 
 @pytest.mark.parametrize(
-    'unpack_path, expected',
+    ('unpack_path', 'expected'),
     [
         ('/foo/bar/unpacker', False),
         (TEST_DATA_DIR, True),
@@ -31,29 +31,32 @@ def test_not_unpackable_file():
 
 def test_tool_paths_set_correctly():
     for unpacker, _ in SQUASH_UNPACKER:
-        assert unpacker.exists()
+        assert unpacker.exists(), f'{unpacker} not found.'
 
 
 class TestSquashUnpacker(TestUnpackerBase):
     def test_unpacker_selection_generic(self):
         self.check_unpacker_selection('filesystem/squashfs', 'SquashFS')
 
-    @pytest.mark.parametrize(('file', 'expected'), [
-        ('avm_be.sqfs4', 'sasquatch-v4be'),
-        ('avm_le.sqfs4', 'sasquatch'),
-        ('gzip.sqfs', 'sasquatch'),
-        ('lz4.sqfs', 'sasquatch'),
-        ('lzma.sqfs', 'sasquatch'),
-        ('lzma1_be.sqfs3', 'sasquatch'),
-        ('lzma1_le.sqfs3', 'sasquatch'),
-        ('lzma_be.sqfs2', 'unsquashfs4-avm-be'),
-        ('lzma_le.sqfs2', 'unsquashfs4-avm-be'),
-        ('lzo.sqfs', 'sasquatch'),
-        ('xz.sqfs', 'sasquatch'),
-        ('zlib_be.sqfs3', 'sasquatch'),
-        ('zlib_le.sqfs3', 'sasquatch'),
-        ('zstd.sqfs', 'sasquatch'),
-    ])
+    @pytest.mark.parametrize(
+        ('file', 'expected'),
+        [
+            ('avm_be.sqfs4', 'sasquatch-v4be'),
+            ('avm_le.sqfs4', 'sasquatch'),
+            ('gzip.sqfs', 'sasquatch'),
+            ('lz4.sqfs', 'sasquatch'),
+            ('lzma.sqfs', 'sasquatch'),
+            ('lzma1_be.sqfs3', 'sasquatch'),
+            ('lzma1_le.sqfs3', 'sasquatch'),
+            ('lzma_be.sqfs2', 'unsquashfs4-avm-be'),
+            ('lzma_le.sqfs2', 'unsquashfs4-avm-be'),
+            ('lzo.sqfs', 'sasquatch'),
+            ('xz.sqfs', 'sasquatch'),
+            ('zlib_be.sqfs3', 'sasquatch'),
+            ('zlib_le.sqfs3', 'sasquatch'),
+            ('zstd.sqfs', 'sasquatch'),
+        ],
+    )
     def test_extraction_sqfs(self, file, expected):
         meta_data = self.check_unpacking_of_standard_unpack_set(TEST_DATA_DIR / file)
         assert meta_data['plugin_used'] == 'SquashFS'
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/__init__.py b/fact_extractor/plugins/unpacking/xiaomi_hdr/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/code/__init__.py b/fact_extractor/plugins/unpacking/xiaomi_hdr/code/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/code/xiaomi_hdr.py b/fact_extractor/plugins/unpacking/xiaomi_hdr/code/xiaomi_hdr.py
new file mode 100644
index 00000000..56e7a89f
--- /dev/null
+++ b/fact_extractor/plugins/unpacking/xiaomi_hdr/code/xiaomi_hdr.py
@@ -0,0 +1,50 @@
+"""
+This plugin uses unblob to unpack Xiaomi HDR1/2 images.
+"""
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+
+import structlog
+from structlog.testing import capture_logs
+from unblob.handlers.archive.xiaomi.hdr import HDRExtractor
+
+NAME = 'Xiaomi HDR'
+MIME_PATTERNS = ['firmware/xiaomi-hdr1', 'firmware/xiaomi-hdr2']
+VERSION = '0.1.0'
+
+structlog.configure(
+    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
+)
+
+
+def unpack_function(file_path: str, tmp_dir: str) -> dict:
+    path = Path(file_path)
+    with path.open('rb') as fp:
+        magic = fp.read(4)
+    if magic in [b'HDR1', b'HDR2']:
+        extractor = HDRExtractor(f'{magic.decode().lower()}_header_t')
+    else:
+        return {'output': ''}
+
+    # unblob uses structlog for logging, but we can capture the logs with this convenient testing function
+    with capture_logs() as log_list:
+        extractor.extract(path, Path(tmp_dir))
+    return {'output': _format_logs(log_list)}
+
+
+def _format_logs(logs: list[dict]) -> str:
+    output = ''
+    for entry in logs:
+        output += '\n'.join(f'{key}: {value}' for key, value in entry.items() if key not in {'_verbosity', 'log_level'}) + '\n'  # newline so consecutive entries don't run together
+    return output
+
+
+# ----> Do not edit below this line <----
+
+
+def setup(unpack_tool):
+    for item in MIME_PATTERNS:
+        unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION))
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/test/__init__.py b/fact_extractor/plugins/unpacking/xiaomi_hdr/test/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/test/data/test.hdr1 b/fact_extractor/plugins/unpacking/xiaomi_hdr/test/data/test.hdr1
new file mode 100644
index 00000000..3d97faa7
Binary files /dev/null and b/fact_extractor/plugins/unpacking/xiaomi_hdr/test/data/test.hdr1 differ
diff --git a/fact_extractor/plugins/unpacking/xiaomi_hdr/test/test_hdr.py b/fact_extractor/plugins/unpacking/xiaomi_hdr/test/test_hdr.py
new file mode 100644
index 00000000..2fb11f5e
--- /dev/null
+++ b/fact_extractor/plugins/unpacking/xiaomi_hdr/test/test_hdr.py
@@ -0,0 +1,22 @@
+from pathlib import Path
+
+from plugins.unpacking.xiaomi_hdr.code.xiaomi_hdr import MIME_PATTERNS
+from test.unit.unpacker.test_unpacker import TestUnpackerBase
+
+TEST_DATA_DIR = Path(__file__).parent / 'data'
+
+
+class TestXiaomiHdrUnpacker(TestUnpackerBase):
+    def test_unpacker_selection_generic(self):
+        for mime in MIME_PATTERNS:
+            self.check_unpacker_selection(mime, 'Xiaomi HDR')
+
+    def test_extraction_hdr(self):
+        in_file = TEST_DATA_DIR / 'test.hdr1'
+        assert in_file.is_file(), 'test file is missing'
+        meta = self.check_unpacking_of_standard_unpack_set(
+            in_file,
+            output=True,
+        )
+        assert 'output' in meta
+        assert 'testfile1' in meta['output']
diff --git a/requirements-unpackers.txt b/requirements-unpackers.txt
index 69b52e02..8ed68037 100644
--- a/requirements-unpackers.txt
+++ b/requirements-unpackers.txt
@@ -7,13 +7,6 @@ patool~=2.2.0
 git+https://github.com/sviehb/jefferson.git@v0.4.1
 cstruct==2.1
 python-lzo==1.14
-# generic_carver: binwalk
-# ToDo: pin to fork (?)
-git+https://github.com/ReFirmLabs/binwalk@v2.3.2
-pyqtgraph~=0.13.4
-capstone~=5.0.1
-numpy~=1.26.4
-scipy~=1.13.0
 # ubi
 ubi-reader~=0.8.9
 # dji / dlink_shrs
@@ -37,3 +30,6 @@ extract-dtb~=1.2.3
 # uefi
 uefi-firmware~=1.11
 pylibfdt ~= 1.7.1
+# unblob
+# FixMe: pin to next stable version; the latest release is missing a bug fix related to zip64
+git+https://github.com/onekey-sec/unblob.git@e0d9805
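Usage note: with binwalk gone, the carving pipeline in generic_carver.py is plain Python, so the plugin can also be exercised directly outside the FACT unpacking framework. A minimal sketch, assuming the fact_extractor package layout used by the tests above; the input path is hypothetical:

# Minimal usage sketch for the unblob-based generic_carver plugin.
from pathlib import Path
from tempfile import TemporaryDirectory

from plugins.unpacking.generic_carver.code.generic_carver import unpack_function

with TemporaryDirectory() as tmp_dir:
    meta = unpack_function('/path/to/firmware.bin', tmp_dir)  # hypothetical input file
    print(meta['output'])  # chunk report, e.g. 'start: 128, end: 300, size: 172, type: zip'
    for carved in sorted(Path(tmp_dir).iterdir()):
        print(carved.name)  # carved chunks are named like '128-300.zip' or '0-128.unknown' (cf. test_filter)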
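Side note on the xiaomi_hdr plugin's log handling: unblob logs through structlog, and structlog.testing.capture_logs() records each log call as a plain dict instead of emitting it, which is what _format_logs() turns into the plugin report. A self-contained sketch of that mechanism; the logged event below is made up for illustration:

# Sketch of the capture_logs() trick used in xiaomi_hdr.py.
import structlog
from structlog.testing import capture_logs

logger = structlog.get_logger()

with capture_logs() as log_list:
    logger.info('extract', path='/tmp/out')  # stands in for unblob's internal logging
for entry in log_list:  # each entry is a dict, e.g. {'path': '/tmp/out', 'event': 'extract', 'log_level': 'info'}
    print(', '.join(f'{key}: {value}' for key, value in entry.items() if key != 'log_level'))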