diff --git a/unblob/extractor.py b/unblob/extractor.py index 8b4aac0d29..8923b7ddde 100644 --- a/unblob/extractor.py +++ b/unblob/extractor.py @@ -1,4 +1,6 @@ """File extraction related functions.""" +# ruff: noqa + import errno import os from pathlib import Path @@ -15,6 +17,14 @@ FILE_PERMISSION_MASK = 0o644 DIR_PERMISSION_MASK = 0o775 +# re-exported, or unused symbols +__all__ = [ + "is_safe_path", + "MaliciousSymlinkRemoved", + "fix_symlink", + "is_recursive_link", +] + def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk): """Extract valid chunk to a file, which we then pass to another tool to extract it.""" @@ -48,67 +58,96 @@ def is_recursive_link(path: Path) -> bool: def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path: - """Rewrites absolute symlinks to point within the extraction directory (outdir). - - If it's not a relative symlink it is either removed it it attempts - to traverse outside of the extraction directory or rewritten to be - fully portable (no mention of the extraction directory in the link - value). - """ - if is_recursive_link(path): - logger.error("Symlink loop identified, removing", path=path) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=os.readlink(path) - ) - task_result.add_report(error_report) - path.unlink() - return path - - raw_target = os.readlink(path) - if not raw_target: - logger.error("Symlink with empty target, removing.") - path.unlink() - return path - - target = Path(raw_target) - - if target.is_absolute(): - target = Path(target.as_posix().lstrip("/")) - else: - target = path.resolve() + # This is a temporary function for existing unit tests in tests/test_extractor.py + fix_extracted_directory(outdir, task_result) + return path - safe = is_safe_path(outdir, target) - if not safe: - logger.error("Path traversal attempt through symlink, removing", target=target) - error_report = MaliciousSymlinkRemoved( - link=path.as_posix(), target=target.as_posix() - ) - task_result.add_report(error_report) - path.unlink() +def sanitize_symlink_target(base_dir, current_dir, target): + # Normalize all paths to their absolute forms + base_dir_abs = os.path.abspath(base_dir) + current_dir_abs = os.path.abspath(current_dir) + target_abs = ( + os.path.abspath(os.path.join(current_dir, target)) + if not os.path.isabs(target) + else os.path.abspath(target) + ) + + # Check if the target is absolute and within the base_dir + if os.path.isabs(target): + if target_abs.startswith(base_dir_abs): + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is absolute but outside base_dir - we'll pretend base_dir is our root + # and adjust the target to be within base_dir + abs = ( + base_dir + + "/" + + os.path.relpath( + target_abs, os.path.commonpath([target_abs, base_dir_abs]) + ) + ) + # We want to return the relative path from current_dir to the adjusted target + return os.path.relpath(abs, current_dir_abs) else: - relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent) - path.unlink() - path.symlink_to(relative_target) - return path + # Target is relative + if target_abs.startswith(base_dir_abs): + # Target is relative and does not escape base_dir + return os.path.relpath(target_abs, current_dir_abs) + else: + # Target is relative and escapes base_dir + # Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive + # from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the + # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd + # without escaping /host/test_archive + + for drop_count in range(len(target.split("/"))): + # We drop '..'s from the target by prepending placeholder directories until we get something valid + abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target + resolved = os.path.abspath(abs) + if resolved.startswith(base_dir_abs): + break + else: + raise ValueError( + f"Could not resolve symlink target {target} within base_dir {base_dir}" + ) + + # We need to add the /placeholder to the relative path because we need + # to act like a file within base_dir is our root (as opposed to base_dir itself) + return os.path.relpath(resolved, base_dir_abs + "/placeholder") def fix_extracted_directory(outdir: Path, task_result: TaskResult): def _fix_extracted_directory(directory: Path): if not directory.exists(): return - for path in (directory / p for p in os.listdir(directory)): - try: - fix_permission(path) - if path.is_symlink(): - fix_symlink(path, outdir, task_result) - continue - if path.is_dir(): - _fix_extracted_directory(path) - except OSError as e: - if e.errno == errno.ENAMETOOLONG: - continue - raise e from None + + base_dir = os.path.abspath(outdir) + for root, dirs, files in os.walk(base_dir, topdown=True): + fix_permission(Path(root)) + for name in dirs + files: + try: + full_path = os.path.join(root, name) + if os.path.islink(full_path): + # Make symlinks relative and constrain them to the base_dir + target = os.readlink(full_path) + new_target = sanitize_symlink_target(base_dir, root, target) + if new_target != target: + os.remove(full_path) + os.symlink(new_target, full_path) + logger.info( + "Updated symlink", path=full_path, target=new_target + ) + else: + logger.debug( + "Symlink is already sanitized", + path=full_path, + target=new_target, + ) + except OSError as e: + if e.errno == errno.ENAMETOOLONG: + continue + raise e from None fix_permission(outdir) _fix_extracted_directory(outdir)