diff --git a/unblob/extractor.py b/unblob/extractor.py index 93880ed892..7af0479c3c 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -8,13 +8,14 @@ from .file_utils import carve, is_safe_path from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk -from .report import MaliciousSymlinkRemoved logger = get_logger() FILE_PERMISSION_MASK = 0o644 DIR_PERMISSION_MASK = 0o775 +_ = is_safe_path # it is re-exported + def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk): """Extract valid chunk to a file, which we then pass to another tool to extract it.""" @@ -57,8 +58,11 @@ def sanitize_symlink_target(base_dir, current_dir, target): # Normalize all paths to their absolute forms base_dir_abs = os.path.abspath(base_dir) current_dir_abs = os.path.abspath(current_dir) - target_abs = os.path.abspath(os.path.join(current_dir, target)) \ - if not os.path.isabs(target) else os.path.abspath(target) + target_abs = ( + os.path.abspath(os.path.join(current_dir, target)) + if not os.path.isabs(target) + else os.path.abspath(target) + ) # Check if the target is absolute and within the base_dir if os.path.isabs(target): @@ -67,7 +71,13 @@ def sanitize_symlink_target(base_dir, current_dir, target): else: # Target is absolute but outside base_dir - we'll pretend base_dir is our root # and adjust the target to be within base_dir - abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs])) + abs = ( + base_dir + + "/" + + os.path.relpath( + target_abs, os.path.commonpath([target_abs, base_dir_abs]) + ) + ) # We want to return the relative path from current_dir to the adjusted target return os.path.relpath(abs, current_dir_abs) else: @@ -82,18 +92,21 @@ def sanitize_symlink_target(base_dir, current_dir, target): # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd # without escaping /host/test_archive - for drop_count in range(0, len(target.split('/'))): + for drop_count in range(len(target.split("/"))): # We drop '..'s from the target by prepending placeholder directories until we get something valid abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target resolved = os.path.abspath(abs) if resolved.startswith(base_dir_abs): break else: - raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}") + raise ValueError( + f"Could not resolve symlink target {target} within base_dir {base_dir}" + ) # We need to add the /placeholder to the relative path because we need # to act like a file within base_dir is our root (as opposed to base_dir itself) - return os.path.relpath(resolved, base_dir_abs + '/placeholder') + return os.path.relpath(resolved, base_dir_abs + "/placeholder") + def fix_extracted_directory(outdir: Path, task_result: TaskResult): def _fix_extracted_directory(directory: Path): @@ -103,7 +116,7 @@ def _fix_extracted_directory(directory: Path): base_dir = os.path.abspath(outdir) for root, dirs, files in os.walk(base_dir, topdown=True): fix_permission(Path(root)) - for name in dirs+files: + for name in dirs + files: try: full_path = os.path.join(root, name) if os.path.islink(full_path): @@ -113,9 +126,15 @@ def _fix_extracted_directory(directory: Path): if new_target != target: os.remove(full_path) os.symlink(new_target, full_path) - logger.info("Updated symlink", path=full_path, target=new_target) + logger.info( + "Updated symlink", path=full_path, target=new_target + ) else: - logger.debug("Symlink is already sanitized", path=full_path, target=new_target) + logger.debug( + "Symlink is already sanitized", + path=full_path, + target=new_target, + ) except OSError as e: if e.errno == errno.ENAMETOOLONG: continue