Skip to content

Commit

Permalink
chore: fix 'compare' typos
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Nov 27, 2024
1 parent 00dc1f3 commit cb9f134
Show file tree
Hide file tree
Showing 70 changed files with 425 additions and 423 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Contribute
There are many ways to contribute to FACT.
For instance, you can write an unpacking, compare or analysis plug-in.
For instance, you can write an unpacking, comparison or analysis plug-in.
You can develop your plug-in in your own repository under your favorite license.
It can be added to a local FACT installation as git submodule.
Have a look at [FACT’s Developer’s Manual](https://github.com/fkie-cad/FACT_core/wiki) for more details.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ fixable = ["ALL"]
"migrate_database.py" = ["T201"]

[tool.ruff.lint.isort]
known-first-party = ["analysis", "compare", "helperFunctions", "install", "intercom", "objects", "plugins", "scheduler",
"statistic", "storage", "test", "unpacker", "version", "web_interface", "config"]
known-first-party = ["analysis", "comparison", "helperFunctions", "install", "intercom", "objects", "plugins",
"scheduler", "statistic", "storage", "test", "unpacker", "version", "web_interface", "config"]
known-third-party = ["docker"]

[tool.ruff.lint.pylint]
Expand Down
File renamed without changes.
26 changes: 14 additions & 12 deletions src/compare/compare.py → src/comparison/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
import logging
from typing import TYPE_CHECKING

from helperFunctions.plugin import discover_compare_plugins
from helperFunctions.plugin import discover_comparison_plugins
from helperFunctions.virtual_file_path import get_paths_for_all_parents
from objects.firmware import Firmware
from storage.binary_service import BinaryService

if TYPE_CHECKING:
from compare.PluginBase import CompareBasePlugin
from comparison.comparison_base_plugin import ComparisonBasePlugin
from objects.file import FileObject
from storage.db_interface_comparison import ComparisonDbInterface


class Compare:
class Comparison:
"""
This Module compares firmware images
"""

compare_plugins = {} # noqa: RUF012
comparison_plugins = {} # noqa: RUF012

def __init__(self, db_interface: ComparisonDbInterface | None = None):
self.db_interface = db_interface
self._setup_plugins()
logging.info(f'Comparison plugins available: {", ".join(self.compare_plugins)}')
logging.info(f'Comparison plugins available: {", ".join(self.comparison_plugins)}')

def compare(self, uid_list):
logging.info(f'Comparison in progress: {uid_list}')
Expand All @@ -41,7 +41,7 @@ def compare(self, uid_list):
def compare_objects(self, fo_list):
return {
'general': self._create_general_section_dict(fo_list),
'plugins': self._execute_compare_plugins(fo_list),
'plugins': self._execute_comparison_plugins(fo_list),
}

def _create_general_section_dict(self, object_list):
Expand Down Expand Up @@ -78,24 +78,26 @@ def _get_vfp_data(self, object_list: list[FileObject]) -> dict[str, list[str]]:
# --- plug-in system ---

def _setup_plugins(self):
self.compare_plugins = {}
self.comparison_plugins = {}
self._init_plugins()

def _init_plugins(self):
for plugin in discover_compare_plugins():
for plugin in discover_comparison_plugins():
try:
self.compare_plugins[plugin.ComparePlugin.NAME] = plugin.ComparePlugin(db_interface=self.db_interface)
self.comparison_plugins[plugin.ComparisonPlugin.NAME] = plugin.ComparisonPlugin(
db_interface=self.db_interface
)
except Exception:
logging.error(f'Could not import comparison plugin {plugin.AnalysisPlugin.NAME}', exc_info=True)

def _execute_compare_plugins(self, fo_list: list[FileObject]) -> dict[str, dict]:
def _execute_comparison_plugins(self, fo_list: list[FileObject]) -> dict[str, dict]:
comparison_results = {}
for plugin in schedule_comparison_plugins(self.compare_plugins):
for plugin in schedule_comparison_plugins(self.comparison_plugins):
comparison_results[plugin.NAME] = plugin.compare(fo_list, comparison_results)
return comparison_results


def schedule_comparison_plugins(plugin_dict: dict[str, CompareBasePlugin]) -> list[CompareBasePlugin]:
def schedule_comparison_plugins(plugin_dict: dict[str, ComparisonBasePlugin]) -> list[ComparisonBasePlugin]:
# we use a reverse topological sort for scheduling the plugins while considering their dependencies
# see also: https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
visited = set()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations # noqa: N999
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING
Expand All @@ -9,9 +9,9 @@
from objects.file import FileObject


class CompareBasePlugin(BasePlugin):
class ComparisonBasePlugin(BasePlugin):
"""
This is the compare plug-in base class. All compare plug-ins should be derived from this class.
This is the comparison plugin base class. All comparison plugins should be derived from this class.
"""

# must be set by the plugin:
Expand All @@ -26,7 +26,7 @@ def __init__(self, config=None, db_interface=None, view_updater=None):
self.database = db_interface

@abstractmethod
def compare_function(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
def compare_objects(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
"""
This function must be implemented by the plugin.
`fo_list` is a list with file_objects including analysis and all summaries.
Expand All @@ -38,7 +38,7 @@ def compare_function(self, fo_list: list[FileObject], dependency_results: dict[s

def compare(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
"""
This function is called by the compare module.
This function is called by the comparison module.
"""
missing_comparison_deps = self._get_missing_comparison_deps(dependency_results)
if missing_comparison_deps:
Expand All @@ -49,8 +49,8 @@ def compare(self, fo_list: list[FileObject], dependency_results: dict[str, dict]
}
missing_analysis_deps = self._get_missing_analysis_deps(fo_list)
if missing_analysis_deps:
return {'Compare Skipped': {'all': f"Required analyses not present: {', '.join(missing_analysis_deps)}"}}
return self.compare_function(fo_list, dependency_results)
return {'Comparison Skipped': {'all': f"Required analyses not present: {', '.join(missing_analysis_deps)}"}}
return self.compare_objects(fo_list, dependency_results)

def _get_missing_comparison_deps(self, dependency_results: dict[str, dict]) -> list[str]:
return [
Expand Down
26 changes: 13 additions & 13 deletions src/helperFunctions/data_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,36 @@ def make_unicode_string(code: Any) -> str:
return code.__str__()


def convert_uid_list_to_compare_id(uid_list: Iterable[str]) -> str:
def convert_uid_list_to_comparison_id(uid_list: Iterable[str]) -> str:
"""
Convert a list of UIDs to a compare ID (which is a unique string consisting of UIDs separated by semi-colons, used
Convert a list of UIDs to a comparison ID (which is a unique string consisting of UIDs separated by semicolons, used
to identify a FACT `Firmware` or `FileObject` comparison).
:param uid_list: A list of `FileObject` or `Firmware` UIDs.
:return: The compare ID.
:return: The comparison ID.
"""
return ';'.join(sorted(uid_list))


def convert_compare_id_to_list(compare_id: str) -> list[str]:
def convert_comparison_id_to_list(comparison_id: str) -> list[str]:
"""
Convert a compare ID back to a list of UIDs.
Convert a comparison ID back to a list of UIDs.
:param compare_id: The compare ID.
:param comparison_id: The comparison ID.
:return: The according UID list.
"""
return compare_id.split(';')
return comparison_id.split(';')


def normalize_compare_id(compare_id: str) -> str:
def normalize_comparison_id(comparison_id: str) -> str:
"""
Sort the UIDs in a compare ID (so that it is unique) and return it.
Sort the UIDs in a comparison ID (so that it is unique) and return it.
:param compare_id: The compare ID.
:return: The according unique compare ID with reordered UIDs.
:param comparison_id: The comparison ID.
:return: The according unique comparison ID with reordered UIDs.
"""
uids = convert_compare_id_to_list(compare_id)
return convert_uid_list_to_compare_id(uids)
uids = convert_comparison_id_to_list(comparison_id)
return convert_uid_list_to_comparison_id(uids)


def get_value_of_first_key(input_dict: dict[_KT, _VT]) -> _VT | None:
Expand Down
8 changes: 4 additions & 4 deletions src/helperFunctions/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def discover_analysis_plugins() -> list:
return _import_plugins('analysis')


def discover_compare_plugins() -> list:
"""Returns a list of modules where each module is a compare plugin."""
return _import_plugins('compare')
def discover_comparison_plugins() -> list:
"""Returns a list of modules where each module is a comparison plugin."""
return _import_plugins('comparison')


def _import_plugins(plugin_type):
assert plugin_type in ['analysis', 'compare']
assert plugin_type in ['analysis', 'comparison']

plugins = []
src_dir = get_src_dir()
Expand Down
10 changes: 5 additions & 5 deletions src/intercom/back_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ class InterComBackEndBinding:
def __init__(
self,
analysis_service=None,
compare_service=None,
comparison_service=None,
unpacking_service=None,
unpacking_locks=None,
testing=False, # noqa: ARG002
):
self.analysis_service = analysis_service
self.compare_service = compare_service
self.comparison_service = comparison_service
self.unpacking_service = unpacking_service
self.unpacking_locks = unpacking_locks
self.poll_delay = config.backend.intercom_poll_delay
Expand All @@ -49,7 +49,7 @@ def start(self):
InterComBackEndAnalysisPlugInsPublisher(analysis_service=self.analysis_service)
self._start_listener(InterComBackEndAnalysisTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndReAnalyzeTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndCompareTask, self.compare_service.add_task)
self._start_listener(InterComBackEndComparisonTask, self.comparison_service.add_task)
self._start_listener(InterComBackEndRawDownloadTask)
self._start_listener(InterComBackEndFileDiffTask)
self._start_listener(InterComBackEndTarRepackTask)
Expand Down Expand Up @@ -131,8 +131,8 @@ class InterComBackEndSingleFileTask(InterComBackEndReAnalyzeTask):
CONNECTION_TYPE = 'single_file_task'


class InterComBackEndCompareTask(InterComListener):
CONNECTION_TYPE = 'compare_task'
class InterComBackEndComparisonTask(InterComListener):
CONNECTION_TYPE = 'comparison_task'


class InterComBackEndRawDownloadTask(InterComListenerAndResponder):
Expand Down
4 changes: 2 additions & 2 deletions src/intercom/front_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def add_re_analyze_task(self, fw, unpack=True):
def add_single_file_task(self, fw):
self._add_to_redis_queue('single_file_task', fw, fw.uid)

def add_compare_task(self, compare_id, force=False):
self._add_to_redis_queue('compare_task', (compare_id, force), compare_id)
def add_comparison_task(self, comparison_id, force=False):
self._add_to_redis_queue('comparison_task', (comparison_id, force), comparison_id)

def delete_file(self, uid_list: set[str]):
self._add_to_redis_queue('file_delete_task', uid_list)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import ssdeep

import config
from compare.PluginBase import CompareBasePlugin
from comparison.comparison_base_plugin import ComparisonBasePlugin
from helperFunctions.compare_sets import iter_element_and_rest, remove_duplicates_from_list
from helperFunctions.data_conversion import convert_uid_list_to_compare_id
from helperFunctions.data_conversion import convert_uid_list_to_comparison_id
from objects.firmware import Firmware

if TYPE_CHECKING:
from objects.file import FileObject


class ComparePlugin(CompareBasePlugin):
class ComparisonPlugin(ComparisonBasePlugin):
"""
Compares file coverage
"""
Expand All @@ -29,27 +29,27 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ssdeep_ignore_threshold = config.backend.ssdeep_ignore

def compare_function(self, fo_list, dependency_results: dict[str, dict]): # noqa: ARG002
compare_result = {
def compare_objects(self, fo_list, dependency_results: dict[str, dict]): # noqa: ARG002
comparison_result = {
'files_in_common': self._get_intersection_of_files(fo_list),
'exclusive_files': self._get_exclusive_files(fo_list),
}

self._handle_partially_common_files(compare_result, fo_list)
self._handle_partially_common_files(comparison_result, fo_list)

for result in compare_result.values():
for result in comparison_result.values():
if isinstance(result, dict):
result['collapse'] = False

similar_files, similarity = self._get_similar_files(fo_list, compare_result['exclusive_files'])
compare_result['similar_files'] = self.combine_similarity_results(similar_files, fo_list, similarity)
similar_files, similarity = self._get_similar_files(fo_list, comparison_result['exclusive_files'])
comparison_result['similar_files'] = self.combine_similarity_results(similar_files, fo_list, similarity)

if len(fo_list) == 2 and all(isinstance(fo, Firmware) for fo in fo_list): # noqa: PLR2004
compare_result['changed_text_files'] = self._find_changed_text_files(
fo_list, compare_result['files_in_common']['all']
comparison_result['changed_text_files'] = self._find_changed_text_files(
fo_list, comparison_result['files_in_common']['all']
)

return compare_result
return comparison_result

def _get_exclusive_files(self, fo_list: list[FileObject]) -> dict[str, list[str]]:
result = {}
Expand All @@ -68,16 +68,16 @@ def _get_intersection_of_files(self, fo_list: list[FileObject]) -> dict[str, lis
def _get_included_file_sets(fo_list: list[FileObject]) -> list[set[str]]:
return [set(file_object.list_of_all_included_files) for file_object in fo_list]

def _handle_partially_common_files(self, compare_result, fo_list):
def _handle_partially_common_files(self, comparison_result, fo_list):
if len(fo_list) > 2: # noqa: PLR2004
compare_result['files_in_more_than_one_but_not_in_all'] = self._get_files_in_more_than_one_but_not_in_all(
fo_list, compare_result
)
not_in_all = compare_result['files_in_more_than_one_but_not_in_all']
comparison_result[
'files_in_more_than_one_but_not_in_all'
] = self._get_files_in_more_than_one_but_not_in_all(fo_list, comparison_result)
not_in_all = comparison_result['files_in_more_than_one_but_not_in_all']
else:
not_in_all = {}
compare_result['non_zero_files_in_common'] = self._get_non_zero_common_files(
compare_result['files_in_common'], not_in_all
comparison_result['non_zero_files_in_common'] = self._get_non_zero_common_files(
comparison_result['files_in_common'], not_in_all
)

@staticmethod
Expand All @@ -104,7 +104,7 @@ def _get_similar_files(
for file_one in exclusive_files[parent_one.uid]:
for similar_file_pair, value in self._find_similar_file_for(file_one, parent_one.uid, parent_two):
similar_files.append(similar_file_pair)
similarity[convert_uid_list_to_compare_id(similar_file_pair)] = value
similarity[convert_uid_list_to_comparison_id(similar_file_pair)] = value
similarity_sets = generate_similarity_sets(remove_duplicates_from_list(similar_files))
return similarity_sets, similarity

Expand Down Expand Up @@ -134,7 +134,7 @@ def combine_similarity_results(self, similar_files: list[list[str]], fo_list: li
def _get_similarity_value(group_of_similar_files: list[str], similarity_dict: dict[str, str]) -> str:
similarities_list = []
for id_tuple in combinations(group_of_similar_files, 2):
similar_file_pair_id = convert_uid_list_to_compare_id(id_tuple)
similar_file_pair_id = convert_uid_list_to_comparison_id(id_tuple)
if similar_file_pair_id in similarity_dict:
similarities_list.append(similarity_dict[similar_file_pair_id])
if not similarities_list:
Expand Down
Loading

0 comments on commit cb9f134

Please sign in to comment.