diff --git a/src/intercom/back_end_binding.py b/src/intercom/back_end_binding.py index 655598371f..336155ddf2 100644 --- a/src/intercom/back_end_binding.py +++ b/src/intercom/back_end_binding.py @@ -20,6 +20,7 @@ from collections.abc import Callable from objects.firmware import Firmware + from scheduler.unpacking_scheduler import UnpackingScheduler from storage.unpacking_locks import UnpackingLockManager @@ -38,7 +39,7 @@ def __init__( ): self.analysis_service = analysis_service self.compare_service = compare_service - self.unpacking_service = unpacking_service + self.unpacking_service: UnpackingScheduler = unpacking_service self.unpacking_locks = unpacking_locks self.poll_delay = config.backend.intercom_poll_delay @@ -64,6 +65,7 @@ def start(self): self._start_listener(InterComBackEndSingleFileTask, self.analysis_service.update_analysis_of_single_object) self._start_listener(InterComBackEndPeekBinaryTask) self._start_listener(InterComBackEndLogsTask) + self._start_listener(InterComBackEndCancelTask, self._cancel_task) logging.info('Intercom online') def shutdown(self): @@ -87,6 +89,11 @@ def _backend_worker(self, listener: type[InterComListener], do_after_function: C do_after_function(task) logging.debug(f'{listener.__name__} listener stopped') + def _cancel_task(self, root_uid: str): + logging.warning(f'Cancelling unpacking and analysis of {root_uid}.') + self.unpacking_service.cancel_unpacking(root_uid) + self.analysis_service.cancel_analysis(root_uid) + class InterComBackEndAnalysisPlugInsPublisher(InterComRedisInterface): def __init__(self, analysis_service=None): @@ -135,6 +142,10 @@ class InterComBackEndCompareTask(InterComListener): CONNECTION_TYPE = 'compare_task' +class InterComBackEndCancelTask(InterComListener): + CONNECTION_TYPE = 'cancel_task' + + class InterComBackEndRawDownloadTask(InterComListenerAndResponder): CONNECTION_TYPE = 'raw_download_task' OUTGOING_CONNECTION_TYPE = 'raw_download_task_resp' diff --git a/src/intercom/front_end_binding.py b/src/intercom/front_end_binding.py index 218fce1136..3b9a1c39d4 100644 --- a/src/intercom/front_end_binding.py +++ b/src/intercom/front_end_binding.py @@ -31,6 +31,9 @@ def add_compare_task(self, compare_id, force=False): def delete_file(self, uid_list: set[str]): self._add_to_redis_queue('file_delete_task', uid_list) + def cancel_analysis(self, root_uid: str): + self._add_to_redis_queue('cancel_task', root_uid) + def get_available_analysis_plugins(self): plugin_dict = self.redis.get('analysis_plugins', delete=False) if plugin_dict is None: diff --git a/src/scheduler/analysis/scheduler.py b/src/scheduler/analysis/scheduler.py index f735da4fee..a72e4919e5 100644 --- a/src/scheduler/analysis/scheduler.py +++ b/src/scheduler/analysis/scheduler.py @@ -20,6 +20,7 @@ from helperFunctions.logging import TerminalColors, color_string from helperFunctions.plugin import discover_analysis_plugins from helperFunctions.process import ExceptionSafeProcess, check_worker_exceptions, stop_processes +from objects.firmware import Firmware from scheduler.analysis_status import AnalysisStatus from scheduler.task_scheduler import MANDATORY_PLUGINS, AnalysisTaskScheduler from statistic.analysis_stats import get_plugin_stats @@ -197,6 +198,7 @@ def update_analysis_of_single_object(self, fo: FileObject): :param fo: The file that is to be analyzed """ + fo.root_uid = None # for status/scheduling self.task_scheduler.schedule_analysis_tasks(fo, fo.scheduled_analysis) self._check_further_process_or_complete(fo) @@ -209,6 +211,9 @@ def _format_available_plugins(self) -> str: plugins.append(f'{plugin_name} {self.analysis_plugins[plugin_name].VERSION}') return ', '.join(plugins) + def cancel_analysis(self, root_uid: str): + self.status.cancel_analysis(root_uid) + # ---- plugin initialization ---- def _remove_example_plugins(self): @@ -549,8 +554,13 @@ def _check_further_process_or_complete(self, fw_object): if not fw_object.scheduled_analysis: logging.info(f'Analysis Completed: {fw_object.uid}') self.status.remove_object(fw_object) - else: + elif isinstance(fw_object, Firmware) or self.status.fw_analysis_is_in_progress(fw_object): self.process_queue.put(fw_object) + else: + logging.debug( + f'Removing {fw_object.uid} from analysis scheduling because analysis of FW {fw_object.root_uid} ' + f'was cancelled.' + ) # ---- miscellaneous functions ---- diff --git a/src/scheduler/analysis_status.py b/src/scheduler/analysis_status.py index 502952f74d..502793fdf5 100644 --- a/src/scheduler/analysis_status.py +++ b/src/scheduler/analysis_status.py @@ -5,10 +5,10 @@ import os from dataclasses import dataclass, field from enum import Enum, auto -from multiprocessing import Process, Queue, Value +from multiprocessing import Manager, Process, Queue, Value from queue import Empty from time import time -from typing import TYPE_CHECKING, Dict, Set +from typing import TYPE_CHECKING from helperFunctions.process import stop_process from objects.firmware import Firmware @@ -27,17 +27,23 @@ class _UpdateType(Enum): add_file = auto() add_analysis = auto() remove_file = auto() + is_currently_analyzed = auto() + cancel = auto() class AnalysisStatus: def __init__(self): - self._worker = AnalysisStatusWorker() + self.manager = Manager() + # this object tracks only the FW objects and not the status of the individual files + self.currently_analyzed = self.manager.dict() + self._worker = AnalysisStatusWorker(currently_analyzed_fw=self.currently_analyzed) def start(self): self._worker.start() def shutdown(self): self._worker.shutdown() + self.manager.shutdown() def add_update(self, fw_object: Firmware | FileObject, included_files: list[str] | set[str]): self.add_object(fw_object) @@ -70,25 +76,33 @@ def add_analysis(self, fw_object: FileObject, plugin: str): def remove_object(self, fw_object: Firmware | FileObject): self._worker.queue.put((_UpdateType.remove_file, fw_object.uid, fw_object.root_uid)) + def fw_analysis_is_in_progress(self, fw_object: Firmware | FileObject) -> bool: + return fw_object.root_uid in self.currently_analyzed or fw_object.uid in self.currently_analyzed + + def cancel_analysis(self, root_uid: str): + self._worker.queue.put((_UpdateType.cancel, root_uid)) + @dataclass class FwAnalysisStatus: - files_to_unpack: Set[str] - files_to_analyze: Set[str] + files_to_unpack: set[str] + files_to_analyze: set[str] total_files_count: int hid: str - analysis_plugins: Dict[str, int] + analysis_plugins: dict[str, int] start_time: float = field(default_factory=time) - completed_files: Set[str] = field(default_factory=set) + completed_files: set[str] = field(default_factory=set) total_files_with_duplicates: int = 1 unpacked_files_count: int = 1 analyzed_files_count: int = 0 class AnalysisStatusWorker: - def __init__(self): + def __init__(self, currently_analyzed_fw: dict): self.recently_finished = {} - self.currently_running: Dict[str, FwAnalysisStatus] = {} + self.recently_canceled = {} + self.currently_running: dict[str, FwAnalysisStatus] = {} + self.currently_analyzed: dict = currently_analyzed_fw self._worker_process = None self.queue = Queue() self._running = Value('i', 0) @@ -131,6 +145,8 @@ def _update_status(self): self._add_analysis(*args) elif update_type == _UpdateType.remove_file: self._remove_object(*args) + elif update_type == _UpdateType.cancel: + self._cancel_analysis(*args) def _add_update(self, fw_uid: str, included_files: set[str]): status = self.currently_running[fw_uid] @@ -149,6 +165,8 @@ def _add_firmware(self, uid: str, included_files: set[str], hid: str, scheduled_ hid=hid, analysis_plugins={p: 0 for p in scheduled_analyses or []}, ) + # This is only for checking if a FW is currently analyzed from *outside* of this class + self.currently_analyzed[uid] = True def _add_included_file(self, uid: str, root_uid: str, included_files: set[str]): """ @@ -190,6 +208,7 @@ def _remove_object(self, uid: str, root_uid: str): if len(status.files_to_unpack) == len(status.files_to_analyze) == 0: self.recently_finished[root_uid] = self._init_recently_finished(status) del self.currently_running[root_uid] + self.currently_analyzed.pop(root_uid, None) logging.info(f'Analysis of firmware {root_uid} completed') @staticmethod @@ -210,6 +229,7 @@ def _store_status(self): status = { 'current_analyses': self._get_current_analyses_stats(), 'recently_finished_analyses': self.recently_finished, + 'recently_canceled_analyses': self.recently_canceled, } self.redis.set_analysis_status(status) @@ -226,3 +246,15 @@ def _get_current_analyses_stats(self): } for uid, status in self.currently_running.items() } + + def _cancel_analysis(self, root_uid: str): + if root_uid in self.currently_running: + status = self.currently_running.pop(root_uid) + self.recently_canceled[root_uid] = { + 'unpacked_count': status.unpacked_files_count, + 'analyzed_count': status.analyzed_files_count, + 'total_count': status.total_files_count, + 'hid': status.hid, + 'time_finished': time(), + } + self.currently_analyzed.pop(root_uid, None) diff --git a/src/scheduler/unpacking_scheduler.py b/src/scheduler/unpacking_scheduler.py index b0279b1b54..633c252d50 100644 --- a/src/scheduler/unpacking_scheduler.py +++ b/src/scheduler/unpacking_scheduler.py @@ -193,6 +193,10 @@ def _work_thread_wrapper(self, task: FileObject, container: ExtractionContainer) def work_thread(self, task: FileObject, container: ExtractionContainer): if isinstance(task, Firmware): self._init_currently_unpacked(task) + elif task.root_uid not in self.currently_extracted: + # this should only happen if the unpacking of the parent FW was cancelled => skip unpacking + logging.debug(f'Cancelling unpacking of {task.uid}. Reason: Unpacking of {task.root_uid} was cancelled') + return with TemporaryDirectory(dir=container.tmp_dir.name) as tmp_dir: try: @@ -333,3 +337,7 @@ def _init_currently_unpacked(self, fo: Firmware): logging.warning(f'Starting unpacking of {fo.uid} but it is currently also still being unpacked') else: self.currently_extracted[fo.uid] = {'remaining': {fo.uid}, 'done': set(), 'delayed_vfp_update': {}} + + def cancel_unpacking(self, root_uid: str): + if self.currently_extracted is not None and root_uid in self.currently_extracted: + self.currently_extracted.pop(root_uid) diff --git a/src/test/integration/intercom/test_backend_scheduler.py b/src/test/integration/intercom/test_backend_scheduler.py index 444cf3b9e0..88a0d8fff2 100644 --- a/src/test/integration/intercom/test_backend_scheduler.py +++ b/src/test/integration/intercom/test_backend_scheduler.py @@ -6,7 +6,7 @@ from intercom.back_end_binding import InterComBackEndBinding # This number must be changed, whenever a listener is added or removed -NUMBER_OF_LISTENERS = 12 +NUMBER_OF_LISTENERS = 13 class ServiceMock: diff --git a/src/test/integration/intercom/test_task_communication.py b/src/test/integration/intercom/test_task_communication.py index d6564f4ab7..98d8672125 100644 --- a/src/test/integration/intercom/test_task_communication.py +++ b/src/test/integration/intercom/test_task_communication.py @@ -11,6 +11,7 @@ InterComBackEndAnalysisPlugInsPublisher, InterComBackEndAnalysisTask, InterComBackEndBinarySearchTask, + InterComBackEndCancelTask, InterComBackEndCompareTask, InterComBackEndFileDiffTask, InterComBackEndLogsTask, @@ -25,7 +26,8 @@ class AnalysisServiceMock: - def get_plugin_dict(self): + @staticmethod + def get_plugin_dict(): return {'dummy': 'dummy description'} @@ -188,3 +190,10 @@ def test_logs_task(self, intercom_frontend, monkeypatch): result = result_future.result() assert task is None, 'task not correct' assert result == expected_result.split() + + def test_cancel_task(self, intercom_frontend): + task = InterComBackEndCancelTask() + root_uid = 'root_uid' + intercom_frontend.cancel_analysis(root_uid) + result = task.get_next_task() + assert result == root_uid diff --git a/src/test/unit/conftest.py b/src/test/unit/conftest.py index c0a1ad470c..71b970d728 100644 --- a/src/test/unit/conftest.py +++ b/src/test/unit/conftest.py @@ -67,6 +67,9 @@ def add_analysis_task(self, task): def add_re_analyze_task(self, task, unpack=True): self.task_list.append(task) + def cancel_analysis(self, root_uid): + self.task_list.append(root_uid) + class FrontendDatabaseMock: """A class mocking :py:class:`~web_interface.frontend_database.FrontendDatabase`.""" diff --git a/src/test/unit/scheduler/test_analysis_status.py b/src/test/unit/scheduler/test_analysis_status.py index c5fa38dba8..01a186c12a 100644 --- a/src/test/unit/scheduler/test_analysis_status.py +++ b/src/test/unit/scheduler/test_analysis_status.py @@ -169,3 +169,41 @@ def test_clear_recently_finished(self, time_finished_delay, expected_result): self.status._worker.recently_finished = {'foo': {'time_finished': time() - time_finished_delay}} self.status._worker._clear_recently_finished() assert bool('foo' in self.status._worker.recently_finished) == expected_result + + def test_cancel_analysis(self): + self.status._worker.currently_running = { + ROOT_UID: FwAnalysisStatus( + files_to_unpack=set(), + files_to_analyze={'foo'}, + analysis_plugins={}, + hid='', + total_files_count=3, + ) + } + self.status.currently_analyzed[ROOT_UID] = True + fo = FileObject(binary=b'foo') + fo.root_uid = ROOT_UID + fo.uid = 'foo' + assert self.status.fw_analysis_is_in_progress(fo) + + self.status.cancel_analysis(ROOT_UID) + self.status._worker._update_status() + + assert ROOT_UID not in self.status._worker.currently_running + assert ROOT_UID not in self.status.currently_analyzed + assert not self.status.fw_analysis_is_in_progress(fo) + + def test_cancel_unknown_uid(self): + self.status._worker.currently_running = { + ROOT_UID: FwAnalysisStatus( + files_to_unpack=set(), + files_to_analyze={'foo'}, + analysis_plugins={}, + hid='', + total_files_count=3, + ) + } + self.status.cancel_analysis('unknown') + self.status._worker._update_status() + + assert ROOT_UID in self.status._worker.currently_running diff --git a/src/test/unit/web_interface/test_app_ajax_routes.py b/src/test/unit/web_interface/test_app_ajax_routes.py index 7b31d636c4..c11e193f1a 100644 --- a/src/test/unit/web_interface/test_app_ajax_routes.py +++ b/src/test/unit/web_interface/test_app_ajax_routes.py @@ -1,3 +1,5 @@ +from http import HTTPStatus + import pytest from helperFunctions.data_conversion import normalize_compare_id @@ -78,3 +80,10 @@ def test_ajax_get_hex_preview(self, test_client): result = test_client.get('/ajax_get_hex_preview/some_uid/0/10') assert result.data.startswith(b'
', GET) + def cancel_analysis(self, root_uid: str): + logging.info(f'Received analysis cancel request for {root_uid}') + self.intercom.cancel_analysis(root_uid=root_uid) + return {}, HTTPStatus.OK diff --git a/src/web_interface/security/privileges.py b/src/web_interface/security/privileges.py index 25dac788b6..8949f77989 100644 --- a/src/web_interface/security/privileges.py +++ b/src/web_interface/security/privileges.py @@ -9,6 +9,7 @@ 'advanced_search': ['superuser', 'senior_analyst', 'analyst'], 'pattern_search': ['superuser', 'senior_analyst', 'analyst'], 'submit_analysis': ['superuser', 'senior_analyst'], + 'cancel_analysis': ['superuser', 'senior_analyst'], 'download': ['superuser', 'senior_analyst'], 'delete': ['superuser'], 'manage_users': ['superuser'], diff --git a/src/web_interface/static/js/system_health.js b/src/web_interface/static/js/system_health.js index 9de00898ee..70da6898d1 100644 --- a/src/web_interface/static/js/system_health.js +++ b/src/web_interface/static/js/system_health.js @@ -146,50 +146,78 @@ function updateCurrentAnalyses(analysisData) { const currentAnalysesElement = document.getElementById("current-analyses"); const currentAnalysesHTML = [].concat( Object.entries(analysisData.current_analyses) - .map(([uid, analysisStats]) => createCurrentAnalysisItem(analysisStats, uid, false)), + .map(([uid, analysisStats]) => createCurrentAnalysisItem(analysisStats, uid)), Object.entries(analysisData.recently_finished_analyses) .map(([uid, analysisStats]) => createCurrentAnalysisItem(analysisStats, uid, true)), + Object.entries(analysisData.recently_canceled_analyses) + .map(([uid, analysisStats]) => createCurrentAnalysisItem(analysisStats, uid, false, true)), ).join("\n"); currentAnalysesElement.innerHTML = currentAnalysesHTML !== "" ? currentAnalysesHTML : "No analysis in progress"; document.querySelectorAll('div[role=tooltip]').forEach((element) => {element.remove();}); $("body").tooltip({selector: '[data-toggle="tooltip"]'}); // update tooltips for dynamically created elements } -function createCurrentAnalysisItem(data, uid, isFinished) { +function createCurrentAnalysisItem(data, uid, isFinished=false, isCancelled=false) { const timeString = isFinished ? `Finished in ${getDuration(null, data.duration)}` : `${getDuration(data.start_time)}`; const total = isFinished ? data.total_files_count : data.total_count; const showDetails = Boolean(document.getElementById("ca-show-details").checked); - const width = isFinished || !showDetails ? "30px": "50%"; + const width = isFinished || isCancelled || !showDetails ? "30px": "50%"; const unpackingIsFinished = isFinished ? null : (data.unpacked_count == data.total_count); - const padding = isFinished || !showDetails ? 55 : 211; + const padding = isFinished || isCancelled || !showDetails ? 55 : 211; + const cancelButton = isFinished || isCancelled ? '' : ` + + `; + const elapsedTime = isCancelled ? 'Cancelled' : ` ++ ${createIconCell("clock", "Elapsed Time", width)} + + `; return ` - -+ +${timeString}
+-${data.hid}
++- `; } +function cancelAnalysis(element, root_uid) { + element.innerHTML = ` ++ + ${data.hid} + + ${cancelButton} +
-
- ${createIconCell("clock", "Elapsed Time", width)} - + ${elapsedTime}- -${timeString}
-${createIconCell("box-open", "Unpacking Progress", width)} - ${createProgressBarCell(isFinished ? data.total_files_count : data.unpacked_count, total, padding)} + ${createProgressBarCell(isFinished ? data.total_files_count : data.unpacked_count, total, padding, isFinished, isCancelled)} ${createIconCell("microscope", "Analysis Progress", width)} - ${createProgressBarCell(isFinished ? data.total_files_count : data.analyzed_count, total, padding)} + ${createProgressBarCell(isFinished ? data.total_files_count : data.analyzed_count, total, padding, isFinished, isCancelled)} - ${!isFinished && showDetails ? createPluginProgress(data, unpackingIsFinished) : ""} + ${!isFinished && !isCancelled && showDetails ? createPluginProgress(data, unpackingIsFinished) : ""}+ Cancelling... ++ `; + fetch(`/ajax/cancel_analysis/${root_uid}`).then(response => { + if (!response.ok) { + console.log(`Error: could not cancel analysis of ${root_uid}`); + } + }); +} + function createPluginProgress(data, unpackingIsFinished) { return Object.entries(data.plugins).map( ([pluginName, pluginCount]) => createSinglePluginProgress(pluginName, pluginCount, data.total_count_with_duplicates, unpackingIsFinished) @@ -205,10 +233,10 @@ function createSinglePluginProgress(plugin, count, total, unpackingIsFinished) { `; } -function createProgressBarCell(count, total, padding_offset=211, unpackingIsFinished=true) { +function createProgressBarCell(count, total, padding_offset=211, unpackingIsFinished=true, isCancelled=false) { const progress = count / total * 100; const progressString = `${count} / ${total} (${progress.toFixed(1)}%)`; - const divClass = (progress >= 100.0) ? `progress-bar ${unpackingIsFinished ? "bg-success" : "bg-warning"}` : "progress-bar"; + const divClass = (progress >= 100.0) ? `progress-bar ${unpackingIsFinished ? "bg-success" : "bg-warning"}` : isCancelled ? "bg-danger" : "progress-bar"; const pStyle = { "color": "white", "font-size": "0.75rem",