diff --git a/fact_extractor/docker_extraction.py b/fact_extractor/docker_extraction.py
index 4c566367..709a55f2 100755
--- a/fact_extractor/docker_extraction.py
+++ b/fact_extractor/docker_extraction.py
@@ -1,21 +1,22 @@
 #!/usr/bin/env python3
-'''
-    fact_extractor
-    Copyright (C) 2015-2019 Fraunhofer FKIE
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program. If not, see <http://www.gnu.org/licenses/>.
-'''
+"""
+fact_extractor
+Copyright (C) 2015-2019 Fraunhofer FKIE
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
 import argparse
 import sys
 from pathlib import Path
diff --git a/fact_extractor/fact_extract.py b/fact_extractor/fact_extract.py
index a859534c..c8f0828e 100755
--- a/fact_extractor/fact_extract.py
+++ b/fact_extractor/fact_extract.py
@@ -1,21 +1,21 @@
 #!/usr/bin/env python3
-'''
-    fact_extractor
-    Copyright (C) 2015-2019 Fraunhofer FKIE
+"""
+fact_extractor
+Copyright (C) 2015-2019 Fraunhofer FKIE
 
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
 
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-    GNU General Public License for more details.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
 
-    You should have received a copy of the GNU General Public License
-    along with this program. If not, see <http://www.gnu.org/licenses/>.
-'''
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
 
 import sys
 from pathlib import Path
@@ -39,4 +39,4 @@ def main():
 
 
 if __name__ == '__main__':
-    exit(main())
+    sys.exit(main())
diff --git a/fact_extractor/helperFunctions/program_setup.py b/fact_extractor/helperFunctions/program_setup.py
index f7e72069..202e2413 100644
--- a/fact_extractor/helperFunctions/program_setup.py
+++ b/fact_extractor/helperFunctions/program_setup.py
@@ -12,19 +12,23 @@
 
 
 def setup_argparser(name, description, command_line_options, version=__VERSION__):
-    parser = argparse.ArgumentParser(description='{} - {}'.format(name, description))
-    parser.add_argument('-V', '--version', action='version', version='{} {}'.format(name, version))
+    parser = argparse.ArgumentParser(description=f'{name} - {description}')
+    parser.add_argument('-V', '--version', action='version', version=f'{name} {version}')
     parser.add_argument('-l', '--log_file', help='path to log file', default=None)
-    parser.add_argument('-L', '--log_level', help='define the log level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default=None)
+    parser.add_argument(
+        '-L', '--log_level', help='define the log level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default=None
+    )
     parser.add_argument('-d', '--debug', action='store_true', default=False, help='print debug messages')
-    parser.add_argument('-C', '--config_file', help='set path to config File', default='{}/main.cfg'.format(get_config_dir()))
+    parser.add_argument('-C', '--config_file', help='set path to config File', default=f'{get_config_dir()}/main.cfg')
     parser.add_argument('FILE_PATH', type=str, help='Path to file that should be extracted')
     return parser.parse_args(command_line_options[1:])
 
 
 def setup_logging(debug, log_file=None, log_level=None):
     log_level = log_level if log_level else logging.WARNING
-    log_format = logging.Formatter(fmt='[%(asctime)s][%(module)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
+    log_format = logging.Formatter(
+        fmt='[%(asctime)s][%(module)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
+    )
     logger = logging.getLogger('')
     logger.setLevel(logging.DEBUG)
 
@@ -45,10 +49,10 @@ def setup_logging(debug, log_file=None, log_level=None):
 
 
 def check_ulimits():
     # Get number of openable files
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-    if soft < 1024:
+    if soft < 1024:  # noqa: PLR2004
         resource.setrlimit(resource.RLIMIT_NOFILE, (min(1024, hard), hard))
         logging.info(f'The number of openable files has been raised from {soft} to {min(1024, hard)}.')
-    elif soft == resource.RLIM_INFINITY or soft > 100000:
+    elif soft == resource.RLIM_INFINITY or soft > 100000:  # noqa: PLR2004
         logging.warning('Warning: A very high (or no) nofile limit will slow down fakeroot and cause other problems.')
diff --git a/fact_extractor/helperFunctions/statistics.py b/fact_extractor/helperFunctions/statistics.py
index 6172f10b..8ba5489a 100644
--- a/fact_extractor/helperFunctions/statistics.py
+++ b/fact_extractor/helperFunctions/statistics.py
@@ -1,17 +1,20 @@
-from configparser import ConfigParser
+from __future__ import annotations
+
 from contextlib import suppress
-from pathlib import Path
-from typing import Dict, List
+from typing import TYPE_CHECKING
 
 import magic
 from common_helper_files import safe_rglob
-from common_helper_unpacking_classifier import (
-    avg_entropy, get_binary_size_without_padding, is_compressed
-)
+from common_helper_unpacking_classifier import avg_entropy, get_binary_size_without_padding, is_compressed
+
 from helperFunctions.config import read_list_from_config
 
+if TYPE_CHECKING:
+    from configparser import ConfigParser
+    from pathlib import Path
+
 
-def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):
+def add_unpack_statistics(extraction_dir: Path, meta_data: dict):
     unpacked_files, unpacked_directories = 0, 0
     for extracted_item in safe_rglob(extraction_dir):
         if extracted_item.is_file():
@@ -23,13 +26,20 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):
     meta_data['number_of_unpacked_directories'] = unpacked_directories
 
 
-def get_unpack_status(file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser):
+def get_unpack_status(
+    file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser
+):
     meta_data['summary'] = []
     meta_data['entropy'] = avg_entropy(binary)
 
     if not extracted_files and meta_data.get('number_of_excluded_files', 0) == 0:
-        if magic.from_file(file_path, mime=True) in read_list_from_config(config, 'ExpertSettings', 'compressed_file_types')\
-                or not is_compressed(binary, compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'), classifier=avg_entropy):
+        if magic.from_file(file_path, mime=True) in read_list_from_config(
+            config, 'ExpertSettings', 'compressed_file_types'
+        ) or not is_compressed(
+            binary,
+            compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'),
+            classifier=avg_entropy,
+        ):
             meta_data['summary'] = ['unpacked']
         else:
             meta_data['summary'] = ['packed']
@@ -37,7 +47,7 @@ def get_unpack_status(file_path: str, binary: bytes, extracted_files: List[Path]
         _detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead'))
 
 
-def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int):
+def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int):
     decoding_overhead = 1 - meta_data.get('encoding_overhead', 0)
     cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead
     size_of_extracted_files = _total_size_of_extracted_files(extracted_files)
@@ -46,7 +56,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D
         meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost']
 
 
-def _total_size_of_extracted_files(extracted_files: List[Path]) -> int:
+def _total_size_of_extracted_files(extracted_files: list[Path]) -> int:
     total_size = 0
     for item in extracted_files:
         with suppress(OSError):
diff --git a/fact_extractor/install/common.py b/fact_extractor/install/common.py
index 02c7d7a9..6c586d10 100644
--- a/fact_extractor/install/common.py
+++ b/fact_extractor/install/common.py
@@ -1,12 +1,14 @@
 import logging
 import subprocess as sp
-import os
-from contextlib import suppress
 from pathlib import Path
 
 from helperFunctions.config import load_config
 from helperFunctions.install import (
-    apt_install_packages, apt_update_sources, pip_install_packages, load_requirements_file, OperateInDirectory
+    OperateInDirectory,
+    apt_install_packages,
+    apt_update_sources,
+    load_requirements_file,
+    pip_install_packages,
 )
 
 APT_DEPENDENCIES = {
@@ -31,6 +33,7 @@
     ],
 }
 PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-common.txt'
+BIN_DIR = Path(__file__).parent.parent / 'bin'
 
 
 def install_apt_dependencies(distribution: str):
@@ -39,23 +42,23 @@
 
 
 def _install_magic():
-    bin_dir = Path(__file__).parent.parent / 'bin'
-    with OperateInDirectory(bin_dir):
+    with OperateInDirectory(BIN_DIR):
         sp.run(
             [
-                "wget",
-                "--output-document",
-                "firmware.xz",
-                "https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz",
+                'wget',
+                '--output-document',
+                'firmware.xz',
+                'https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz',
             ],
             check=True,
         )
         sp.run(
             [
-                "unxz",
-                "--force",
-                "firmware.xz",
-            ]
+                'unxz',
+                '--force',
+                'firmware.xz',
+            ],
+            check=False,
         )
 
 
@@ -68,14 +71,13 @@ def main(distribution):
     pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE))
 
     # make bin dir
-    with suppress(FileExistsError):
-        os.mkdir('../bin')
+    BIN_DIR.mkdir(exist_ok=True)
 
     _install_magic()
 
     config = load_config('main.cfg')
    data_folder = config.get('unpack', 'data_folder')
-    os.makedirs(str(Path(data_folder, 'files')), exist_ok=True)
-    os.makedirs(str(Path(data_folder, 'reports')), exist_ok=True)
+    Path(data_folder, 'files').mkdir(parents=True, exist_ok=True)
+    Path(data_folder, 'reports').mkdir(exist_ok=True)
 
     return 0
diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
index 9f21867d..cc3172c6 100644
--- a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
+++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
@@ -1,14 +1,15 @@
-'''
+"""
 This plugin unpacks all files via carving
-'''
+"""
+
 from __future__ import annotations
 
