Skip to content

Commit

Permalink
fix analyzer bug when protecting bytes and str's
Browse files Browse the repository at this point in the history
This makes it so that if a bytes-path is being protected, the str
equivalent is also protected, and vice versa.
  • Loading branch information
nbargnesi committed Apr 11, 2024
1 parent a0a7ca1 commit fc0c305
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 64 deletions.
44 changes: 22 additions & 22 deletions analyzer/windows/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
# https://github.com/cuckoosandbox/cuckoo/blob/ad5bf8939fb4b86d03c4d96014b174b8b56885e3/cuckoo/core/plugins.py#L29

import hashlib
import importlib
import inspect
import logging
import os
import pkgutil
import socket
import inspect
import importlib
import struct
import subprocess
import sys
Expand Down Expand Up @@ -135,22 +135,32 @@ def pids_from_image_names(suffixlist):
return retpids


def in_protected_path(fname: bytes):
"""Checks file name against some protected names."""
if not fname:
return False
def _normalized_protected_path(path: str | bytes) -> bytes:
if isinstance(path, str):
path = path.encode()
if os.path.isdir(path) and path[-1] != b"\\":
path += b"\\"
return path.lower()

fnamelower = fname.lower()

for name in PROTECTED_PATH_LIST:
if name[-1] == "\\" and fnamelower.startswith(name):
def in_protected_path(path: str | bytes) -> bool:
"""Checks if a path is protected."""
if not path:
return False
path = _normalized_protected_path(path)
for protected_path in PROTECTED_PATH_LIST:
if protected_path[-1] == "\\" and path.starswith(protected_path):
return True
elif fnamelower == name:
if protected_path == path:
return True

return False


def add_protected_path(name: str | bytes):
"""Adds a pathname to the protected list"""
PROTECTED_PATH_LIST.append(_normalized_protected_path(name))


def add_pid_to_aux_modules(pid):
for aux in AUX_ENABLED:
try:
Expand All @@ -167,14 +177,6 @@ def del_pid_from_aux_modules(pid):
continue


def add_protected_path(name: bytes):
"""Adds a pathname to the protected list"""
if os.path.isdir(name) and name[-1] != b"\\":
PROTECTED_PATH_LIST.append(name.lower() + b"\\")
else:
PROTECTED_PATH_LIST.append(name.lower())


def upload_files(folder):
"""Create a copy of the given file path."""
log_folder = f"{PATHS['root']}\\{folder}"
Expand Down Expand Up @@ -389,7 +391,7 @@ def choose_package(self):

num_pkg_classes = len(pkg_classes)
if num_pkg_classes != 1:
raise CuckooError(f'expected a single Package subclass in {package_name}, got {num_pkg_classes}')
raise CuckooError(f"expected a single Package subclass in {package_name}, got {num_pkg_classes}")

# Initialize the single analysis package class
log.debug('initializing analysis package "%s"...', package)
Expand Down Expand Up @@ -1007,7 +1009,6 @@ def _handle_kprocess(self, data):
proc = Process(options=self.analyzer.options, config=self.analyzer.config, pid=process_id, thread_id=thread_id)
filepath = proc.get_filepath()
filename = os.path.basename(filepath)
# BUG(njb) potentially; in_protected_path only plays well with bytes
if not in_protected_path(filename):
self.analyzer.process_list.add_pid(process_id)
log.info("Announce process name : %s", filename)
Expand Down Expand Up @@ -1300,7 +1301,6 @@ def _handle_process(self, data):
self.analyzer.CRITICAL_PROCESS_LIST.append(int(self.analyzer.SERVICES_PID))
log.info("Announced %s process name: %s pid: %d", "64-bit" if is_64bit else "32-bit", filename, process_id)
# We want to prevent multiple injection attempts if one is already underway
# BUG(njb) potentially; in_protected_path only plays well with bytes
if not in_protected_path(filename):
_ = proc.inject(interest)
self.LASTINJECT_TIME = timeit.default_timer()
Expand Down
66 changes: 24 additions & 42 deletions analyzer/windows/tests/test_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
"""Tests for the analyzer."""

import os
import pathlib
import tempfile
import unittest
from unittest.mock import MagicMock, patch
from unittest.mock import patch, MagicMock

import analyzer


class TestModule(unittest.TestCase):
def test_pid_from_service_name(self):
# just check it doesn't crash
Expand All @@ -19,62 +16,46 @@ def test_get_explorer_pid(self):
_ = analyzer.get_explorer_pid()

def test_pids_from_image_names(self):
pids = analyzer.pids_from_image_names("python.exe")
pids = analyzer.pids_from_image_names('python.exe')
# should be at least one Python process running
self.assertGreaterEqual(len(pids), 1)
self.assertGreaterEqual(len(pids),1)
self.assertIn(os.getpid(), pids)

def test_protected_path_file(self):
with tempfile.NamedTemporaryFile() as ntf:
# test protecting bytes-based paths
protected_path = str(pathlib.Path(ntf.name)).lower().encode()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(ntf.name.encode()))
analyzer.add_protected_path(ntf.name.encode())
self.assertTrue(analyzer.in_protected_path(ntf.name.encode()))

# test protected str-based paths
protected_path = str(pathlib.Path(ntf.name)).lower()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(ntf.name))
# test protecting str-based paths
analyzer.add_protected_path(ntf.name)
self.assertTrue(analyzer.in_protected_path(ntf.name))

# test protecting bytes-based paths, asking for str's
protected_path = str(pathlib.Path(ntf.name)).lower().encode()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(ntf.name))
analyzer.add_protected_path(ntf.name.encode())
self.assertTrue(analyzer.in_protected_path(ntf.name))

# test protected str-based paths, asking for bytes
protected_path = str(pathlib.Path(ntf.name)).lower()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(ntf.name.encode()))
analyzer.add_protected_path(ntf.name)
self.assertTrue(analyzer.in_protected_path(ntf.name.encode()))

def test_protected_path_dir(self):
with tempfile.TemporaryDirectory() as tmpdir:
# test protecting bytes-based dirs
protected_path = tmpdir.lower().encode()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(tmpdir.encode()))

# test protected str-based paths
protected_path = tmpdir.lower()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(tmpdir))
# test protecting bytes-based paths
analyzer.add_protected_path(tmpdir.encode())
self.assertTrue(analyzer.in_protected_path(tmpdir.encode()))

# test protecting str-based paths
analyzer.add_protected_path(tmpdir)
self.assertTrue(analyzer.in_protected_path(tmpdir))

# test protecting bytes-based paths, asking for str's
protected_path = tmpdir.lower().encode()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(tmpdir))
analyzer.add_protected_path(tmpdir.encode())
self.assertTrue(analyzer.in_protected_path(tmpdir))

# test protected str-based paths, asking for bytes
protected_path = tmpdir.lower()
protected_paths = [protected_path]
with patch("analyzer.PROTECTED_PATH_LIST", protected_paths):
self.assertTrue(analyzer.in_protected_path(tmpdir.encode()))
analyzer.add_protected_path(tmpdir)
self.assertTrue(analyzer.in_protected_path(tmpdir.encode()))


class TestAnalyzerInternals(unittest.TestCase):
Expand Down Expand Up @@ -702,3 +683,4 @@ def test_choose_package_zip_compound(self):
pkg_name, pkg_class = test.choose_package()
self.assertEqual("modules.packages.zip_compound", pkg_name)
self.assertEqual(pkg_class.__class__.__name__, "ZipCompound")

0 comments on commit fc0c305

Please sign in to comment.