diff --git a/src/storage/db_interface_frontend.py b/src/storage/db_interface_frontend.py index bd734bf7a..c10c6a919 100644 --- a/src/storage/db_interface_frontend.py +++ b/src/storage/db_interface_frontend.py @@ -1,6 +1,8 @@ from __future__ import annotations +import os.path import re +from pathlib import Path from typing import Any, NamedTuple, Optional from sqlalchemy import Column, func, or_, select @@ -17,6 +19,7 @@ FileObjectEntry, FirmwareEntry, SearchCacheEntry, + VirtualFilePath, fw_files_table, included_files_table, ) @@ -453,3 +456,33 @@ def _get_mode_dict(self, parent_uid: str | None) -> dict[str, str]: meta_dict['path'].lstrip('/'): meta_dict['mode'] for meta_dict in fs_metadata.get('result', {}).get('files', []) } + + def find_link_target(self, virtual_file_path: dict[str, list[str]], root_uid: str, target_path: str) -> str | None: + if target_path.startswith('/'): + candidate_paths = {target_path} + else: + candidate_paths = { + # we need to resolve stuff like /sbin/../bin/busybox to /bin/busybox + # there is currently no equivalent to os.path.normpath in pathlib + # (and no, Path.resolve() is not equivalent!) + os.path.normpath(Path(path).parent / target_path) + for path_list in virtual_file_path.values() + for path in path_list + } + with self.get_read_only_session() as session: + parents = list(virtual_file_path) + query = ( + select(VirtualFilePath.file_uid) + .join(fw_files_table, fw_files_table.c.root_uid == root_uid) + .filter( + or_( + VirtualFilePath.parent_uid == fw_files_table.c.file_uid, + VirtualFilePath.parent_uid == root_uid, # special case: parent is also root + ) + ) + .filter(VirtualFilePath.parent_uid.in_(parents)) + .filter(VirtualFilePath.file_path.in_(candidate_paths)) + ) + for uid in session.execute(query.limit(1)).scalars(): + return uid + return None diff --git a/src/test/integration/storage/test_db_interface_frontend.py b/src/test/integration/storage/test_db_interface_frontend.py index 3be20ac65..1c9ab3696 100644 --- a/src/test/integration/storage/test_db_interface_frontend.py +++ b/src/test/integration/storage/test_db_interface_frontend.py @@ -9,6 +9,7 @@ from .helper import ( TEST_FO, TEST_FW, + add_included_file, create_fw_with_child_fo, create_fw_with_parent_and_child, get_fo_with_2_root_fw, @@ -621,3 +622,26 @@ def test_get_root_uid(frontend_db, backend_db): backend_db.insert_multiple_objects(parent_fw, child_fo) assert frontend_db.get_root_uid(child_fo.uid) == parent_fw.uid assert frontend_db.get_root_uid(parent_fw.uid) == parent_fw.uid + + +def test_find_link_target(frontend_db, backend_db): + fw, parent_fo, child_fo = create_fw_with_parent_and_child() + child_fo.virtual_file_path[parent_fo.uid].append('/usr/bin/foo') + link_to_fo = create_test_file_object(uid='deadbeef_1') + add_included_file(link_to_fo, parent_fo, fw, ['/usr/sbin/bar']) + backend_db.insert_multiple_objects(fw, parent_fo, child_fo, link_to_fo) + + result = frontend_db.find_link_target(link_to_fo.virtual_file_path, fw.uid, '../bin/foo') + assert result == child_fo.uid + + +def test_find_link_parent_is_root(frontend_db, backend_db): + # special case: parent is also root + child_fo, parent_fw = create_fw_with_child_fo() + child_fo.virtual_file_path[parent_fw.uid].append('/usr/bin/foo') + link_to_fo = create_test_file_object(uid='deadbeef_1') + add_included_file(link_to_fo, parent_fw, parent_fw, ['/usr/sbin/bar']) + backend_db.insert_multiple_objects(parent_fw, child_fo, link_to_fo) + + result = frontend_db.find_link_target(link_to_fo.virtual_file_path, parent_fw.uid, '../bin/foo') + assert result == child_fo.uid diff --git a/src/web_interface/components/analysis_routes.py b/src/web_interface/components/analysis_routes.py index bd253f1a3..63689a5cd 100644 --- a/src/web_interface/components/analysis_routes.py +++ b/src/web_interface/components/analysis_routes.py @@ -93,6 +93,7 @@ def show_analysis(self, uid, selected_analysis=None, root_uid=None): available_plugins=self._get_used_and_unused_plugins( file_obj.processed_analysis, [x for x in analysis_plugins if x != 'unpacker'] ), + link_target=self._get_link_target(file_obj, root_uid) if self._is_link(file_obj) else None, ) def _get_correct_template(self, selected_analysis: str | None, fw_object: Firmware | FileObject): @@ -232,6 +233,25 @@ def show_elf_dependency_graph(self, uid: str, root_uid: str): colors=colors, ) + @staticmethod + def _is_link(file_obj: FileObject) -> bool: + type_analysis = file_obj.processed_analysis.get('file_type', {}).get('result', {}) + mime = type_analysis.get('mime') + full_type = type_analysis.get('full', '') + return mime == 'inode/symlink' and full_type.startswith('symbolic link to') + + def _get_link_target(self, file_obj: FileObject, root_uid: str) -> str | None: + if not root_uid: + return None + try: + full_type = file_obj.processed_analysis['file_type']['result']['full'] + # if FO is a symlink, file_type analysis "full" will be something like "symbolic link to 'busybox'" + target_path = full_type.split("'")[1] + except (IndexError, KeyError): + return None + target_uid = self.db.frontend.find_link_target(file_obj.virtual_file_path, root_uid, target_path) + return f'{full_type}' if target_uid else None + def _add_preset_from_firmware(plugin_dict, fw: Firmware): """ diff --git a/src/web_interface/templates/show_analysis.html b/src/web_interface/templates/show_analysis.html index d28b35927..2d780b17a 100644 --- a/src/web_interface/templates/show_analysis.html +++ b/src/web_interface/templates/show_analysis.html @@ -112,7 +112,11 @@ {# Header section #}