-import magic
 import logging
 import re
 import shutil
 from pathlib import Path
 
+import magic
 from common_helper_process import execute_shell_command
 
 NAME = 'generic_carver'
@@ -24,10 +25,10 @@
 
 
 def unpack_function(file_path, tmp_dir):
-    '''
+    """
     file_path specifies the input file.
     tmp_dir should be used to store the extracted files.
-    '''
+    """
 
     logging.debug(f'File Type unknown: execute binwalk on {file_path}')
     output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}')
@@ -81,10 +82,7 @@ def _is_possible_tar(file_type: str, file_path: Path) -> bool:
     def _remove_invalid_archives(self, file_path: Path, command, search_key=None):
         output = execute_shell_command(command.format(file_path))
 
-        if search_key and search_key in output.replace('\n ', ''):
-            self._remove_file(file_path)
-
-        elif not search_key and _output_is_empty(output):
+        if search_key and search_key in output.replace('\n ', '') or not search_key and _output_is_empty(output):
             self._remove_file(file_path)
 
     def _remove_file(self, file_path):
@@ -115,7 +113,7 @@ def _output_is_empty(output):
 
 
 def _find_trailing_data_index_zip(file_path: Path) -> int | None:
-    '''Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size.'''
+    """Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size."""
     output = execute_shell_command(f'7z l {file_path}')
     if 'There are data after the end of archive' in output:
         match = REAL_SIZE_REGEX.search(output)
@@ -140,7 +138,7 @@ def drop_underscore_directory(tmp_dir):
     extracted_contents = list(Path(tmp_dir).iterdir())
     if not extracted_contents:
         return
-    if not len(extracted_contents) == 1 or not extracted_contents[0].name.endswith('.extracted'):
+    if len(extracted_contents) != 1 or not extracted_contents[0].name.endswith('.extracted'):
         return
     for result in extracted_contents[0].iterdir():
         shutil.move(str(result), str(result.parent.parent))
diff --git a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py
index fdc213b0..5d94cbce 100644
--- a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py
+++ b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py
@@ -1,17 +1,27 @@
-'''
+"""
 This plugin mounts filesystem images and extracts their content
