diff --git a/src/pyrrha_mapper/__main__.py b/src/pyrrha_mapper/__main__.py
index 48d825a..8779ae3 100644
--- a/src/pyrrha_mapper/__main__.py
+++ b/src/pyrrha_mapper/__main__.py
@@ -22,7 +22,8 @@
 import click
 from numbat import SourcetrailDB
 
-from .filesystem import FileSystemMapper, ResolveDuplicateOption
+from pyrrha_mapper.filesystem import ResolveDuplicateOption
+from pyrrha_mapper.imports_mapper import FileSystemImportsMapper
 
 
 # -------------------------------------------------------------------------------
@@ -41,31 +42,36 @@
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.params.insert(
             0,
-            click.core.Option(('--db',),
-                              help='Sourcetrail DB file path (.srctrldb).',
-                              type=click.Path(file_okay=True, dir_okay=True, path_type=Path),
-                              default=Path() / 'pyrrha.srctrldb',
-                              show_default=True)
-        )
-        self.params.insert(
-            0,
-            click.core.Option(('-d', '--debug'),
-                              is_flag=True, help='Set log level to DEBUG')
+            click.core.Option(
+                ("--db",),
+                help="Sourcetrail DB file path (.srctrldb).",
+                type=click.Path(file_okay=True, dir_okay=True, path_type=Path),
+                default=Path() / "pyrrha.srctrldb",
+                show_default=True,
+            ),
         )
+        self.params.insert(0, click.core.Option(("-d", "--debug"), is_flag=True, help="Set log level to DEBUG"))
        self.no_args_is_help = True
 
 
 def setup_logs(is_debug_level: bool) -> None:
     """
     Setup logs.
-    :param is_debug_level: if True set the log level as DEBUG else INFO
+    :param is_debug_level: if True, set the log level to DEBUG, else to INFO
     """
-    log_format = dict(fmt='[%(asctime)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
-    coloredlogs.install(level=logging.DEBUG if is_debug_level else logging.INFO,
-                        level_styles={'debug' : {'color': 'magenta'}, 'info': {'color': 'cyan'},
-                                      'warning' : {'color': 'yellow'}, 'error': {'color': 'red'},
-                                      'critical': {'bold': True, 'color': 'red'}},
-                        field_styles={'asctime': {'color': 'green'}, 'levelname': {'bold': True}}, **log_format)
+    log_format = dict(fmt="[%(asctime)s][%(levelname)s]: %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
+    coloredlogs.install(
+        level=logging.DEBUG if is_debug_level else logging.INFO,
+        level_styles={
+            "debug": {"color": "magenta"},
+            "info": {"color": "cyan"},
+            "warning": {"color": "yellow"},
+            "error": {"color": "red"},
+            "critical": {"bold": True, "color": "red"},
+        },
+        field_styles={"asctime": {"color": "green"}, "levelname": {"bold": True}},
+        **log_format,
+    )
 
 
 def setup_db(db_path, overwrite_db: bool = True) -> SourcetrailDB:
@@ -87,13 +93,10 @@
 # CLI
 # -------------------------------------------------------------------------------
 
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'],
-                        max_content_width=120)
+CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], max_content_width=120)
 
 
-@click.group(context_settings=CONTEXT_SETTINGS,
-             help='Mapper collection for firmware analysis.',
-             no_args_is_help=True)
+@click.group(context_settings=CONTEXT_SETTINGS, help="Mapper collection for firmware analysis.", no_args_is_help=True)
 def pyrrha():
     pass
 
@@ -105,44 +108,56 @@
 """
 
-@pyrrha.command('fs',
-                cls=MapperCommand,
-                short_help='Map PE and ELF files of a filesystem into a sourcetrail-compatible db.',
-                help='Map a filesystem into a sourcetrail-compatible db. It maps ELF and PE files, \
-their imports and their exports plus the symlinks that points on these executable files.')
-@click.option('-e', '--json',
-              help='Create a JSON export of the resulting mapping.',
-              is_flag=True,
-              default=False,
-              show_default=False)
-@click.option('-j', '--jobs',
-              help='Number of parallel jobs created (threads).',
-              type=click.IntRange(1, multiprocessing.cpu_count(), clamp=True),
-              metavar='INT',
-              default=1,
-              show_default=True)
-@click.option('--ignore',
-              help='When resolving duplicate imports, ignore them',
-              is_flag=True,
-              default=False,
-              show_default=False)
-@click.option('--arbitrary',
-              help='When resolving duplicate imports, select the first one available',
-              is_flag=True,
-              default=False,
-              show_default=False)
-@click.option('--interactive',
-              help='When resolving duplicate imports, manually select which one to use',
-              is_flag=True,
-              default=False,
-              show_default=False)
-@click.argument('root_directory',
-                # help='Path of the directory containing the filesystem to map.',
-                type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
+@pyrrha.command(
+    "fs",
+    cls=MapperCommand,
+    short_help="Map PE and ELF files of a filesystem into a sourcetrail-compatible db.",
+    help="Map a filesystem into a sourcetrail-compatible db. It maps ELF and PE files, \
+their imports and their exports plus the symlinks that point to these executable files.",
+)
+@click.option(
+    "-e",
+    "--json",
+    help="Create a JSON export of the resulting mapping.",
+    is_flag=True,
+    default=False,
+    show_default=False,
+)
+@click.option(
+    "-j",
+    "--jobs",
+    help="Number of parallel jobs created (threads).",
+    type=click.IntRange(1, multiprocessing.cpu_count(), clamp=True),
+    metavar="INT",
+    default=1,
+    show_default=True,
+)
+@click.option(
+    "--ignore", help="When resolving duplicate imports, ignore them", is_flag=True, default=False, show_default=False
+)
+@click.option(
+    "--arbitrary",
+    help="When resolving duplicate imports, select the first one available",
+    is_flag=True,
+    default=False,
+    show_default=False,
+)
+@click.option(
+    "--interactive",
+    help="When resolving duplicate imports, manually select which one to use",
+    is_flag=True,
+    default=False,
+    show_default=False,
+)
+@click.argument(
+    "root_directory",
+    # help='Path of the directory containing the filesystem to map.',
+    type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
+)
 def fs(debug: bool, db: Path, json, jobs, ignore, arbitrary, interactive, root_directory):
     setup_logs(debug)
     if ignore + arbitrary + interactive > 1:
-        logging.error('--ignore, --arbitrary and --interactive are mutually exclusive options.')
+        logging.error("--ignore, --arbitrary and --interactive are mutually exclusive options.")
         return
 
     resolve_duplicates = ResolveDuplicateOption.IGNORE
@@ -152,17 +167,17 @@
         resolve_duplicates = ResolveDuplicateOption.INTERACTIVE
 
     db_instance = setup_db(db)
-    db_instance.set_node_type('class', 'Binaries', 'binary')
-    db_instance.set_node_type('typedef', 'Symlinks', 'symlink')
-    db_instance.set_node_type('method', hover_display='exported function')
-    db_instance.set_node_type('field', hover_display='exported symbol')
+    db_instance.set_node_type("class", "Binaries", "binary")
+    db_instance.set_node_type("typedef", "Symlinks", "symlink")
+    db_instance.set_node_type("method", hover_display="exported function")
+    db_instance.set_node_type("field", hover_display="exported symbol")
 
     root_directory = root_directory.absolute()
-    fs_mapper = FileSystemMapper(root_directory, db_instance)
+    fs_mapper = FileSystemImportsMapper(root_directory, db_instance)
     fs_mapper.map(jobs, json, resolve_duplicates)
     db_instance.close()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     pyrrha()
diff --git a/src/pyrrha_mapper/filesystem.py b/src/pyrrha_mapper/filesystem.py
index 3d6e313..7ea59f6 100755
--- a/src/pyrrha_mapper/filesystem.py
+++ b/src/pyrrha_mapper/filesystem.py
@@ -13,118 +13,84 @@
 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #    See the License for the specific language governing permissions and
 #    limitations under the License.
-
-import json
+"""Base classes for mapping binaries of a filesystem"""
 import logging
 import queue
+from abc import abstractmethod, ABC
 from dataclasses import dataclass, field
 from multiprocessing import Pool, Queue, Manager
 from pathlib import Path
 from enum import Enum
 
-import lief
 from numbat import SourcetrailDB
 from rich.progress import Progress
 
+
 class ResolveDuplicateOption(Enum):
     IGNORE = 1
     ARBITRARY = 2
     INTERACTIVE = 3
 
 
-lief.logging.disable()
-
 
 @dataclass
-class Binary:
+class Binary(ABC):
+    """
+    Abstract class that represents a binary. It stores imported and exported
+    symbols/libs.
+    The following methods should be implemented in its subclasses:
+    - is_supported
+    - load
+    - record_in_db
+    """
+
     file_path: Path
     fw_path: Path
     id: int = None
     lib_names: list[str] = field(default_factory=list)
-    libs: list['Binary'] = field(default_factory=list)
+    libs: list["Binary"] = field(default_factory=list)
     imported_symbols: list[str] = field(default_factory=list)  # list(symbol names) symbols and functions
     imported_symbol_ids: list[int] = field(default_factory=list)
     non_resolved_libs: list[str] = field(default_factory=list)
     non_resolved_symbol_imports: list[str] = field(default_factory=list)
-    exported_function_ids: dict[str, int] = field(default_factory=dict)  # dict(name, id)
+    exported_function_ids: dict[str, int | None] = field(default_factory=dict)  # dict(name, id)
 
     # ELF specific fields
     version_requirement: dict[str, list[str]] = field(default_factory=dict)  # dict(symbol_name, list(requirements))
-    exported_symbol_ids: dict[str, int] = field(default_factory=dict)  # dict(name, id)
+    exported_symbol_ids: dict[str, int | None] = field(default_factory=dict)  # dict(name, id)
+
+    @property
+    def name(self):
+        """:return: name of the binary without its path"""
+        return self.file_path.name
 
     @staticmethod
+    @abstractmethod
     def is_supported(p: Path) -> bool:
         """
-        Check if the given path points on a file (NOT via a symlink) which is
-        of a format handled by this parser.
+        Check if the given path points to a file (NOT via a symlink) that is supported by the parser
         :param p: the path of the file to analyzed
         :return: True is the path point on a file
         """
-        return p.is_file() and not p.is_symlink() and (lief.is_elf(str(p)) or lief.is_pe(str(p)))
+        pass
 
+    @abstractmethod
     def load(self):
         """
         parse the given path with lief to automatically fill the other fields
         at the exception done of the ids (the object should be put on a DB)
         """
-        lief_obj: lief.Binary = lief.parse(str(self.file_path))
-        is_elf = isinstance(lief_obj, lief.ELF.Binary)
-
-        if is_elf:
-            if self.name.startswith('libcrypto') and len(lief_obj.exported_functions) == 0:
-                parser_config = lief.ELF.ParserConfig()
-                parser_config.count_mtd = lief.ELF.ParserConfig.DYNSYM_COUNT.HASH
-                lief_obj = lief.ELF.parse(str(self.file_path), parser_config)
-
-        # parse imported libs
-        self.lib_names = lief_obj.libraries
-
-        if is_elf:
-            # parse imported symbols
-            lief_obj: lief.ELF.Binary
-            self.imported_symbols = [s.name for s in lief_obj.imported_symbols]
-
-            # parse exported symbols
-            for s in lief_obj.exported_symbols:
-                if s.is_function:
-                    self.exported_function_ids[s.name] = None
-                else:
-                    self.exported_symbol_ids[s.name] = None
-
-            # parse version requirements
-            for req in lief_obj.symbols_version_requirement:
-                for symb in req.get_auxiliary_symbols():
-                    if symb.name in self.version_requirement:
-                        self.version_requirement[symb.name].append(req.name)
-                    else:
-                        self.version_requirement[symb.name] = [req.name]
-        else:
-            self.imported_symbols = [f.name for f in lief_obj.imported_functions]
-            for s in lief_obj.exported_functions:
-                self.exported_function_ids[s.name] = None
-
-    @property
-    def name(self):
-        """:return: name of the binary without its path"""
-        return self.file_path.name
+        pass
 
+    @abstractmethod
     def record_in_db(self, db: SourcetrailDB) -> None:
-        """
-        Record the binary inside the given db as well as its exported
-        symbols/functions.
-        Update 'self.id' with the id of the created object in DB as well as
-        'self.exported_symbol/function_ids' dictionaries.
-        :param db: DB interface
-        """
-        self.id = db.record_class(self.name, prefix=f"{self.fw_path.parent}/", delimiter=":")
-        for name in self.exported_symbol_ids.keys():
-            self.exported_symbol_ids[name] = db.record_field(name, parent_id=self.id)
-        for name in self.exported_function_ids.keys():
-            self.exported_function_ids[name] = db.record_method(name, parent_id=self.id)
+        """record the Binary and its components in the DB"""
+        pass
 
 
 @dataclass
 class Symlink:
     """Class that represents a Symlink and store the associated DB id"""
+
     path: Path
     target_path: Path
     target_id: int
@@ -155,44 +121,42 @@
     return Path(root_directory.anchor).joinpath(path.relative_to(root_directory))
 
 
-def parse_binary_job(ingress: Queue, egress: Queue, root_directory: Path) -> None:
+# TODO FILESYSTEM
+
+
+class FileSystemMapper(ABC):
     """
-    Parse an executable file and create the associated Binary object.
-    It is used for multiprocessing.
-    :param ingress: input Queue, contain a Path
-    :param egress: output Queue, send back (file path, Binary result or
-    logging string if an issue happen)
-    :param root_directory: path of the virtual root of the firmware
+    Abstract class which is a base mapper for the binaries of a filesystem.
+    It maps a filesystem in the following order:
+    - binaries
+    - symlinks
+    - lib imports
+    - symbol_imports.
+    To change the behavior of one of these mapping steps, reimplement the
+    corresponding map_* method.
+
+    The following methods should be implemented:
+    - create_export
+
+    Warning: the class used to represent a binary can be changed with the
+    cls.BINARY_CLASS field.
     """
-    while True:
-        try:
-            path = ingress.get(timeout=0.5)
-            try:
-                res = Binary(path, gen_fw_path(path, root_directory))
-                res.load()
-            except Exception as e:
-                res = e
-            egress.put((path, res))
-        except queue.Empty:
-            pass
-        except KeyboardInterrupt:
-            break
-
-
-class FileSystemMapper:
+
+    BINARY_CLASS = Binary
+
     def __init__(self, root_directory: Path, db: SourcetrailDB):
         """
         :param root_directory: directory containing the filesystem to map
         :param db: interface to the DB
         """
-        self.root_directory = root_directory
+        self.root_directory = root_directory.resolve().absolute()
         self.db_interface = db
         self.binaries: set[Path] = set(
-            filter(lambda p: Binary.is_supported(p),
-                   self.root_directory.rglob('*')))
+            filter(lambda p: self.BINARY_CLASS.is_supported(p), self.root_directory.rglob("*"))
+        )
         self.binary_names: dict[str, list[Binary]] = dict()
         self.binary_paths: dict[Path, Binary] = dict()
-        self.symlinks: set[Path] = set(filter(lambda p: p.is_symlink(), self.root_directory.rglob('*')))
+        self.symlinks: set[Path] = set(filter(lambda p: p.is_symlink(), self.root_directory.rglob("*")))
         self.symlink_names: dict[str, list[Symlink]] = dict()
         self.symlink_paths: dict[Path, Symlink] = dict()
 
@@ -204,7 +168,31 @@
         """
         return gen_fw_path(path, self.root_directory)
 
-    def _map_binary(self, bin_object: Binary) -> None:
+    @classmethod
+    def parse_binary_job(cls, ingress: Queue, egress: Queue, root_directory: Path) -> None:
+        """
+        Parse an executable file and create the associated Binary object.
+        It is used for multiprocessing.
+        :param ingress: input Queue, contains a Path
+        :param egress: output Queue, send back (file path, Binary result or
+        logging string if an issue happens)
+        :param root_directory: path of the virtual root of the firmware
+        """
+        while True:
+            try:
+                path = ingress.get(timeout=0.5)
+                try:
+                    res = cls.BINARY_CLASS(path, gen_fw_path(path, root_directory))
+                    res.load()
+                except Exception as e:
+                    res = e
+                egress.put((path, res))
+            except queue.Empty:
+                pass
+            except KeyboardInterrupt:
+                break
+
+    def map_binary(self, bin_object: Binary) -> None:
         """
         Given a Binary object add it to the DB. This function updates
         the fields 'self.binary_paths' and 'self.binary_names'
@@ -219,39 +207,40 @@
         else:
             self.binary_names[bin_object.name] = [bin_object]
 
-    def _map_symlink(self, path) -> None:
+    def map_symlink(self, path) -> None:
         """
-        Given a symlink, resolve it and if it points on a binary file, add it to the DB and create
-        the associated Symlink object. Also add in db a link between the Symlink object
-        and the Binary object corresponding to its target.
-        This function updates the fields 'self.symlink_paths' and 'self.symlink_names'
-        which are respectively symlink paths and names dictionaries pointing on
-        the created Symlink objects.
-        :param path: Symlink path
+        Given a symlink, resolve it and, if it points to a binary file, add it to the DB and create
+        the associated Symlink object. Also add in the DB a link between the Symlink object
+        and the Binary object corresponding to its target.
+        This function updates the fields 'self.symlink_paths' and 'self.symlink_names'
+        which are respectively symlink paths and names dictionaries pointing to
+        the created Symlink objects.
+        :param path: Symlink path
         """
         target = path.readlink()
         if not target.is_absolute():
             target = path.resolve()
             if not target.is_relative_to(self.root_directory):
                 logging.warning(
-                    f"[symlinks] cannot resolve '{path.name}': path '{target}' does not exist in {self.root_directory}")
+                    f"[symlinks] cannot resolve '{path.name}': path '{target}' does not exist in {self.root_directory}"
+                )
                 return
             if not target.exists():
                 target = self.gen_fw_path(target)
                 logging.warning(f"[symlinks] path {target} does not exist")
                 return
-            if not Binary.is_supported(target):
+            if not self.BINARY_CLASS.is_supported(target):
                 target = self.gen_fw_path(target)
                 logging.debug(f"path {target} does not correspond to a supported binary")
                 return
             target = self.gen_fw_path(target)
-        elif target == Path('/dev/null'):
+        elif target == Path("/dev/null"):
             logging.debug(f"[symlinks] '{path.name}': path '{path}' points on '/dev/null'")
             return
         elif not target.exists():
             logging.warning(f"[symlinks] path {target} does not exist")
             return
-        elif not Binary.is_supported(target):
+        elif not self.BINARY_CLASS.is_supported(target):
             logging.debug(f"path {target} does not correspond to a supported binary")
             return
         if target in self.binary_paths:
@@ -266,9 +255,10 @@
             self.symlink_names[symlink_obj.name] = [symlink_obj]
         else:
             logging.warning(
-                f"[symlinks] cannot resolve '{path.name}': path '{target}' does not correspond to a recorded binary")
+                f"[symlinks] cannot resolve '{path.name}': path '{target}' does not correspond to a recorded binary"
+            )
 
-    def _map_lib_imports(self, binary, resolve_duplicate_imports = ResolveDuplicateOption.IGNORE) -> None:
+    def map_lib_imports(self, binary, resolve_duplicate_imports=ResolveDuplicateOption.IGNORE) -> None:
         """
         Given an already mapped binary, resolve its library imports using
         the following heuristics:
@@ -284,10 +274,14 @@
             if lib_name in self.binary_names:
                 if len(self.binary_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.IGNORE:
                     logging.warning(
-                        f"[lib imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB")
+                        f"[lib imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB"
+                    )
                 else:
                     to_import = None
-                    if len(self.binary_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE:
+                    if (
+                        len(self.binary_names[lib_name]) > 1
+                        and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE
+                    ):
                         while to_import is None or to_import < 0 or to_import >= len(self.binary_names[lib_name]):
                             print(f"several matches for importing lib {lib_name}, choose one to keep\n")
                            for i in range(len(self.binary_names[lib_name])):
@@ -296,7 +290,7 @@
                                 to_import = int(input())
                             except ValueError:
                                 print("Enter a valid number")
-                    else: # "arbitrary" option
+                    else:  # "arbitrary" option
                         to_import = 0
                     lib_obj = self.binary_names[lib_name][to_import]
                     self.db_interface.record_ref_import(binary.id, lib_obj.id)
@@ -304,10 +298,14 @@
             elif lib_name in self.symlink_names:
                 if len(self.symlink_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.IGNORE:
                     logging.warning(
-                        f"[lib imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB")
+                        f"[lib imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB"
+                    )
                 else:
                     to_import = None
-                    if len(self.symlink_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE:
+                    if (
+                        len(self.symlink_names[lib_name]) > 1
+                        and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE
+                    ):
                         while to_import is None or to_import < 0 or to_import >= len(self.symlink_names[lib_name]):
                             print(f"several matches for importing lib {lib_name}, choose one to keep\n")
                             for i in range(len(self.symlink_names[lib_name])):
@@ -316,7 +314,7 @@
                                 to_import = int(input())
                             except ValueError:
                                 print("Enter a valid number")
-                    else: # "arbitrary" option
+                    else:  # "arbitrary" option
                         to_import = 0
                     sym_obj = self.symlink_names[lib_name][to_import]
                     self.db_interface.record_ref_import(binary.id, sym_obj.id)
@@ -327,30 +325,38 @@
             self.db_interface.record_ref_import(binary.id, lib_id)
             binary.non_resolved_libs.append(lib_name)
 
-    def _map_symbol_imports(self, binary: Binary, resolve_duplicate_imports = ResolveDuplicateOption.IGNORE) -> None:
+    def map_symbol_imports(self, binary: Binary, resolve_duplicate_imports=ResolveDuplicateOption.IGNORE) -> None:
         """
         Given an already mapped binary, resolve its symbols.
         This function update the DB with the import links found.
         """
         for func_name in binary.imported_symbols:
-            if len(func_name.split('@@')) == 2:  # symbols with a specific version
-                symb_name, symb_version = func_name.split('@@')
+            if len(func_name.split("@@")) == 2:  # symbols with a specific version
+                symb_name, symb_version = func_name.split("@@")
                 if symb_version in binary.version_requirement:
                     for lib_name in binary.version_requirement[symb_version]:
                         if lib_name not in self.binary_names:
                             logging.debug(f"[symbol imports] {binary.fw_path}: lib '{lib_name}' not found in DB")
                             lib_id = self.db_interface.record_class(lib_name, is_indexed=False)
-                            symb_id = self.db_interface.record_field(symb_name, parent_id=lib_id,
-                                                                     is_indexed=False)
+                            symb_id = self.db_interface.record_field(symb_name, parent_id=lib_id, is_indexed=False)
                            self.db_interface.record_ref_import(binary.id, symb_id)
                             binary.non_resolved_symbol_imports.append(func_name)
-                        elif len(self.binary_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.IGNORE:
+                        elif (
+                            len(self.binary_names[lib_name]) > 1
+                            and resolve_duplicate_imports is ResolveDuplicateOption.IGNORE
+                        ):
                             logging.warning(
-                                f"[symbol imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB")
+                                f"[symbol imports] {binary.fw_path}: several matches for importing lib {lib_name}, not put into DB"
+                            )
                         else:
                             to_import = None
-                            if len(self.binary_names[lib_name]) > 1 and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE:
-                                while to_import is None or to_import < 0 or to_import >= len(self.binary_names[lib_name]):
+                            if (
+                                len(self.binary_names[lib_name]) > 1
+                                and resolve_duplicate_imports is ResolveDuplicateOption.INTERACTIVE
+                            ):
+                                while (
+                                    to_import is None or to_import < 0 or to_import >= len(self.binary_names[lib_name])
+                                ):
                                     print(f"several matches for importing lib {lib_name}, choose one to keep\n")
                                     for i in range(len(self.binary_names[lib_name])):
                                         print(f"{i}: {self.binary_names[lib_name][i].file_path}")
@@ -358,7 +364,7 @@
                                         to_import = int(input())
                                     except ValueError:
print("Enter a valid number") - else: # "arbitrary" option + else: # "arbitrary" option to_import = 0 lib: Binary = self.binary_names[lib_name][to_import] if symb_name in lib.exported_symbol_ids: @@ -370,8 +376,7 @@ def _map_symbol_imports(self, binary: Binary, resolve_duplicate_imports = Resolv self.db_interface.record_ref_import(binary.id, symb_id) binary.imported_symbol_ids.append(symb_id) else: - symb_id = self.db_interface.record_field(symb_name, parent_id=lib.id, - is_indexed=False) + symb_id = self.db_interface.record_field(symb_name, parent_id=lib.id, is_indexed=False) self.db_interface.record_ref_import(binary.id, symb_id) binary.non_resolved_symbol_imports.append(func_name) else: @@ -395,40 +400,13 @@ def _map_symbol_imports(self, binary: Binary, resolve_duplicate_imports = Resolv self.db_interface.record_ref_import(binary.id, symb_id) binary.non_resolved_symbol_imports.append(func_name) - def _create_export(self): - logging.debug("Start export") - - export = {"symlinks": dict(), - "binaries": dict(), - "symbols" : dict()} - for sym in self.symlink_paths.values(): - export["symlinks"][sym.id] = {"name" : sym.path.name, - "path" : str(sym.path), - "target_id": sym.target_id} - - for bin in self.binary_paths.values(): - exported_symbol_ids = list(bin.exported_symbol_ids.values()) + list(bin.exported_function_ids.values()) - export["binaries"][bin.id] = {"name" : bin.name, - "path" : str(bin.fw_path), - "export_ids": exported_symbol_ids, - "imports" : {"lib" : {"ids" : [str(lib.id) for lib in bin.libs], - # keys are string so to keep type unicity - "non-resolved": bin.non_resolved_libs}, - "symbols": {"ids" : bin.imported_symbol_ids, - "non-resolved": bin.non_resolved_symbol_imports}}} - for name, s_id in bin.exported_symbol_ids.items(): - export["symbols"][s_id] = {"name" : name, - "is_func": False} - for name, f_id in bin.exported_function_ids.items(): - export["symbols"][f_id] = {"name" : name, - "is_func": True} - - logging.debug("Saving export") - json_path = self.db_interface.path.with_suffix('.json') - json_path.write_text(json.dumps(export)) - logging.info(f'Export saved: {json_path}') - - def map(self, threads: int, export: bool = False, resolve_duplicate_imports = ResolveDuplicateOption.IGNORE) -> None: + @abstractmethod + def create_export(self): + """Abstract class which should be implemented in order to know how + to export the current pyrrha results""" + pass + + def map(self, threads: int, export: bool = False, resolve_duplicate_imports=ResolveDuplicateOption.IGNORE) -> None: """ Map all the content of 'self.root_directory', in the order: - binaries; @@ -439,7 +417,7 @@ def map(self, threads: int, export: bool = False, resolve_duplicate_imports = Re :param threads: number of threads to use :param export: if True create a JSON export of the mapping. 
                        It will be stored at the same place as the DB (file name: DB_NAME.json)
-        :param resolve_duplicate_imports: the chosen option for duplicate import resolution
+        :param resolve_duplicate_imports: the chosen option for duplicate import resolution
         """
 
         with Progress() as progress:
@@ -458,7 +436,7 @@
 
                 # Launch all workers and fill input queue
                 for _ in range(threads - 1):
-                    pool.apply_async(parse_binary_job, (ingress, egress, self.root_directory))
+                    pool.apply_async(self.parse_binary_job, (ingress, egress, self.root_directory))
                 for path in self.binaries:
                     ingress.put(path)
                 logging.debug(f"[main] {threads - 1} threads created")
@@ -467,10 +445,10 @@
                 while True:
                     path, res = egress.get()
                     i += 1
-                    if isinstance(res, Binary):
-                        self._map_binary(res)
+                    if isinstance(res, self.BINARY_CLASS):
+                        self.map_binary(res)
                     else:
-                        logging.warning(f'Error while parsing {path}: {res}')
+                        logging.warning(f"Error while parsing {path}: {res}")
                     progress.update(binaries_map, advance=1)
                     if i == len(self.binaries):
                         break
@@ -478,28 +456,28 @@
             else:
                 logging.debug("[main] One thread mode")
                 for path in self.binaries:
-                    self._map_binary(Binary(path, self.gen_fw_path(path)))
+                    self.map_binary(self.BINARY_CLASS(path, self.gen_fw_path(path)))
                     progress.update(binaries_map, advance=1)
             self.db_interface.commit()
 
             # Parse and resolve symlinks
             logging.debug(f"[main] Start Symlinks parsing: {len(self.symlinks)} symlinks to parse")
             for path in self.symlinks:
-                self._map_symlink(path)
+                self.map_symlink(path)
                 progress.update(symlinks_map, advance=1)
             self.db_interface.commit()
 
             # Handle imports
             logging.debug("[main] Start Libraries imports resolution")
             for binary in self.binary_paths.values():
-                self._map_lib_imports(binary, resolve_duplicate_imports)
+                self.map_lib_imports(binary, resolve_duplicate_imports)
                 progress.update(lib_imports, advance=1)
             self.db_interface.commit()
             logging.debug(f"[main] Start Symbols imports resolution")
             for binary in self.binary_paths.values():
-                self._map_symbol_imports(binary, resolve_duplicate_imports)
+                self.map_symbol_imports(binary, resolve_duplicate_imports)
                 progress.update(symbol_imports, advance=1)
             self.db_interface.commit()
 
         if export:
-            self._create_export()
+            self.create_export()
diff --git a/src/pyrrha_mapper/imports_mapper.py b/src/pyrrha_mapper/imports_mapper.py
new file mode 100644
index 0000000..efd0e8f
--- /dev/null
+++ b/src/pyrrha_mapper/imports_mapper.py
@@ -0,0 +1,122 @@
+"""Filesystem mapper based on Lief, which computes imports and exports"""
+import json
+import logging
+from dataclasses import dataclass
+from pathlib import Path
+
+import lief
+from numbat import SourcetrailDB
+
+from pyrrha_mapper.filesystem import Binary, FileSystemMapper
+
+lief.logging.disable()
+
+
+@dataclass
+class ImportBinary(Binary):
+    @staticmethod
+    def is_supported(p: Path) -> bool:
+        """
+        Check if the given path points to a file (NOT via a symlink) which is
+        of a format handled by this parser.
+        :param p: the path of the file to analyze
+        :return: True if the path points to a supported file
+        """
+        return p.is_file() and not p.is_symlink() and (lief.is_elf(str(p)) or lief.is_pe(str(p)))
+
+    def load(self):
+        """
+        Parse the given path with lief to automatically fill the other fields,
+        except the ids (those are set when the object is recorded in a DB)
+        """
+        lief_obj: lief.Binary = lief.parse(str(self.file_path))
+        is_elf = isinstance(lief_obj, lief.ELF.Binary)
+
+        if is_elf:
+            if self.name.startswith("libcrypto") and len(lief_obj.exported_functions) == 0:
+                parser_config = lief.ELF.ParserConfig()
+                parser_config.count_mtd = lief.ELF.ParserConfig.DYNSYM_COUNT.HASH
+                lief_obj = lief.ELF.parse(str(self.file_path), parser_config)
+
+        # parse imported libs
+        self.lib_names = lief_obj.libraries
+
+        if is_elf:
+            # parse imported symbols
+            lief_obj: lief.ELF.Binary
+            self.imported_symbols = [s.name for s in lief_obj.imported_symbols]
+
+            # parse exported symbols
+            for s in lief_obj.exported_symbols:
+                if s.is_function:
+                    self.exported_function_ids[s.name] = None
+                else:
+                    self.exported_symbol_ids[s.name] = None
+
+            # parse version requirements
+            for req in lief_obj.symbols_version_requirement:
+                for symb in req.get_auxiliary_symbols():
+                    if symb.name in self.version_requirement:
+                        self.version_requirement[symb.name].append(req.name)
+                    else:
+                        self.version_requirement[symb.name] = [req.name]
+        else:
+            self.imported_symbols = [f.name for f in lief_obj.imported_functions]
+            for s in lief_obj.exported_functions:
+                self.exported_function_ids[s.name] = None
+
+    def record_in_db(self, db: SourcetrailDB) -> None:
+        """
+        Record the binary inside the given db as well as its exported
+        symbols/functions.
+        Update 'self.id' with the id of the created object in DB as well as
+        the 'self.exported_symbol/function_ids'
+        dictionaries.
+        :param db: DB interface
+        """
+        self.id = db.record_class(self.name, prefix=f"{self.fw_path.parent}/", delimiter=":")
+        for name in self.exported_symbol_ids.keys():
+            node_id = db.record_symbol_node(name, parent_id=self.id)
+            db.record_public_access(node_id)
+            self.exported_symbol_ids[name] = node_id
+        for name in self.exported_function_ids.keys():
+            node_id = db.record_method(name, parent_id=self.id)
+            db.record_public_access(node_id)
+            self.exported_function_ids[name] = node_id
+
+
+class FileSystemImportsMapper(FileSystemMapper):
+    BINARY_CLASS = ImportBinary
+
+    def create_export(self):
+        """Create a JSON export of the current Pyrrha results"""
+        logging.debug("Start export")
+
+        export = {"symlinks": dict(), "binaries": dict(), "symbols": dict()}
+        for sym in self.symlink_paths.values():
+            export["symlinks"][sym.id] = {"name": sym.path.name, "path": str(sym.path), "target_id": sym.target_id}
+
+        for b in self.binary_paths.values():
+            exported_symbol_ids = list(b.exported_symbol_ids.values()) + list(b.exported_function_ids.values())
+            export["binaries"][b.id] = {
+                "name": b.name,
+                "path": str(b.fw_path),
+                "export_ids": exported_symbol_ids,
+                "imports": {
+                    "lib": {
+                        "ids": [str(lib.id) for lib in b.libs],
+                        # ids are stored as strings to keep a single key type
+                        "non-resolved": b.non_resolved_libs,
+                    },
+                    "symbols": {"ids": b.imported_symbol_ids, "non-resolved": b.non_resolved_symbol_imports},
+                },
+            }
+            for name, s_id in b.exported_symbol_ids.items():
+                export["symbols"][s_id] = {"name": name, "is_func": False}
+            for name, f_id in b.exported_function_ids.items():
+                export["symbols"][f_id] = {"name": name, "is_func": True}
+
+        logging.debug("Saving export")
+        json_path = self.db_interface.path.with_suffix(".json")
+        json_path.write_text(json.dumps(export))
+        logging.info(f"Export saved: {json_path}")
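The patch turns FileSystemMapper and Binary into abstract bases: the filesystem walking, symlink resolution, and import-resolution logic stays in filesystem.py, while format-specific parsing moves into Binary subclasses such as ImportBinary. A rough sketch of how another mapper would plug into this abstraction follows; StubBinary and StubMapper are hypothetical names used for illustration only, assuming nothing beyond the interfaces visible in this diff:

    # Sketch of a custom mapper built on the abstract bases above.
    # StubBinary/StubMapper are hypothetical, not part of the patch.
    from dataclasses import dataclass
    from pathlib import Path

    from numbat import SourcetrailDB

    from pyrrha_mapper.filesystem import Binary, FileSystemMapper


    @dataclass
    class StubBinary(Binary):
        @staticmethod
        def is_supported(p: Path) -> bool:
            # Accept any regular file reached without a symlink (illustrative).
            return p.is_file() and not p.is_symlink()

        def load(self) -> None:
            # A real implementation would fill lib_names, imported_symbols,
            # exported_function_ids, etc. by parsing the file.
            pass

        def record_in_db(self, db: SourcetrailDB) -> None:
            # Record the binary node, mirroring the prefix/delimiter
            # convention used by ImportBinary.record_in_db above.
            self.id = db.record_class(self.name, prefix=f"{self.fw_path.parent}/", delimiter=":")


    class StubMapper(FileSystemMapper):
        BINARY_CLASS = StubBinary  # class instantiated for each mapped binary

        def create_export(self):
            # This stub produces no export file.
            pass

    # Usage would mirror __main__.fs():
    #     StubMapper(root_directory, db_instance).map(jobs, json_export, resolve_duplicates)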