diff --git a/CHANGELOG.md b/CHANGELOG.md index ce1aa983e..954deda3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,12 +2,15 @@ ## master (unreleased) +Unlock powerful malware analysis with capa's new [VMRay sandbox](https://www.vmray.com/) integration! Simply provide a VMRay analysis archive, and capa will automatically extract and match capabilties, streamlining your workflow. + ### New Features - regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff - add landing page https://mandiant.github.io/capa/ @williballenthin #2310 - add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310 - add .justfile @williballenthin #2325 +- dynamic: add support for VMRay dynamic sandbox traces #2208 @mike-hunhoff @r-sm2024 @mr-tz ### Breaking Changes diff --git a/README.md b/README.md index 4fc491b25..3a987b7d5 100644 --- a/README.md +++ b/README.md @@ -150,13 +150,15 @@ function @ 0x4011C0 ... ``` -## analyzing sandbox reports -Additionally, capa also supports analyzing sandbox reports for dynamic capability extraction. -In order to use this, you first submit your sample to one of supported sandboxes for analysis, and then run capa against the generated report file. +capa also supports dynamic capabilities detection for multiple sandboxes including: +* [CAPE](https://github.com/kevoreilly/CAPEv2) (supported report formats: `.json`, `.json_`, `.json.gz`) +* [DRAKVUF](https://github.com/CERT-Polska/drakvuf-sandbox/) (supported report formats: `.log`, `.log.gz`) +* [VMRay](https://www.vmray.com/) (supported report formats: analysis archive `.zip`) -Currently, capa supports the [CAPE sandbox](https://github.com/kevoreilly/CAPEv2) and the [DRAKVUF sandbox](https://github.com/CERT-Polska/drakvuf-sandbox/). In order to use either, simply run capa against the generated file (JSON for CAPE or LOG for DRAKVUF sandbox) and it will automatically detect the sandbox and extract capabilities from it. -Here's an example of running capa against a packed binary, and then running capa against the CAPE report of that binary: +To use this feature, submit your file to a supported sandbox and then download and run capa against the generated report file. This feature enables capa to match capabilities against dynamic and static features that the sandbox captured during execution. + +Here's an example of running capa against a packed file, and then running capa against the CAPE report generated for the same packed file: ```yaml $ capa 05be49819139a3fdcdbddbdefd298398779521f3d68daa25275cc77508e42310.exe diff --git a/capa/features/common.py b/capa/features/common.py index 7c9f382b8..18c5b9e58 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -462,6 +462,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True): FORMAT_SC64 = "sc64" FORMAT_CAPE = "cape" FORMAT_DRAKVUF = "drakvuf" +FORMAT_VMRAY = "vmray" FORMAT_FREEZE = "freeze" FORMAT_RESULT = "result" STATIC_FORMATS = { @@ -476,6 +477,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True): DYNAMIC_FORMATS = { FORMAT_CAPE, FORMAT_DRAKVUF, + FORMAT_VMRAY, FORMAT_FREEZE, FORMAT_RESULT, } diff --git a/capa/features/extractors/vmray/__init__.py b/capa/features/extractors/vmray/__init__.py new file mode 100644 index 000000000..06d581cc9 --- /dev/null +++ b/capa/features/extractors/vmray/__init__.py @@ -0,0 +1,161 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +from typing import Dict, List, Tuple, Optional +from pathlib import Path +from zipfile import ZipFile +from collections import defaultdict + +from capa.exceptions import UnsupportedFormatError +from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData, FunctionCall, xml_to_dict + +logger = logging.getLogger(__name__) + +DEFAULT_ARCHIVE_PASSWORD = b"infected" + +SUPPORTED_FLOG_VERSIONS = ("2",) + + +class VMRayAnalysis: + def __init__(self, zipfile_path: Path): + self.zipfile = ZipFile(zipfile_path, "r") + + # summary_v2.json is the entry point to the entire VMRay archive and + # we use its data to find everything else that we need for capa + self.sv2 = SummaryV2.model_validate_json( + self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD) + ) + self.file_type: str = self.sv2.analysis_metadata.sample_type + + # flog.xml contains all of the call information that VMRay captured during execution + flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD) + flog_dict = xml_to_dict(flog_xml) + self.flog = Flog.model_validate(flog_dict) + + if self.flog.analysis.log_version not in SUPPORTED_FLOG_VERSIONS: + raise UnsupportedFormatError( + "VMRay feature extractor does not support flog version %s" % self.flog.analysis.log_version + ) + + self.exports: Dict[int, str] = {} + self.imports: Dict[int, Tuple[str, str]] = {} + self.sections: Dict[int, str] = {} + self.process_ids: Dict[int, int] = {} + self.process_threads: Dict[int, List[int]] = defaultdict(list) + self.process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list)) + self.base_address: int + + self.sample_file_name: Optional[str] = None + self.sample_file_analysis: Optional[File] = None + self.sample_file_static_data: Optional[StaticData] = None + + self._find_sample_file() + + # VMRay analysis archives in various shapes and sizes and file type does not definitively tell us what data + # we can expect to find in the archive, so to be explicit we check for the various pieces that we need at + # minimum to run capa analysis + if self.sample_file_name is None or self.sample_file_analysis is None: + raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type) + + if not self.sample_file_static_data: + raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type) + + if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf: + raise UnsupportedFormatError( + "VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type + ) + + # VMRay does not store static strings for the sample file so we must use the source file + # stored in the archive + sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower() + sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}" + + logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path) + + self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD) + + self._compute_base_address() + self._compute_imports() + self._compute_exports() + self._compute_sections() + self._compute_process_ids() + self._compute_process_threads() + self._compute_process_calls() + + def _find_sample_file(self): + for file_name, file_analysis in self.sv2.files.items(): + if file_analysis.is_sample: + # target the sample submitted for analysis + self.sample_file_name = file_name + self.sample_file_analysis = file_analysis + + if file_analysis.ref_static_data: + # like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data + # key for the file's static data + self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]] + + break + + def _compute_base_address(self): + assert self.sample_file_static_data is not None + if self.sample_file_static_data.pe: + self.base_address = self.sample_file_static_data.pe.basic_info.image_base + + def _compute_exports(self): + assert self.sample_file_static_data is not None + if self.sample_file_static_data.pe: + for export in self.sample_file_static_data.pe.exports: + self.exports[export.address] = export.api.name + + def _compute_imports(self): + assert self.sample_file_static_data is not None + if self.sample_file_static_data.pe: + for module in self.sample_file_static_data.pe.imports: + for api in module.apis: + self.imports[api.address] = (module.dll, api.api.name) + + def _compute_sections(self): + assert self.sample_file_static_data is not None + if self.sample_file_static_data.pe: + for pefile_section in self.sample_file_static_data.pe.sections: + self.sections[pefile_section.virtual_address] = pefile_section.name + elif self.sample_file_static_data.elf: + for elffile_section in self.sample_file_static_data.elf.sections: + self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name + + def _compute_process_ids(self): + for process in self.sv2.processes.values(): + # we expect VMRay's monitor IDs to be unique, but OS PIDs may be reused + assert process.monitor_id not in self.process_ids.keys() + self.process_ids[process.monitor_id] = process.os_pid + + def _compute_process_threads(self): + # logs/flog.xml appears to be the only file that contains thread-related data + # so we use it here to map processes to threads + for function_call in self.flog.analysis.function_calls: + pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID + tid: int = function_call.thread_id + + assert isinstance(pid, int) + assert isinstance(tid, int) + + if tid not in self.process_threads[pid]: + self.process_threads[pid].append(tid) + + def _compute_process_calls(self): + for function_call in self.flog.analysis.function_calls: + pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID + tid: int = function_call.thread_id + + assert isinstance(pid, int) + assert isinstance(tid, int) + + self.process_calls[pid][tid].append(function_call) + + def get_process_os_pid(self, monitor_id: int) -> int: + return self.process_ids[monitor_id] diff --git a/capa/features/extractors/vmray/call.py b/capa/features/extractors/vmray/call.py new file mode 100644 index 000000000..436b4bebb --- /dev/null +++ b/capa/features/extractors/vmray/call.py @@ -0,0 +1,53 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +from typing import Tuple, Iterator + +from capa.features.insn import API, Number +from capa.features.common import String, Feature +from capa.features.address import Address +from capa.features.extractors.vmray.models import PARAM_TYPE_INT, PARAM_TYPE_STR, Param, FunctionCall, hexint +from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle + +logger = logging.getLogger(__name__) + + +def get_call_param_features(param: Param, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: + if param.deref is not None: + # pointer types contain a special "deref" member that stores the deref'd value + # so we check for this first and ignore Param.value as this always contains the + # deref'd pointer value + if param.deref.value is not None: + if param.deref.type_ in PARAM_TYPE_INT: + yield Number(hexint(param.deref.value)), ch.address + elif param.deref.type_ in PARAM_TYPE_STR: + yield String(param.deref.value), ch.address + else: + logger.debug("skipping deref param type %s", param.deref.type_) + elif param.value is not None: + if param.type_ in PARAM_TYPE_INT: + yield Number(hexint(param.value)), ch.address + + +def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: + call: FunctionCall = ch.inner + + if call.params_in: + for param in call.params_in.params: + yield from get_call_param_features(param, ch) + + yield API(call.name), ch.address + + +def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: + for handler in CALL_HANDLERS: + for feature, addr in handler(ph, th, ch): + yield feature, addr + + +CALL_HANDLERS = (extract_call_features,) diff --git a/capa/features/extractors/vmray/extractor.py b/capa/features/extractors/vmray/extractor.py new file mode 100644 index 000000000..735c646b9 --- /dev/null +++ b/capa/features/extractors/vmray/extractor.py @@ -0,0 +1,122 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + + +from typing import List, Tuple, Iterator +from pathlib import Path + +import capa.helpers +import capa.features.extractors.vmray.call +import capa.features.extractors.vmray.file +import capa.features.extractors.vmray.global_ +from capa.features.common import Feature, Characteristic +from capa.features.address import NO_ADDRESS, Address, ThreadAddress, DynamicCallAddress, AbsoluteVirtualAddress +from capa.features.extractors.vmray import VMRayAnalysis +from capa.features.extractors.vmray.models import PARAM_TYPE_STR, Process, ParamList, FunctionCall +from capa.features.extractors.base_extractor import ( + CallHandle, + SampleHashes, + ThreadHandle, + ProcessHandle, + DynamicFeatureExtractor, +) + + +def get_formatted_params(params: ParamList) -> List[str]: + params_list: List[str] = [] + + for param in params: + if param.deref and param.deref.value is not None: + deref_value: str = f'"{param.deref.value}"' if param.deref.type_ in PARAM_TYPE_STR else param.deref.value + params_list.append(f"{param.name}: {deref_value}") + else: + value: str = "" if param.value is None else param.value + params_list.append(f"{param.name}: {value}") + + return params_list + + +class VMRayExtractor(DynamicFeatureExtractor): + def __init__(self, analysis: VMRayAnalysis): + assert analysis.sample_file_analysis is not None + + super().__init__( + hashes=SampleHashes( + md5=analysis.sample_file_analysis.hash_values.md5.lower(), + sha1=analysis.sample_file_analysis.hash_values.sha1.lower(), + sha256=analysis.sample_file_analysis.hash_values.sha256.lower(), + ) + ) + + self.analysis = analysis + + # pre-compute these because we'll yield them at *every* scope. + self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis)) + + def get_base_address(self) -> Address: + # value according to the PE header, the actual trace may use a different imagebase + return AbsoluteVirtualAddress(self.analysis.base_address) + + def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.vmray.file.extract_features(self.analysis) + + def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: + yield from self.global_features + + def get_processes(self) -> Iterator[ProcessHandle]: + yield from capa.features.extractors.vmray.file.get_processes(self.analysis) + + def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: + # we have not identified process-specific features for VMRay yet + yield from [] + + def get_process_name(self, ph) -> str: + process: Process = ph.inner + return process.image_name + + def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: + for thread in self.analysis.process_threads[ph.address.pid]: + address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread) + yield ThreadHandle(address=address, inner={}) + + def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: + if False: + # force this routine to be a generator, + # but we don't actually have any elements to generate. + yield Characteristic("never"), NO_ADDRESS + return + + def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: + for function_call in self.analysis.process_calls[ph.address.pid][th.address.tid]: + addr = DynamicCallAddress(thread=th.address, id=function_call.fncall_id) + yield CallHandle(address=addr, inner=function_call) + + def extract_call_features( + self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle + ) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.vmray.call.extract_features(ph, th, ch) + + def get_call_name(self, ph, th, ch) -> str: + call: FunctionCall = ch.inner + call_formatted: str = call.name + + # format input parameters + if call.params_in: + call_formatted += f"({', '.join(get_formatted_params(call.params_in.params))})" + else: + call_formatted += "()" + + # format output parameters + if call.params_out: + call_formatted += f" -> {', '.join(get_formatted_params(call.params_out.params))}" + + return call_formatted + + @classmethod + def from_zipfile(cls, zipfile_path: Path): + return cls(VMRayAnalysis(zipfile_path)) diff --git a/capa/features/extractors/vmray/file.py b/capa/features/extractors/vmray/file.py new file mode 100644 index 000000000..38ac9db01 --- /dev/null +++ b/capa/features/extractors/vmray/file.py @@ -0,0 +1,101 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +from typing import Dict, Tuple, Iterator + +import capa.features.extractors.common +from capa.features.file import Export, Import, Section +from capa.features.common import String, Feature +from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress +from capa.features.extractors.vmray import VMRayAnalysis +from capa.features.extractors.helpers import generate_symbols +from capa.features.extractors.vmray.models import Process +from capa.features.extractors.base_extractor import ProcessHandle + +logger = logging.getLogger(__name__) + + +def get_processes(analysis: VMRayAnalysis) -> Iterator[ProcessHandle]: + processes: Dict[str, Process] = analysis.sv2.processes + + for process in processes.values(): + # we map VMRay's monitor ID to the OS PID to make it easier for users + # to follow the processes in capa's output + pid: int = analysis.get_process_os_pid(process.monitor_id) + ppid: int = ( + analysis.get_process_os_pid(processes[process.ref_parent_process.path[1]].monitor_id) + if process.ref_parent_process + else 0 + ) + + addr: ProcessAddress = ProcessAddress(pid=pid, ppid=ppid) + yield ProcessHandle(address=addr, inner=process) + + +def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for addr, name in analysis.exports.items(): + yield Export(name), AbsoluteVirtualAddress(addr) + + +def extract_import_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for addr, (module, api) in analysis.imports.items(): + for symbol in generate_symbols(module, api, include_dll=True): + yield Import(symbol), AbsoluteVirtualAddress(addr) + + +def extract_section_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for addr, name in analysis.sections.items(): + yield Section(name), AbsoluteVirtualAddress(addr) + + +def extract_referenced_filenames(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for filename in analysis.sv2.filenames.values(): + yield String(filename.filename), NO_ADDRESS + + +def extract_referenced_mutex_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for mutex in analysis.sv2.mutexes.values(): + yield String(mutex.name), NO_ADDRESS + + +def extract_referenced_domain_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for domain in analysis.sv2.domains.values(): + yield String(domain.domain), NO_ADDRESS + + +def extract_referenced_ip_addresses(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for ip_address in analysis.sv2.ip_addresses.values(): + yield String(ip_address.ip_address), NO_ADDRESS + + +def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for registry_record in analysis.sv2.registry_records.values(): + yield String(registry_record.reg_key_name), NO_ADDRESS + + +def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.common.extract_file_strings(analysis.sample_file_buf) + + +def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for handler in FILE_HANDLERS: + for feature, addr in handler(analysis): + yield feature, addr + + +FILE_HANDLERS = ( + extract_import_names, + extract_export_names, + extract_section_names, + extract_referenced_filenames, + extract_referenced_mutex_names, + extract_referenced_domain_names, + extract_referenced_ip_addresses, + extract_referenced_registry_key_names, + extract_file_strings, +) diff --git a/capa/features/extractors/vmray/global_.py b/capa/features/extractors/vmray/global_.py new file mode 100644 index 000000000..a42ce511e --- /dev/null +++ b/capa/features/extractors/vmray/global_.py @@ -0,0 +1,72 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import logging +from typing import Tuple, Iterator + +from capa.features.common import ( + OS, + OS_LINUX, + ARCH_I386, + FORMAT_PE, + ARCH_AMD64, + FORMAT_ELF, + OS_WINDOWS, + Arch, + Format, + Feature, +) +from capa.features.address import NO_ADDRESS, Address +from capa.features.extractors.vmray import VMRayAnalysis + +logger = logging.getLogger(__name__) + + +def extract_arch(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + file_type: str = analysis.file_type + + if "x86-32" in file_type: + yield Arch(ARCH_I386), NO_ADDRESS + elif "x86-64" in file_type: + yield Arch(ARCH_AMD64), NO_ADDRESS + else: + raise ValueError("unrecognized arch from the VMRay report: %s" % file_type) + + +def extract_format(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + assert analysis.sample_file_static_data is not None + if analysis.sample_file_static_data.pe: + yield Format(FORMAT_PE), NO_ADDRESS + elif analysis.sample_file_static_data.elf: + yield Format(FORMAT_ELF), NO_ADDRESS + else: + raise ValueError("unrecognized file format from the VMRay report: %s" % analysis.file_type) + + +def extract_os(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + file_type: str = analysis.file_type + + if "windows" in file_type.lower(): + yield OS(OS_WINDOWS), NO_ADDRESS + elif "linux" in file_type.lower(): + yield OS(OS_LINUX), NO_ADDRESS + else: + raise ValueError("unrecognized OS from the VMRay report: %s" % file_type) + + +def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for global_handler in GLOBAL_HANDLER: + for feature, addr in global_handler(analysis): + yield feature, addr + + +GLOBAL_HANDLER = ( + extract_format, + extract_os, + extract_arch, +) diff --git a/capa/features/extractors/vmray/models.py b/capa/features/extractors/vmray/models.py new file mode 100644 index 000000000..a599dc420 --- /dev/null +++ b/capa/features/extractors/vmray/models.py @@ -0,0 +1,334 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +from typing import Dict, List, Union, Optional + +import xmltodict +from pydantic import Field, BaseModel +from typing_extensions import Annotated +from pydantic.functional_validators import BeforeValidator + +""" +# possible param types, included for documentation +PARAM_TYPE = ( + "signed_8bit", + "unsigned_8bit", + "signed_16bit", + "unsigned_16bit", + "signed_32bit", + "unsigned_32bit", + "signed_64bit", + "unsigned_64bit", + "double", + "void_ptr", + "bool", + "unknown", + "ptr", + "void", + "str", + "array", + "container", + "bindata", + "undefined_type", +) +""" + +PARAM_TYPE_PTR = ("void_ptr", "ptr") +PARAM_TYPE_STR = ("str",) +PARAM_TYPE_INT = ( + "signed_8bit", + "unsigned_8bit", + "signed_16bit", + "unsigned_16bit", + "signed_32bit", + "unsigned_32bit", + "signed_64bit", + "unsigned_64bit", + "double", + "bool", + "unknown", +) + + +def xml_to_dict(xml): + return xmltodict.parse(xml, attr_prefix="") + + +def hexint(value: Union[str, int]) -> int: + if isinstance(value, str): + return int(value, 16) if value.startswith("0x") else int(value, 10) + else: + return value + + +def validate_hex_int(value: Union[str, int]) -> int: + return hexint(value) + + +# convert the input value to a Python int type before inner validation (int) is called +HexInt = Annotated[int, BeforeValidator(validate_hex_int)] + + +# models flog.xml file, certain fields left as comments for documentation purposes +class ParamDeref(BaseModel): + type_: str = Field(alias="type") + value: Optional[str] = None + + +class Param(BaseModel): + name: str + type_: str = Field(alias="type") + value: Optional[str] = None + deref: Optional[ParamDeref] = None + + +def validate_param_list(value: Union[List[Param], Param]) -> List[Param]: + if isinstance(value, list): + return value + else: + return [value] + + +# params may be stored as a list of Param or a single Param so we convert +# the input value to Python list type before the inner validation (List[Param]) +# is called +ParamList = Annotated[List[Param], BeforeValidator(validate_param_list)] + + +class Params(BaseModel): + params: ParamList = Field(alias="param") + + +def validate_call_name(value: str) -> str: + if value.startswith("sys_"): + # VMRay appears to log kernel function calls ("sys_*") for Linux so we remove that + # here to enable capa matching + return value[4:] + else: + return value + + +# function call names may need to be reformatted to remove data, etc. so we reformat +# before calling the inner validation (str) +CallName = Annotated[str, BeforeValidator(validate_call_name)] + + +class FunctionCall(BaseModel): + # ts: HexInt + fncall_id: HexInt + process_id: HexInt + thread_id: HexInt + name: CallName + # addr: HexInt + # from_addr: HexInt = Field(alias="from") + params_in: Optional[Params] = Field(alias="in", default=None) + params_out: Optional[Params] = Field(alias="out", default=None) + + +class FunctionReturn(BaseModel): + ts: HexInt + fncall_id: HexInt + addr: HexInt + from_addr: HexInt = Field(alias="from") + + +class Analysis(BaseModel): + log_version: str # tested 2 + analyzer_version: str # tested 2024.2.1 + # analysis_date: str + + function_calls: List[FunctionCall] = Field(alias="fncall", default=[]) + # function_returns: List[FunctionReturn] = Field(alias="fnret", default=[]) + + +class Flog(BaseModel): + analysis: Analysis + + +# models for summary_v2.json file, certain fields left as comments for documentation purposes +class GenericReference(BaseModel): + path: List[str] + source: str + + +class StaticDataReference(GenericReference): ... + + +class PEFileBasicInfo(BaseModel): + # compile_time: str + # file_type: str + image_base: int + # machine_type: str + # size_of_code: int + # size_of_initialized_data: int + # size_of_uninitialized_data: int + # subsystem: str + # entry_point: int + # imphash: Optional[str] = None + + +class API(BaseModel): + name: str + ordinal: Optional[int] = None + + +class PEFileExport(BaseModel): + address: int + api: API + + +class PEFileImport(BaseModel): + address: int + api: API + # thunk_offset: int + # hint: Optional[int] = None + # thunk_rva: int + + +class PEFileImportModule(BaseModel): + dll: str + apis: List[PEFileImport] + + +class PEFileSection(BaseModel): + # entropy: float + # flags: List[str] = [] + name: str + # raw_data_offset: int + # raw_data_size: int + virtual_address: int + # virtual_size: int + + +class PEFile(BaseModel): + basic_info: PEFileBasicInfo + exports: List[PEFileExport] = [] + imports: List[PEFileImportModule] = [] + sections: List[PEFileSection] = [] + + +class ElfFileSectionHeader(BaseModel): + sh_name: str + sh_addr: int + + +class ElfFileSection(BaseModel): + header: ElfFileSectionHeader + + +""" +class ElfFileHeader(BaseModel): + file_class: str + endianness: str + file_type: str + architecture: str + architecture_human_str: str + entry_point: int +""" + + +class ElfFile(BaseModel): + # file_header: ElfFileHeader + sections: List[ElfFileSection] + + +class StaticData(BaseModel): + pe: Optional[PEFile] = None + elf: Optional[ElfFile] = None + + +class FileHashes(BaseModel): + md5: str + sha1: str + sha256: str + # ssdeep: str + + +class File(BaseModel): + # categories: List[str] + hash_values: FileHashes + # is_artifact: bool + # is_ioc: bool + is_sample: bool + # size: int + # is_truncated: bool + # mime_type: Optional[str] = None + # operations: List[str] = [] + # ref_filenames: List[GenericReference] = [] + # ref_gfncalls: List[GenericReference] = [] + ref_static_data: Optional[StaticDataReference] = None + # ref_vti_matches: List[GenericReference] = [] + # verdict: str + + +class Process(BaseModel): + # bitness: int + # is_artifact: bool + # is_ioc: bool + monitor_id: int + # monitor_reason: str + os_pid: int + filename: str + image_name: str + ref_parent_process: Optional[GenericReference] = None + + +class Filename(BaseModel): + filename: str + # is_artifact: bool + # is_ioc: bool + # verdict: str + + +class Mutex(BaseModel): + name: str + # is_artifact: bool + # is_ioc: bool + # verdict: str + + +class Registry(BaseModel): + reg_key_name: str + # reg_key_value_type: Optional[str] = None + # is_artifact: bool + # is_ioc: bool + # verdict: str + + +class Domain(BaseModel): + domain: str + # is_artifact: bool + # is_ioc: bool + # verdict: str + + +class IPAddress(BaseModel): + ip_address: str + # is_artifact: bool + # is_ioc: bool + # verdict: str + + +class AnalysisMetadata(BaseModel): + sample_type: str + submission_filename: str + + +class SummaryV2(BaseModel): + analysis_metadata: AnalysisMetadata + + static_data: Dict[str, StaticData] = {} + + # recorded artifacts + files: Dict[str, File] = {} + processes: Dict[str, Process] = {} + filenames: Dict[str, Filename] = {} + mutexes: Dict[str, Mutex] = {} + domains: Dict[str, Domain] = {} + ip_addresses: Dict[str, IPAddress] = {} + registry_records: Dict[str, Registry] = {} diff --git a/capa/helpers.py b/capa/helpers.py index ef8e94c62..86f4db694 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -12,8 +12,9 @@ import logging import contextlib import importlib.util -from typing import Dict, Union, BinaryIO, Iterator, NoReturn +from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn from pathlib import Path +from zipfile import ZipFile from datetime import datetime import tqdm @@ -25,6 +26,7 @@ FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, + FORMAT_VMRAY, FORMAT_DOTNET, FORMAT_FREEZE, FORMAT_DRAKVUF, @@ -34,9 +36,10 @@ EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") -# CAPE extensions: .json, .json_, .json.gz -# DRAKVUF Sandbox extensions: .log, .log.gz -EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", "log", ".log.gz") +# CAPE (.json, .json_, .json.gz) +# DRAKVUF (.log, .log.gz) +# VMRay (.zip) +EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz", "log", ".log.gz", ".zip") EXTENSIONS_ELF = "elf_" EXTENSIONS_FREEZE = "frz" @@ -125,16 +128,20 @@ def get_format_from_report(sample: Path) -> str: line = load_one_jsonl_from_path(sample) if "Plugin" in line: return FORMAT_DRAKVUF - return FORMAT_UNKNOWN - - report = load_json_from_path(sample) - if "CAPE" in report: - return FORMAT_CAPE - - if "target" in report and "info" in report and "behavior" in report: - # CAPE report that's missing the "CAPE" key, - # which is not going to be much use, but its correct. - return FORMAT_CAPE + elif sample.name.endswith(".zip"): + with ZipFile(sample, "r") as zipfile: + namelist: List[str] = zipfile.namelist() + if "logs/summary_v2.json" in namelist and "logs/flog.xml" in namelist: + # assume VMRay zipfile at a minimum has these files + return FORMAT_VMRAY + elif sample.name.endswith(("json", "json_", "json.gz")): + report = load_json_from_path(sample) + if "CAPE" in report: + return FORMAT_CAPE + if "target" in report and "info" in report and "behavior" in report: + # CAPE report that's missing the "CAPE" key, + # which is not going to be much use, but its correct. + return FORMAT_CAPE return FORMAT_UNKNOWN @@ -244,6 +251,17 @@ def log_unsupported_drakvuf_report_error(error: str): logger.error("-" * 80) +def log_unsupported_vmray_report_error(error: str): + logger.error("-" * 80) + logger.error(" Input file is not a valid VMRay analysis archive: %s", error) + logger.error(" ") + logger.error( + " capa only supports analyzing VMRay dynamic analysis archives containing summary_v2.json and flog.xml log files." + ) + logger.error(" Please make sure you have downloaded a dynamic analysis archive from VMRay.") + logger.error("-" * 80) + + def log_empty_sandbox_report_error(error: str, sandbox_name: str): logger.error("-" * 80) logger.error(" %s report is empty or only contains little useful data: %s", sandbox_name, error) diff --git a/capa/loader.py b/capa/loader.py index bc69ffb3a..949308c5e 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -44,6 +44,7 @@ FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, + FORMAT_VMRAY, FORMAT_DOTNET, FORMAT_DRAKVUF, ) @@ -63,6 +64,7 @@ BACKEND_PEFILE = "pefile" BACKEND_CAPE = "cape" BACKEND_DRAKVUF = "drakvuf" +BACKEND_VMRAY = "vmray" BACKEND_FREEZE = "freeze" @@ -218,6 +220,11 @@ def get_extractor( report = capa.helpers.load_jsonl_from_path(input_path) return capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report) + elif backend == BACKEND_VMRAY: + import capa.features.extractors.vmray.extractor + + return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_path) + elif backend == BACKEND_DOTNET: import capa.features.extractors.dnfile.extractor @@ -342,6 +349,11 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr report = capa.helpers.load_jsonl_from_path(input_file) file_extractors.append(capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report)) + elif input_format == FORMAT_VMRAY: + import capa.features.extractors.vmray.extractor + + file_extractors.append(capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file)) + return file_extractors diff --git a/capa/main.py b/capa/main.py index f9e0ce249..f6dc0f522 100644 --- a/capa/main.py +++ b/capa/main.py @@ -46,6 +46,7 @@ BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, + BACKEND_VMRAY, BACKEND_DOTNET, BACKEND_FREEZE, BACKEND_PEFILE, @@ -59,6 +60,7 @@ log_unsupported_format_error, log_empty_sandbox_report_error, log_unsupported_cape_report_error, + log_unsupported_vmray_report_error, log_unsupported_drakvuf_report_error, ) from capa.exceptions import ( @@ -80,6 +82,7 @@ FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, + FORMAT_VMRAY, FORMAT_DOTNET, FORMAT_FREEZE, FORMAT_RESULT, @@ -259,6 +262,7 @@ def install_common_args(parser, wanted=None): (FORMAT_SC64, "64-bit shellcode"), (FORMAT_CAPE, "CAPE sandbox report"), (FORMAT_DRAKVUF, "DRAKVUF sandbox report"), + (FORMAT_VMRAY, "VMRay sandbox report"), (FORMAT_FREEZE, "features previously frozen by capa"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) @@ -281,6 +285,7 @@ def install_common_args(parser, wanted=None): (BACKEND_FREEZE, "capa freeze"), (BACKEND_CAPE, "CAPE"), (BACKEND_DRAKVUF, "DRAKVUF"), + (BACKEND_VMRAY, "VMRay"), ] backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends]) parser.add_argument( @@ -552,6 +557,9 @@ def get_backend_from_cli(args, input_format: str) -> str: if input_format == FORMAT_DRAKVUF: return BACKEND_DRAKVUF + elif input_format == FORMAT_VMRAY: + return BACKEND_VMRAY + elif input_format == FORMAT_DOTNET: return BACKEND_DOTNET @@ -576,7 +584,7 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]: raises: ShouldExitError: if the program is invoked incorrectly and should exit. """ - if backend in (BACKEND_CAPE, BACKEND_DRAKVUF): + if backend in (BACKEND_CAPE, BACKEND_DRAKVUF, BACKEND_VMRAY): return None else: return args.input_file @@ -690,6 +698,8 @@ def get_file_extractors_from_cli(args, input_format: str) -> List[FeatureExtract log_unsupported_cape_report_error(str(e)) elif input_format == FORMAT_DRAKVUF: log_unsupported_drakvuf_report_error(str(e)) + elif input_format == FORMAT_VMRAY: + log_unsupported_vmray_report_error(str(e)) else: log_unsupported_format_error() raise ShouldExitError(E_INVALID_FILE_TYPE) from e @@ -809,6 +819,8 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr log_unsupported_cape_report_error(str(e)) elif input_format == FORMAT_DRAKVUF: log_unsupported_drakvuf_report_error(str(e)) + elif input_format == FORMAT_VMRAY: + log_unsupported_vmray_report_error(str(e)) else: log_unsupported_format_error() raise ShouldExitError(E_INVALID_FILE_TYPE) from e diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 975e37431..ab6b03979 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -22,7 +22,7 @@ import capa.features.freeze.features as frzf from capa.rules import RuleSet from capa.engine import MatchResults -from capa.helpers import assert_never +from capa.helpers import assert_never, load_json_from_path class FrozenModel(BaseModel): @@ -668,4 +668,5 @@ def to_capa(self) -> Tuple[Metadata, Dict]: @classmethod def from_file(cls, path: Path) -> "ResultDocument": - return cls.model_validate_json(path.read_text(encoding="utf-8")) + report = load_json_from_path(path) + return cls.model_validate(report) diff --git a/pyproject.toml b/pyproject.toml index 67c7d28d8..e7218a772 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,7 @@ dependencies = [ "humanize>=4", "protobuf>=5", "msgspec>=0.18.6", + "xmltodict>=0.13.0", # --------------------------------------- # Dependencies that we develop diff --git a/requirements.txt b/requirements.txt index 3b5122ab6..5067f2e04 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ pyasn1-modules==0.2.8 pycparser==2.22 pydantic==2.7.3 pydantic-core==2.18.4 +xmltodict==0.13.0 pyelftools==0.31 pygments==2.18.0 python-flirt==0.8.10 diff --git a/scripts/capa_as_library.py b/scripts/capa-as-library.py similarity index 100% rename from scripts/capa_as_library.py rename to scripts/capa-as-library.py diff --git a/scripts/minimize_vmray_results.py b/scripts/minimize_vmray_results.py new file mode 100644 index 000000000..15ab81c26 --- /dev/null +++ b/scripts/minimize_vmray_results.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +""" +Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +You may obtain a copy of the License at: [package root]/LICENSE.txt +Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. + +Extract files relevant to capa analysis from VMRay Analysis Archive and create a new ZIP file. +""" +import sys +import logging +import zipfile +import argparse +from pathlib import Path + +from capa.features.extractors.vmray import DEFAULT_ARCHIVE_PASSWORD, VMRayAnalysis + +logger = logging.getLogger(__name__) + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + parser = argparse.ArgumentParser( + description="Minimize VMRay Analysis Archive to ZIP file only containing relevant files" + ) + parser.add_argument( + "analysis_archive", + type=Path, + help="path to VMRay Analysis Archive downloaded from Dynamic Analysis Report page", + ) + parser.add_argument( + "-p", "--password", type=str, default="infected", help="password used to unzip and zip protected archives" + ) + args = parser.parse_args(args=argv) + + analysis_archive = args.analysis_archive + + vmra = VMRayAnalysis(analysis_archive) + sv2_json = vmra.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD) + flog_xml = vmra.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD) + sample_file_buf = vmra.sample_file_buf + assert vmra.sample_file_analysis is not None + sample_sha256: str = vmra.sample_file_analysis.hash_values.sha256.lower() + + new_zip_name = f"{analysis_archive.parent / analysis_archive.stem}_min.zip" + with zipfile.ZipFile(new_zip_name, "w") as new_zip: + new_zip.writestr("logs/summary_v2.json", sv2_json) + new_zip.writestr("logs/flog.xml", flog_xml) + new_zip.writestr(f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}", sample_file_buf) + new_zip.setpassword(args.password.encode("ascii")) + + # ensure capa loads the minimized archive + assert isinstance(VMRayAnalysis(Path(new_zip_name)), VMRayAnalysis) + + print(f"Created minimized VMRay archive '{new_zip_name}' with password '{args.password}'.") + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/show-features.py b/scripts/show-features.py index 188aa974b..6005a810c 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -229,40 +229,37 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor): for p in processes: print(f"proc: {extractor.get_process_name(p)} (ppid={p.address.ppid}, pid={p.address.pid})") - for feature, addr in extractor.extract_process_features(p): + for feature, _ in extractor.extract_process_features(p): if is_global_feature(feature): continue print(f" proc: {extractor.get_process_name(p)}: {feature}") - for t in extractor.get_threads(p): - print(f" thread: {t.address.tid}") - for feature, addr in extractor.extract_thread_features(p, t): - if is_global_feature(feature): - continue - - if feature != Feature(0): - print(f" {format_address(addr)}: {feature}") + for t in extractor.get_threads(p): + print(f" thread: {t.address.tid}") + for feature, addr in extractor.extract_thread_features(p, t): + if is_global_feature(feature): + continue - for call in extractor.get_calls(p, t): - apis = [] - arguments = [] - for feature, addr in extractor.extract_call_features(p, t, call): - if is_global_feature(feature): - continue + if feature != Feature(0): + print(f" {format_address(addr)}: {feature}") - if isinstance(feature, API): - assert isinstance(addr, capa.features.address.DynamicCallAddress) - apis.append((addr.id, str(feature.value))) + for call in extractor.get_calls(p, t): + apis = [] + arguments = [] + for feature, addr in extractor.extract_call_features(p, t, call): + if is_global_feature(feature): + continue - if isinstance(feature, (Number, String)): - arguments.append(str(feature.value)) + if isinstance(feature, API): + assert isinstance(addr, capa.features.address.DynamicCallAddress) + apis.append((addr.id, str(feature.value))) - if not apis: - print(f" arguments=[{', '.join(arguments)}]") + if isinstance(feature, (Number, String)): + arguments.append(str(feature.value)) - for cid, api in apis: - print(f" call {cid}: {api}({', '.join(arguments)})") + for cid, api in apis: + print(f" call {cid}: {api}({', '.join(arguments)})") def ida_main(): diff --git a/tests/fixtures.py b/tests/fixtures.py index 286eaaef8..41a656dd9 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -209,6 +209,13 @@ def get_drakvuf_extractor(path): return DrakvufExtractor.from_report(report) +@lru_cache(maxsize=1) +def get_vmray_extractor(path): + from capa.features.extractors.vmray.extractor import VMRayExtractor + + return VMRayExtractor.from_zipfile(path) + + @lru_cache(maxsize=1) def get_ghidra_extractor(path: Path): import capa.features.extractors.ghidra.extractor @@ -395,7 +402,7 @@ def get_data_path_by_name(name) -> Path: / "v2.2" / "d46900384c78863420fb3e297d0a2f743cd2b6b3f7f82bf64059a168e07aceb7.json.gz" ) - elif name.startswith("93b2d1"): + elif name.startswith("93b2d1-drakvuf"): return ( CD / "data" @@ -403,6 +410,14 @@ def get_data_path_by_name(name) -> Path: / "drakvuf" / "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795.log.gz" ) + elif name.startswith("93b2d1-vmray"): + return ( + CD + / "data" + / "dynamic" + / "vmray" + / "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795_min_archive.zip" + ) elif name.startswith("ea2876"): return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_" elif name.startswith("1038a2"): @@ -1537,4 +1552,7 @@ def a076114_rd(): @pytest.fixture def dynamic_a0000a6_rd(): # python -m capa.main tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json --json > tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json - return get_result_doc(CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json") + # gzip tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json + return get_result_doc( + CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" + ) diff --git a/tests/test_drakvuf_features.py b/tests/test_drakvuf_features.py index 79832fc34..61fe69442 100644 --- a/tests/test_drakvuf_features.py +++ b/tests/test_drakvuf_features.py @@ -15,26 +15,31 @@ DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS = sorted( [ - ("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False), + ("93b2d1-drakvuf", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False), # file/imports - ("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), True), + ("93b2d1-drakvuf", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), True), # thread/api calls - ("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True), - ("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False), # call/api - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False), # call/string argument ( - "93b2d1", + "93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'), True, ), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), False), + ( + "93b2d1-drakvuf", + "process=(3564:4852),thread=6592,call=1", + capa.features.common.String("non_existant"), + False, + ), # call/number argument - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), True), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), False), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), True), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), False), ], # order tests by (file, item) # so that our LRU cache is most effective. @@ -43,26 +48,26 @@ DYNAMIC_DRAKVUF_FEATURE_COUNT_TESTS = sorted( [ - ("93b2d1", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False), + ("93b2d1-drakvuf", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False), # file/imports - ("93b2d1", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), 1), + ("93b2d1-drakvuf", "file", capa.features.file.Import("SetUnhandledExceptionFilter"), 1), # thread/api calls - ("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), 9), - ("93b2d1", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), 9), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False), # call/api - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), 1), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), 0), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), 1), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), 0), # call/string argument ( - "93b2d1", + "93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.common.String('0x667e2beb40:"api-ms-win-core-fibers-l1-1-1"'), 1, ), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), 0), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.common.String("non_existant"), 0), # call/number argument - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), 1), - ("93b2d1", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), 0), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x801), 1), + ("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.Number(0x010101010101), 0), ], # order tests by (file, item) # so that our LRU cache is most effective. diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 35bf5347f..06a6e9fef 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -27,7 +27,7 @@ def get_binary_file_path(): return str(CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_") -def get_report_file_path(): +def get_cape_report_file_path(): return str( CD / "data" @@ -63,9 +63,10 @@ def get_rule_path(): pytest.param("show-capabilities-by-function.py", [get_binary_file_path()]), pytest.param("show-features.py", [get_binary_file_path()]), pytest.param("show-features.py", ["-F", "0x407970", get_binary_file_path()]), - pytest.param("show-features.py", ["-P", "MicrosoftEdgeUpdate.exe", get_report_file_path()]), + pytest.param("show-features.py", ["-P", "MicrosoftEdgeUpdate.exe", get_cape_report_file_path()]), pytest.param("show-unused-features.py", [get_binary_file_path()]), - pytest.param("capa_as_library.py", [get_binary_file_path()]), + pytest.param("capa-as-library.py", [get_binary_file_path()]), + # not testing "minimize-vmray-results.py" as we don't currently upload full VMRay analysis archives ], ) def test_scripts(script, args): diff --git a/tests/test_vmray_features.py b/tests/test_vmray_features.py new file mode 100644 index 000000000..d92a75e49 --- /dev/null +++ b/tests/test_vmray_features.py @@ -0,0 +1,89 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import fixtures + +import capa.main +import capa.features.file +import capa.features.insn +import capa.features.common + +DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS = sorted( + [ + ("93b2d1-vmray", "file", capa.features.common.String("api.%x%x.%s"), True), + ("93b2d1-vmray", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False), + # file/imports + ("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), True), + # thread/api calls + ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), True), + ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("DoesNotExist"), False), + # call/api + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), True), + # call/string argument + ( + "93b2d1-vmray", + "process=(2176:0),thread=7,call=10323", + capa.features.common.String("raw.githubusercontent.com"), + True, + ), + # call/number argument + # VirtualAlloc(4096, 4) + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4096), True), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4), True), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + +DYNAMIC_VMRAY_FEATURE_COUNT_TESTS = sorted( + [ + # file/imports + ("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), 1), + # thread/api calls + ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("free"), 1), + ("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), 5), + # call/api + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("free"), 1), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("GetAddrInfoW"), 0), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), 1), + # call/string argument + ( + "93b2d1-vmray", + "process=(2176:0),thread=7,call=10323", + capa.features.common.String("raw.githubusercontent.com"), + 1, + ), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=10323", capa.features.common.String("non_existant"), 0), + # call/number argument + ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4096), 1), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4), 1), + ("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(404), 0), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + + +@fixtures.parametrize( + "sample,scope,feature,expected", + DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS, + indirect=["sample", "scope"], +) +def test_vmray_features(sample, scope, feature, expected): + fixtures.do_test_feature_presence(fixtures.get_vmray_extractor, sample, scope, feature, expected) + + +@fixtures.parametrize( + "sample,scope,feature,expected", + DYNAMIC_VMRAY_FEATURE_COUNT_TESTS, + indirect=["sample", "scope"], +) +def test_vmray_feature_counts(sample, scope, feature, expected): + fixtures.do_test_feature_count(fixtures.get_vmray_extractor, sample, scope, feature, expected) diff --git a/tests/test_vmray_model.py b/tests/test_vmray_model.py new file mode 100644 index 000000000..c693b6631 --- /dev/null +++ b/tests/test_vmray_model.py @@ -0,0 +1,160 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import textwrap + +from capa.features.extractors.vmray.models import ( + Param, + PEFile, + ElfFile, + FunctionCall, + AnalysisMetadata, + hexint, + xml_to_dict, +) + + +def test_vmray_model_param(): + param_str = textwrap.dedent( + """ + + """ + ) + param: Param = Param.model_validate(xml_to_dict(param_str)["param"]) + + assert param.value is not None + assert hexint(param.value) == 16 + + +def test_vmray_model_param_deref(): + param_str = textwrap.dedent( + """ + + + + """ + ) + param: Param = Param.model_validate(xml_to_dict(param_str)["param"]) + + assert param.deref is not None + assert param.deref.value == "Hello world" + + +def test_vmray_model_function_call(): + function_call_str = textwrap.dedent( + """ + + + + + + + + + """ + ) + function_call: FunctionCall = FunctionCall.model_validate(xml_to_dict(function_call_str)["fncall"]) + + assert function_call.fncall_id == 18 + assert function_call.process_id == 1 + assert function_call.thread_id == 1 + assert function_call.name == "time" + + assert function_call.params_in is not None + assert function_call.params_in.params[0].value is not None + assert hexint(function_call.params_in.params[0].value) == 0 + + assert function_call.params_out is not None + assert function_call.params_out.params[0].value is not None + assert hexint(function_call.params_out.params[0].value) == 2863311530 + + +def test_vmray_model_analysis_metadata(): + analysis_metadata: AnalysisMetadata = AnalysisMetadata.model_validate_json( + """ + { + "sample_type": "Linux ELF Executable (x86-64)", + "submission_filename": "abcd1234" + } + """ + ) + + assert analysis_metadata.sample_type == "Linux ELF Executable (x86-64)" + assert analysis_metadata.submission_filename == "abcd1234" + + +def test_vmray_model_elffile(): + elffile: ElfFile = ElfFile.model_validate_json( + """ + { + "sections": [ + { + "header": { + "sh_name": "abcd1234", + "sh_addr": 2863311530 + } + } + ] + } + """ + ) + + assert elffile.sections[0].header.sh_name == "abcd1234" + assert elffile.sections[0].header.sh_addr == 2863311530 + + +def test_vmray_model_pefile(): + pefile: PEFile = PEFile.model_validate_json( + """ + { + "basic_info": { + "image_base": 2863311530 + }, + "imports": [ + { + "apis": [ + { + "address": 2863311530, + "api": { + "name": "Sleep" + } + } + ], + "dll": "KERNEL32.dll" + } + ], + "sections": [ + { + "name": ".text", + "virtual_address": 2863311530 + } + ], + "exports": [ + { + "api": { + "name": "HelloWorld", + "ordinal": 10 + }, + "address": 2863311530 + } + ] + } + """ + ) + + assert pefile.basic_info.image_base == 2863311530 + + assert pefile.imports[0].dll == "KERNEL32.dll" + assert pefile.imports[0].apis[0].address == 2863311530 + assert pefile.imports[0].apis[0].api.name == "Sleep" + + assert pefile.sections[0].name == ".text" + assert pefile.sections[0].virtual_address == 2863311530 + + assert pefile.exports[0].address == 2863311530 + assert pefile.exports[0].api.name == "HelloWorld" + assert pefile.exports[0].api.ordinal == 10