-'''
+"""
+
 import re
-import magic
 from shlex import split
-from subprocess import run, PIPE, STDOUT
+from subprocess import PIPE, STDOUT, run
 from tempfile import TemporaryDirectory
 from time import sleep
 
+import magic
+
 NAME = 'genericFS'
 MIME_PATTERNS = [
-    'filesystem/btrfs', 'filesystem/dosmbr', 'filesystem/f2fs', 'filesystem/jfs', 'filesystem/minix',
-    'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs',
+    'filesystem/btrfs',
+    'filesystem/dosmbr',
+    'filesystem/f2fs',
+    'filesystem/jfs',
+    'filesystem/minix',
+    'filesystem/reiserfs',
+    'filesystem/romfs',
+    'filesystem/udf',
+    'filesystem/xfs',
+    'generic/fs',
 ]
 VERSION = '0.6.1'
 TYPES = {
diff --git a/fact_extractor/unpacker/unpackBase.py b/fact_extractor/unpacker/unpackBase.py
index 8285a206..f046a741 100644
--- a/fact_extractor/unpacker/unpackBase.py
+++ b/fact_extractor/unpacker/unpackBase.py
@@ -1,3 +1,6 @@
+# noqa: N999
+from __future__ import annotations
+
 import fnmatch
 import logging
 from os import getgid, getuid
@@ -12,10 +15,10 @@
 from helperFunctions.plugin import import_plugins
 
 
-class UnpackBase(object):
-    '''
+class UnpackBase:
+    """
     The unpacker module unpacks all files included in a file
-    '''
+    """
 
     def __init__(self, config=None, extract_everything: bool = False):
         self.config = config
