Skip to content

Commit

Permalink
src: rework internal architecture of Pyrrha
Browse files Browse the repository at this point in the history
  • Loading branch information
ebrocas committed Dec 20, 2024
1 parent a6c08c4 commit 224bf1a
Show file tree
Hide file tree
Showing 3 changed files with 347 additions and 232 deletions.
143 changes: 79 additions & 64 deletions src/pyrrha_mapper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import click
from numbat import SourcetrailDB

from .filesystem import FileSystemMapper, ResolveDuplicateOption
from pyrrha_mapper.filesystem import ResolveDuplicateOption
from pyrrha_mapper.imports_mapper import FileSystemImportsMapper


# -------------------------------------------------------------------------------
Expand All @@ -41,31 +42,36 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.params.insert(
0,
click.core.Option(('--db',),
help='Sourcetrail DB file path (.srctrldb).',
type=click.Path(file_okay=True, dir_okay=True, path_type=Path),
default=Path() / 'pyrrha.srctrldb',
show_default=True)
)
self.params.insert(
0,
click.core.Option(('-d', '--debug'),
is_flag=True, help='Set log level to DEBUG')
click.core.Option(
("--db",),
help="Sourcetrail DB file path (.srctrldb).",
type=click.Path(file_okay=True, dir_okay=True, path_type=Path),
default=Path() / "pyrrha.srctrldb",
show_default=True,
),
)
self.params.insert(0, click.core.Option(("-d", "--debug"), is_flag=True, help="Set log level to DEBUG"))
self.no_args_is_help = True


def setup_logs(is_debug_level: bool) -> None:
"""
Setup logs.
:param is_debug_level: if True set the log level as DEBUG else INFO
:param is_debug_level: if True set the log level as DEBUG else INFO
"""
log_format = dict(fmt='[%(asctime)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
coloredlogs.install(level=logging.DEBUG if is_debug_level else logging.INFO,
level_styles={'debug' : {'color': 'magenta'}, 'info': {'color': 'cyan'},
'warning' : {'color': 'yellow'}, 'error': {'color': 'red'},
'critical': {'bold': True, 'color': 'red'}},
field_styles={'asctime': {'color': 'green'}, 'levelname': {'bold': True}}, **log_format)
log_format = dict(fmt="[%(asctime)s][%(levelname)s]: %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
coloredlogs.install(
level=logging.DEBUG if is_debug_level else logging.INFO,
level_styles={
"debug": {"color": "magenta"},
"info": {"color": "cyan"},
"warning": {"color": "yellow"},
"error": {"color": "red"},
"critical": {"bold": True, "color": "red"},
},
field_styles={"asctime": {"color": "green"}, "levelname": {"bold": True}},
**log_format,
)


def setup_db(db_path, overwrite_db: bool = True) -> SourcetrailDB:
Expand All @@ -87,13 +93,10 @@ def setup_db(db_path, overwrite_db: bool = True) -> SourcetrailDB:
# CLI
# -------------------------------------------------------------------------------

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'],
max_content_width=120)
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], max_content_width=120)


@click.group(context_settings=CONTEXT_SETTINGS,
help='Mapper collection for firmware analysis.',
no_args_is_help=True)
@click.group(context_settings=CONTEXT_SETTINGS, help="Mapper collection for firmware analysis.", no_args_is_help=True)
def pyrrha():
pass

Expand All @@ -105,44 +108,56 @@ def pyrrha():
"""


@pyrrha.command('fs',
cls=MapperCommand,
short_help='Map PE and ELF files of a filesystem into a sourcetrail-compatible db.',
help='Map a filesystem into a sourcetrail-compatible db. It maps ELF and PE files, \
their imports and their exports plus the symlinks that points on these executable files.')
@click.option('-e', '--json',
help='Create a JSON export of the resulting mapping.',
is_flag=True,
default=False,
show_default=False)
@click.option('-j', '--jobs',
help='Number of parallel jobs created (threads).',
type=click.IntRange(1, multiprocessing.cpu_count(), clamp=True),
metavar='INT',
default=1,
show_default=True)
@click.option('--ignore',
help='When resolving duplicate imports, ignore them',
is_flag=True,
default=False,
show_default=False)
@click.option('--arbitrary',
help='When resolving duplicate imports, select the first one available',
is_flag=True,
default=False,
show_default=False)
@click.option('--interactive',
help='When resolving duplicate imports, manually select which one to use',
is_flag=True,
default=False,
show_default=False)
@click.argument('root_directory',
# help='Path of the directory containing the filesystem to map.',
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
@pyrrha.command(
"fs",
cls=MapperCommand,
short_help="Map PE and ELF files of a filesystem into a sourcetrail-compatible db.",
help="Map a filesystem into a sourcetrail-compatible db. It maps ELF and PE files, \
their imports and their exports plus the symlinks that points on these executable files.",
)
@click.option(
"-e",
"--json",
help="Create a JSON export of the resulting mapping.",
is_flag=True,
default=False,
show_default=False,
)
@click.option(
"-j",
"--jobs",
help="Number of parallel jobs created (threads).",
type=click.IntRange(1, multiprocessing.cpu_count(), clamp=True),
metavar="INT",
default=1,
show_default=True,
)
@click.option(
"--ignore", help="When resolving duplicate imports, ignore them", is_flag=True, default=False, show_default=False
)
@click.option(
"--arbitrary",
help="When resolving duplicate imports, select the first one available",
is_flag=True,
default=False,
show_default=False,
)
@click.option(
"--interactive",
help="When resolving duplicate imports, manually select which one to use",
is_flag=True,
default=False,
show_default=False,
)
@click.argument(
"root_directory",
# help='Path of the directory containing the filesystem to map.',
type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path),
)
def fs(debug: bool, db: Path, json, jobs, ignore, arbitrary, interactive, root_directory):
setup_logs(debug)
if ignore + arbitrary + interactive > 1:
logging.error('--ignore, --arbitrary and --interactive are mutually exclusive options.')
logging.error("--ignore, --arbitrary and --interactive are mutually exclusive options.")
return

resolve_duplicates = ResolveDuplicateOption.IGNORE
Expand All @@ -152,17 +167,17 @@ def fs(debug: bool, db: Path, json, jobs, ignore, arbitrary, interactive, root_d
resolve_duplicates = ResolveDuplicateOption.INTERACTIVE

db_instance = setup_db(db)
db_instance.set_node_type('class', 'Binaries', 'binary')
db_instance.set_node_type('typedef', 'Symlinks', 'symlink')
db_instance.set_node_type('method', hover_display='exported function')
db_instance.set_node_type('field', hover_display='exported symbol')
db_instance.set_node_type("class", "Binaries", "binary")
db_instance.set_node_type("typedef", "Symlinks", "symlink")
db_instance.set_node_type("method", hover_display="exported function")
db_instance.set_node_type("field", hover_display="exported symbol")

root_directory = root_directory.absolute()
fs_mapper = FileSystemMapper(root_directory, db_instance)
fs_mapper = FileSystemImportsMapper(root_directory, db_instance)
fs_mapper.map(jobs, json, resolve_duplicates)

db_instance.close()


if __name__ == '__main__':
if __name__ == "__main__":
pyrrha()
Loading

0 comments on commit 224bf1a

Please sign in to comment.