@@ -37,7 +40,7 @@ def load_plugins(self):
 
     def _set_whitelist(self):
         self.blacklist = read_list_from_config(self.config, 'unpack', 'blacklist')
-        logging.debug(f'''Ignore (Blacklist): {', '.join(self.blacklist)}''')
+        logging.debug(f"""Ignore (Blacklist): {', '.join(self.blacklist)}""")
         for item in self.blacklist:
             self.register_plugin(item, self.unpacker_plugins['generic/nop'])
 
@@ -47,8 +50,7 @@ def register_plugin(self, mime_type: str, unpacker_name_and_function: Tuple[Call
     def get_unpacker(self, mime_type: str):
         if mime_type in list(self.unpacker_plugins.keys()):
             return self.unpacker_plugins[mime_type]
-        else:
-            return self.unpacker_plugins['generic/carver']
+        return self.unpacker_plugins['generic/carver']
 
     def extract_files_from_file(self, file_path: str, tmp_dir) -> Tuple[List, Dict]:
         current_unpacker = self.get_unpacker(magic.from_file(file_path, mime=True))
@@ -56,20 +58,25 @@ def extract_files_from_file(self, file_path: str, tmp_dir) -> Tuple[List, Dict]:
 
     def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]:
         fallback_plugin = self.unpacker_plugins[fallback_plugin_mime]
-        old_meta[f'''0_FALLBACK_{old_meta['plugin_used']}'''] = f'''{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)'''
-        if 'output' in old_meta.keys():
-            old_meta[f'''0_ERROR_{old_meta['plugin_used']}'''] = old_meta['output']
-        return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, fallback_plugin, meta_data=old_meta)
+        old_meta[f"""0_FALLBACK_{old_meta['plugin_used']}"""] = (
+            f"""{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)"""
+        )
+        if 'output' in old_meta:
+            old_meta[f"""0_ERROR_{old_meta['plugin_used']}"""] = old_meta['output']
+        return self._extract_files_from_file_using_specific_unpacker(
+            file_path, tmp_dir, fallback_plugin, meta_data=old_meta
+        )
 
     def _should_ignore(self, file):
         path = str(file)
-        for pattern in self.exclude:
-            if fnmatch.fnmatchcase(path, pattern):
-                return True
-        return False
+        return any(fnmatch.fnmatchcase(path, pattern) for pattern in self.exclude)
 
-    def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict = None) -> Tuple[List, Dict]:
-        unpack_function, name, version = selected_unpacker  # TODO Refactor register method to directly use four parameters instead of three
+    def _extract_files_from_file_using_specific_unpacker(
+        self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict | None = None
+    ) -> Tuple[List, Dict]:
+        unpack_function, name, version = (
+            selected_unpacker  # TODO Refactor register method to directly use four parameters instead of three
+        )
 
         if meta_data is None:
             meta_data = {}
@@ -82,7 +89,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d
             additional_meta = unpack_function(file_path, tmp_dir)
         except Exception as error:
             logging.debug(f'Unpacking of {file_path} failed: {error}', exc_info=True)
-            additional_meta = {'error': f'{type(error)}: {str(error)}'}
+            additional_meta = {'error': f'{type(error)}: {error!s}'}
 
         if isinstance(additional_meta, dict):
             meta_data.update(additional_meta)
@@ -102,7 +109,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d
             meta_data['number_of_excluded_files'] = excluded_count
         return out, meta_data
 
-    def change_owner_back_to_me(self, directory: str = None, permissions: str = 'u+r'):
+    def change_owner_back_to_me(self, directory: str, permissions: str = 'u+r'):
         with Popen(f'sudo chown -R {getuid()}:{getgid()} {directory}', shell=True, stdout=PIPE, stderr=PIPE) as pl:
             pl.communicate()
         self.grant_read_permission(directory, permissions)