diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml deleted file mode 100644 index 80be0f6..0000000 --- a/.github/workflows/docker-publish.yml +++ /dev/null @@ -1,61 +0,0 @@ -name: Docker - -# This workflow uses actions that are not certified by GitHub. -# They are provided by a third-party and are governed by -# separate terms of service, privacy policy, and support -# documentation. - -on: - push: - branches: [ main ] - # Publish semver tags as releases. - tags: [ 'v*.*.*' ] - pull_request: - branches: [ main ] - -env: - # Use docker.io for Docker Hub if empty - REGISTRY: ghcr.io - # github.repository as / - IMAGE_NAME: ${{ github.repository }} - - -jobs: - build: - - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - - steps: - - name: Checkout repository - uses: actions/checkout@v2 - - # Login against a Docker registry except on PR - # https://github.com/docker/login-action - - name: Log into registry ${{ env.REGISTRY }} - if: github.event_name != 'pull_request' - uses: docker/login-action@28218f9b04b4f3f62068d7b6ce6ca5b26e35336c - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - # Extract metadata (tags, labels) for Docker - # https://github.com/docker/metadata-action - - name: Extract Docker metadata - id: meta - uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - - # Build and push Docker image with Buildx (don't push on PR) - # https://github.com/docker/build-push-action - - name: Build and push Docker image - uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc - with: - context: . 
- push: ${{ github.event_name != 'pull_request' }} - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d8bfbc4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,84 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# IDE files +.pydevproject +.python-version +.idea + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Cython debug symbols +cython_debug/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100755 index 0000000..6fb6aeb --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,25 @@ +# Assemblyline contributing guide + +This guide covers the basics of how to contribute to the Assemblyline project. + +Python code should follow the PEP8 guidelines defined here: [PEP8 Guidelines](https://www.python.org/dev/peps/pep-0008/). 
+ +## Tell us what you want to build/fix +Before you start coding anything you should connect with the [Assemblyline community](https://groups.google.com/d/forum/cse-cst-assemblyline) to make sure no one else is working on the same thing and that whatever you are going to build still fits with the vision of the system. + +## Git workflow + +- Clone the repo to your own account +- Check out and pull the latest commits from the master branch +- Make a branch +- Work in any way you like and make sure your changes actually work +- When you're satisfied with your changes, create a pull request to the main assemblyline repo + +#### Transfer your service repo +If you've worked on a new service that you want to be included in the default service selection you'll have to transfer the repo into our control. + +#### You are not allowed to merge: + +Even if you try to merge in your pull request, you will be denied. Only a few people in our team are allowed to merge code into our repositories. + +We check for new pull requests every day and will merge them in once they have been approved by someone in our team. \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 80ea697..4f5d572 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,11 +4,12 @@ ENV SERVICE_PATH intezer_static.IntezerStatic USER root -RUN apt update -RUN pip3 install requests +RUN apt-get update USER assemblyline +RUN pip install intezer-sdk && rm -rf ~/.cache/pip + WORKDIR /opt/al_service COPY . . diff --git a/README.md b/README.md index 8278f4d..8c49e97 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,32 @@ -# Intezer service -This repository is a self-developed Assemblyline service fetching the Intezer report of a specific sha256. +# IntezerStatic service +This repository is an Assemblyline service that fetches the Intezer report for the SHA256 of a submitted file, and if the SHA256 was not found on the Intezer instance, then this service will DO NOTHING MORE. 
+ It was created by [x1mus](https://github.com/x1mus) with support from [Sorakurai](https://github.com/Sorakurai) and [reynas](https://github.com/reynas) at [NVISO](https://github.com/NVISOsecurity). + +It has since been passed over to the CCCS :canada: for maintenance! + +**NOTE**: This service **requires** you to have your own API key (Paid or Free). It is **not** preinstalled during a default installation. + +**NOTE**: This service **requires** extensive setup prior to installation if you are deploying your own instance of IntezerAnalyze. + +## Execution + +This service calls the Intezer Analyze API with the hash of your file and returns the results (if any). + +Because this service could query an external API, if selected by the user, it will prompt the user and notify them that their file or metadata related to their file will leave the Assemblyline system. + +### Service Tweaks +If you are using an Intezer Analyze On-Premise solution, then you do not need to set this service as `External` and the `is_external` flag to true. Change the `category` in the `service_manifest.yml` from `External` to `Antivirus` if using on-prem. + +### Configuration Values +* **base_url**: This is the base url of the Intezer Analyze instance that you will be using. *NB* The public instance is at [https://analyze.intezer.com](https://analyze.intezer.com), but you can also set it to http://\. Don't forget the /api/ at the end of the URL! +* **api_version**: This service has only been tested with `v2-0`. +* **api_key**: This is the 36 character key provided to you by [Intezer](https://www.intezer.com/blog/malware-analysis/api-intezer-analyze-community/). +* **private_only**: This is a flag that will only return private submissions on the Intezer Analyze system, if selected. +* **is_on_premise**: This is a flag used for indicating if the Intezer Analyze system is on-premise, rather than the cloud API. 
+ +### Submission Parameters +* **analysis_id**: This is the analysis ID of an analysis that is already on the system. The cloud API counts retrieving the analysis by file hash as a "File Scan" which counts towards an account's monthly quota. We can circumvent this by submitting the analysis ID of an analysis. That being said, this will ignore the file that you submit to Assemblyline. + +## Troubleshooting +If you get this error "server returns The request is not valid, details: {'should_get_only_private_analysis': ['unknown field']}", then you need to set the service configuration value to true for "is_on_premise". diff --git a/intezer_static.py b/intezer_static.py index 48bad46..ba8dcc1 100644 --- a/intezer_static.py +++ b/intezer_static.py @@ -1,62 +1,660 @@ -import json -from intezer_static_client import * +from enum import Enum +from http import HTTPStatus +from requests import HTTPError +from typing import Any, Dict, List, Optional, Set +from intezer_sdk.api import IntezerApi +from intezer_sdk.errors import UnsupportedOnPremiseVersion +from intezer_sdk.consts import OnPremiseVersion, BASE_URL, API_VERSION, AnalysisStatusCode +from signatures import get_attack_ids_for_signature_name, get_heur_id_for_signature_name, GENERIC_HEURISTIC_ID + +from assemblyline.common.str_utils import truncate +from assemblyline_v4_service.common.api import ServiceAPIError from assemblyline_v4_service.common.base import ServiceBase -from assemblyline_v4_service.common.result import Result, ResultSection, BODY_FORMAT +from assemblyline_v4_service.common.dynamic_service_helper import extract_iocs_from_text_blob, SandboxOntology +from assemblyline_v4_service.common.request import ServiceRequest +from assemblyline_v4_service.common.result import ( + Result, + ResultKeyValueSection, + ResultSection, + ResultTableSection, + ResultTextSection, + TableRow, +) +from assemblyline_v4_service.common.tag_helper import add_tag + +global_safelist: Optional[Dict[str, Dict[str, List[str]]]] = None 
+ +UNINTERESTING_ANALYSIS_KEYS = [ + "analysis_url", + "is_private", + "sha256", + "verdict", + "family_id", +] +UNINTERESTING_SUBANALYSIS_KEYS = [ + "source", + "file_type", + "md5", + "sha1", + "sha256", + "size_in_bytes", + "ssdeep", +] +UNINTERESTING_FAMILY_KEYS = ["family_id"] + +FAMILIES_TO_NOT_TAG = ["application", "library"] +MALICIOUS_FAMILY_TYPES = ["malware"] +SUSPICIOUS_FAMILY_TYPES = ["administration_tool", "installer", "packer"] + +TTP_SEVERITY_TRANSLATION = { + 1: 10, + 2: 100, + 3: 250 +} + +SILENT_SIGNATURES = ["enumerates_running_processes"] +COMMAND_LINE_KEYS = ["command", "cmdline", "Commandline executed"] +FILE_KEYS = ["DeletedFile", "file", "binary", "copy", "service path", "office_martian", "File executed"] +REGISTRY_KEYS = ["key", "regkey", "regkeyval"] +URL_KEYS = ["http_request", "url", "suspicious_request", "network_http", "request", "http_downloadurl", "uri"] +IP_KEYS = ["IP"] +DOMAIN_KEYS = ["domain"] + +DEFAULT_ANALYSIS_TIMEOUT = 180 +DEFAULT_POLLING_PERIOD = 5 + +COMPLETED_STATUSES = [AnalysisStatusCode.FINISH.value, AnalysisStatusCode.FAILED.value, "succeeded"] + + +# From the user-guide +class Verdicts(Enum): + # Trusted + KNOWN_TRUSTED = "known_trusted" + TRUSTED = "trusted" + PROBABLY_TRUSTED = "probably_trusted" + KNOWN_LIBRARY = "known_library" + LIBRARY = "library" + TRUSTED_VERDICTS = [KNOWN_TRUSTED, TRUSTED, PROBABLY_TRUSTED, KNOWN_LIBRARY, LIBRARY] + + # Malicious + KNOWN_MALICIOUS = "known_malicious" + MALICIOUS = "malicious" + MALICIOUS_VERDICTS = [MALICIOUS, KNOWN_MALICIOUS] + + # Suspicious + ADMINISTRATION_TOOL = "administration_tool" + KNOWN_ADMINISTRATION_TOOL = "known_administration_tool" + PACKED = "packed" + PROBABLY_PACKED = "probably_packed" + SCRIPT = "script" + SUSPICIOUS = "suspicious" + SUSPICIOUS_VERDICTS = [ADMINISTRATION_TOOL, KNOWN_ADMINISTRATION_TOOL, PACKED, PROBABLY_PACKED, SCRIPT, SUSPICIOUS] + + # Unknown + UNIQUE = "unique" + NO_GENES = "no_genes" + ALMOST_NO_GENES = "almost_no_genes" + INCONCLUSIVE 
= "inconclusive" + INSTALLER = "installer" + NO_CODE = "no_code" + UNKNOWN = "unknown" + UNKNOWN_VERDICTS = [UNIQUE, NO_GENES, ALMOST_NO_GENES, INCONCLUSIVE, INSTALLER, NO_CODE, UNKNOWN] + + # Not supported + FILE_TYPE_NOT_SUPPORTED = "file_type_not_supported" + NO_NATIVE_CODE = "non_native_code" + CORRUPTED_FILE = "corrupted_file" + NOT_SUPPORTED = "not_supported" + NOT_SUPPORTED_VERDICTS = [FILE_TYPE_NOT_SUPPORTED, NOT_SUPPORTED, NO_NATIVE_CODE, CORRUPTED_FILE] + + # Neutral + NEUTRAL = "neutral" + NEUTRAL_VERDICTS = [NEUTRAL] + + INTERESTING_VERDICTS = MALICIOUS_VERDICTS + SUSPICIOUS_VERDICTS + UNINTERESTING_VERDICTS = NEUTRAL_VERDICTS + NOT_SUPPORTED_VERDICTS + UNKNOWN_VERDICTS + TRUSTED_VERDICTS + + +class NetworkIOCTypes(Enum): + IP = "ip" + DOMAIN = "domain" + TYPES = [IP, DOMAIN] + + +class ALIntezerApi(IntezerApi): + def set_logger(self, log): + self.log = log + + # Overriding the class method to handle if the URL is GONE + def get_latest_analysis(self, + file_hash: str, + private_only: bool = False, + **additional_parameters) -> Optional[Dict]: + try: + return IntezerApi.get_latest_analysis( + self=self, + file_hash=file_hash, + private_only=private_only, + additional_parameters=additional_parameters + ) + except HTTPError as e: + self.log.debug( + f"Unable to get the latest analysis for SHA256 {file_hash} due to '{e}'" + ) + # Occasionally an analysis fails, and HTTPError.GONE is raised + if str(HTTPStatus.GONE.value) in repr(e) or HTTPStatus.GONE.name in repr(e): + return None + else: + raise + + # Overriding the class method to handle if the HTTPError exists + def get_iocs(self, analysis_id: str) -> Dict[str, List[Dict[str, str]]]: + try: + return IntezerApi.get_iocs(self=self, analyses_id=analysis_id) + except HTTPError as e: + self.log.debug( + f"Unable to retrieve IOCs for analysis ID {analysis_id} due to '{e}'" + ) + # If you have a community account with analyze.intezer.com, you will get a 403 FORBIDDEN on this endpoint. 
+ if str(HTTPStatus.FORBIDDEN.value) in repr(e) or HTTPStatus.FORBIDDEN.name in repr(e): + return {"files": [], "network": []} + else: + raise + + # Overriding the class method to handle if the HTTPError or UnsupportedOnPremiseVersion exists + def get_dynamic_ttps(self, analysis_id: str) -> List[Dict[str, str]]: + try: + return IntezerApi.get_dynamic_ttps(self=self, analyses_id=analysis_id) + except HTTPError as e: + self.log.debug( + f"Unable to retrieve TTPs for analysis ID {analysis_id} due to '{e}'" + ) + # If you have a community account with analyze.intezer.com, you will get a 403 FORBIDDEN on this endpoint. + if str(HTTPStatus.FORBIDDEN.value) in repr(e) or HTTPStatus.FORBIDDEN.name in repr(e): + return [] + else: + raise + except UnsupportedOnPremiseVersion as e: + self.log.debug( + f"Unable to retrieve TTPs for analysis ID {analysis_id} due to '{e}'" + ) + return [] + + # Overriding the class method to handle if the HTTPError exists + def get_sub_analyses_by_id(self, analysis_id: str) -> List[Dict[str, Any]]: + try: + return IntezerApi.get_sub_analyses_by_id(self=self, analysis_id=analysis_id) + except HTTPError as e: + self.log.debug( + f"Unable to get sub_analyses for analysis ID {analysis_id} due to '{e}'" + ) + return [] + + # Overriding the class method to handle if the HTTPError exists + def download_file_by_sha256(self, sha256: str, dir_path: str) -> bool: + try: + IntezerApi.download_file_by_sha256( + self=self, sha256=sha256, path=dir_path + ) + return True + except HTTPError as e: + self.log.debug( + f"Unable to download file for SHA256 {sha256} due to '{e}'" + ) + # If you have a community account with analyze.intezer.com, you will get a 403 FORBIDDEN on this endpoint. 
+ if str(HTTPStatus.FORBIDDEN.value) in repr(e) or HTTPStatus.FORBIDDEN.name in repr(e): + return False + else: + raise class IntezerStatic(ServiceBase): - def __init__(self, config=None): - super(IntezerStatic, self).__init__(config) - - def start(self): - self.log.debug("Intezer Static service started") - - def stop(self): - self.log.debug("Intezer Static service ended") - - def execute(self, request): - result = Result() - sha256 = request.sha256 - api_key = request.get_param("api_key") - - client = IntezerStaticClient(api_key) - main_api_result = client.get_hash_results(sha256) - - if main_api_result: - main_kv_section = ResultSection("Intezer Static analysis report", body_format=BODY_FORMAT.KEY_VALUE, body=json.dumps(main_api_result)) - - if main_api_result["verdict"] == "malicious": - main_kv_section.set_heuristic(1) - elif main_api_result["verdict"] == "suspicious": - main_kv_section.set_heuristic(2) - - sub_analysis = client.get_sub_analysis(main_api_result["analysis_id"]) - try: - for sub in sub_analysis["sub_analyses"]: - code_reuse = client.get_code_reuse(main_api_result["analysis_id"], sub["sub_analysis_id"]) - metadata = client.get_metadata(main_api_result["analysis_id"], sub["sub_analysis_id"]) - - # Adding the "code reuse" + "metadata" to the subanalysis dictionnary - sub.update(code_reuse) - sub.update(metadata) - - # Removing the empty values - sub.pop("error", None) - - families = sub.pop("families", None) - extraction_info = sub.pop("extraction_info", None) - - sub_kv_section = ResultSection("Subanalysis report for " + sub["sub_analysis_id"], body_format=BODY_FORMAT.KEY_VALUE, body=json.dumps(sub), parent=main_kv_section) - if families: - for family in families: - ResultSection("Family report for the subanalysis", body_format=BODY_FORMAT.KEY_VALUE, body=json.dumps(family), parent=sub_kv_section) - - if extraction_info: - for info in extraction_info["processes"]: - ResultSection("Extraction informations for the subanalysis", 
body_format=BODY_FORMAT.KEY_VALUE, body=json.dumps(info), parent=sub_kv_section) - except KeyError: - pass - - result.add_section(main_kv_section) - request.result = result + def __init__(self, config: Optional[Dict] = None) -> None: + super().__init__(config) + self.log.debug("Initializing the IntezerStatic service...") + self.client: Optional[ALIntezerApi] = None + + def start(self) -> None: + global global_safelist + self.log.debug("IntezerStatic service started...") + + if self.config.get("base_url") != BASE_URL and not self.config["is_on_premise"]: + self.log.warning( + f"You are using a base url that is not {BASE_URL}, yet you do not have the 'is_on_premise' parameter set to true. Are you sure?") + + self.client = ALIntezerApi( + api_version=self.config.get("api_version", API_VERSION), + api_key=self.config["api_key"], + base_url=self.config.get("base_url", BASE_URL), + on_premise_version=OnPremiseVersion.V21_11 if self.config["is_on_premise"] else None + ) + self.client.set_logger(self.log) + try: + global_safelist = self.get_api_interface().get_safelist() + except ServiceAPIError as e: + self.log.warning(f"Couldn't retrieve safelist from service: {e}. 
Continuing without it..") + + def stop(self) -> None: + self.log.debug("IntezerStatic service ended...") + + def execute(self, request: ServiceRequest) -> None: + sha256 = request.sha256 + result = Result() + + # First, let's get the analysis metadata, if it exists on the system + main_api_result = self._get_analysis_metadata(request.get_param('analysis_id'), sha256) + + if not main_api_result: + self.log.debug(f"SHA256 {sha256} is not on the system.") + request.result = result + return + + if main_api_result.get("verdict") in Verdicts.NOT_SUPPORTED_VERDICTS.value: + self.log.debug(f"Unsupported file type: {request.file_type}") + request.result = result + return + elif main_api_result.get("verdict") == AnalysisStatusCode.FAILED.value: + self.log.warning("The Intezer server is not feeling well :(") + request.result = result + return + + analysis_id = main_api_result["analysis_id"] + + # Setup the main result section + main_kv_section = ResultKeyValueSection("IntezerStatic analysis report") + processed_main_api_result = self._process_details( + main_api_result.copy(), UNINTERESTING_ANALYSIS_KEYS + ) + main_kv_section.update_items(processed_main_api_result) + if "family_name" in main_api_result: + main_kv_section.add_tag( + "attribution.family", main_api_result["family_name"] + ) + + # This file-verdict map will be used later on to assign heuristics to sub-analyses + file_verdict_map = {} + self._process_iocs(analysis_id, file_verdict_map, main_kv_section) + if not self.config["is_on_premise"]: + self._process_ttps(analysis_id, main_kv_section) + self._handle_subanalyses(request, sha256, analysis_id, file_verdict_map, main_kv_section) + + # Setting heuristic here to avoid FPs + if main_kv_section.subsections: + self._set_heuristic_by_verdict(main_kv_section, main_api_result["verdict"]) + + if main_kv_section.subsections or main_kv_section.heuristic: + result.add_section(main_kv_section) + request.result = result + + def _get_analysis_metadata(self, analysis_id: str, 
sha256: str) -> Dict[str, str]: + """ + This method handles the logic of determining what metadata we want to + retrieve (for the hash or for the analysis_id) + :param request: The service request object + :param sha256: The hash of the given file + :return: A dictionary representing the analysis metadata + """ + # NOTE: If a user requests a certain analysis id, then the submitted file will be ignored + if not analysis_id: + return self.client.get_latest_analysis( + file_hash=sha256, private_only=self.config["private_only"] + ) + else: + return {"analysis_id": analysis_id, "verdict": None} + + @staticmethod + def _process_details( + details: Dict[str, str], uninteresting_keys: List[str] + ) -> Dict[str, str]: + """ + This method removes uninteresting details from a given dictionary + :param details: The dictionary possibly containing uninteresting details + :param uninteresting_keys: A list of keys for uninteresting details + :return: A dictionary only containing interesting information + """ + for key in list(details.keys()): + if key in uninteresting_keys: + details.pop(key, None) + return details + + def _set_heuristic_by_verdict( + self, result_section: ResultSection, verdict: Optional[str] + ) -> None: + """ + This method sets the heuristic of the result section based on the verdict + :param result_section: The result section that will have its heuristic set + :param verdict: The verdict of the file + :return: None + """ + if not verdict: + return + + if ( + verdict not in Verdicts.INTERESTING_VERDICTS.value + and verdict not in Verdicts.UNINTERESTING_VERDICTS.value + ): + self.log.debug(f"{verdict} was spotted. Is this useful?") + elif verdict in Verdicts.MALICIOUS_VERDICTS.value: + result_section.set_heuristic(1) + elif verdict in Verdicts.SUSPICIOUS_VERDICTS.value: + result_section.set_heuristic(2) + elif verdict in Verdicts.TRUSTED_VERDICTS.value: + self.log.debug(f"The verdict was {verdict}. 
Can we do something with this?") + + def _process_iocs( + self, + analysis_id: str, + file_verdict_map: Dict[str, str], + parent_result_section: ResultSection, + ) -> None: + """ + This method retrieves and parses IOCs for an analysis + :param analysis_id: The ID for the analysis which we will be retrieving + :param file_verdict_map: A map of sha256s representing a file's + contents, and the verdict for that file + :param parent_result_section: The result section that the network + result section will be added to, if applicable + :return: None + """ + iocs = self.client.get_iocs(analysis_id) + file_iocs = iocs["files"] + network_iocs = iocs["network"] + + if file_iocs: + for file in file_iocs: + file_verdict_map[file["sha256"]] = file["verdict"] + + if network_iocs: + network_section = ResultTextSection("Network Communication Observed") + for network in network_iocs: + ioc = network["ioc"] + type = network["type"] + if type == NetworkIOCTypes.IP.value: + network_section.add_tag("network.dynamic.ip", ioc) + elif type == NetworkIOCTypes.DOMAIN.value: + network_section.add_tag("network.dynamic.domain", ioc) + elif type not in NetworkIOCTypes.TYPES.value: + self.log.debug( + f"The network IOC type of {type} is not in {NetworkIOCTypes.TYPES.value}. 
Network item: {network}" + ) + network_section.add_line(f"IOC: {ioc}") + parent_result_section.add_subsection(network_section) + + def _process_ttps( + self, + analysis_id: str, + parent_result_section: ResultSection, + ) -> None: + """ + This method retrieves and parses TTPs for an analysis + :param analysis_id: The ID for the analysis which we will be retrieving + :param file_verdict_map: A map of sha256s representing a file's + contents, and the verdict for that file + :param parent_result_section: The result section that the network + result section will be added to, if applicable + :return: None + """ + # Note: These TTPs are essentially signatures + ttps = self.client.get_dynamic_ttps(analysis_id) + + if not ttps: + return + + sigs_res = ResultSection("Signatures") + for ttp in ttps: + sig_name = ttp['name'] + sig_res = ResultTextSection(f"Signature: {sig_name}") + sig_res.add_line(ttp['description']) + + heur_id = get_heur_id_for_signature_name(sig_name) + if heur_id == GENERIC_HEURISTIC_ID: + self.log.debug(f"{sig_name} does not have a category assigned to it") + + sig_res.set_heuristic(heur_id) + sig_res.heuristic.add_signature_id(sig_name, TTP_SEVERITY_TRANSLATION[ttp['severity']]) + + for aid in get_attack_ids_for_signature_name(sig_name): + sig_res.heuristic.add_attack_id(aid) + + if sig_name in SILENT_SIGNATURES: + sigs_res.add_subsection(sig_res) + continue + + ioc_table = ResultTableSection("IOCs found in signature marks") + self._process_ttp_data(ttp['data'], sig_res, ioc_table) + + if ioc_table.body: + sig_res.add_subsection(ioc_table) + + sigs_res.add_subsection(sig_res) + + if sigs_res.subsections: + parent_result_section.add_subsection(sigs_res) + + def _process_ttp_data( + self, ttp_data: List[Dict[str, str]], + sig_res: ResultSection, ioc_table: ResultTableSection) -> None: + """ + This method handles the processing of signature marks + :param ttp_data: The marks for the signature + :param sig_res: The result section for the signature + :param 
ioc_table: The result section table where the data is going to go + :return: None + """ + for item in ttp_data: + # Assuming that all items are single key value pairs, + key = next((key for key in item.keys()), "") + if not key: + continue + value = item[key] + if not value: + continue + + if key in IP_KEYS and not add_tag(sig_res, "network.dynamic.ip", value, global_safelist): + extract_iocs_from_text_blob(value, ioc_table) + elif key in COMMAND_LINE_KEYS: + _ = add_tag(sig_res, "dynamic.process.command_line", value, global_safelist) + extract_iocs_from_text_blob(value, ioc_table) + elif key in FILE_KEYS: + _ = add_tag(sig_res, "dynamic.process.file_name", value, global_safelist) + elif key in URL_KEYS: + extract_iocs_from_text_blob(value, ioc_table) + elif key in REGISTRY_KEYS: + _ = add_tag(sig_res, "dynamic.registry_key", value, global_safelist) + elif key in DOMAIN_KEYS: + _ = add_tag(sig_res, "network.dynamic.domain", value, global_safelist) + else: + pass + value = truncate(value, 512) + if not sig_res.body: + sig_res.add_line(f"\t{key}: {value}") + elif sig_res.body and f"\t{key}: {value}" not in sig_res.body: + sig_res.add_line(f"\t{key}: {value}") + + def _handle_subanalyses(self, request: ServiceRequest, sha256: str, analysis_id: str, + file_verdict_map: Dict[str, str], + parent_section: ResultSection) -> None: + """ + This method handles the subanalyses for a given analysis ID + :param request: The service request object + :param sha256: The hash of the given file + :param analysis_id: The ID for the analysis which we will be retrieving + :param file_verdict_map: A map of sha256s representing a file's + contents, and the verdict for that file + :param parent_result_section: The result section that the network + result section will be added to, if applicable + :return: None + """ + so = SandboxOntology() + + # This boolean is used to determine if we should try to download another file + can_we_download_files = True + + # These sets will be used as we 
work through the process trees + process_path_set = set() + command_line_set = set() + + # Now let's get into the subanalyses for this sample + sub_analyses = self.client.get_sub_analyses_by_id(analysis_id) + + for sub in sub_analyses: + sub_analysis_id = sub["sub_analysis_id"] + + # Get the extraction info, which is basically the details of how the subanalysis object came to be + extraction_info = sub.pop("extraction_info", None) + + # Processes is only present when the sample has undergone dynamic execution + if extraction_info and "processes" not in extraction_info: + extraction_info = None + + code_reuse = self.client.get_sub_analysis_code_reuse_by_id( + analysis_id, sub_analysis_id + ) + + if code_reuse: + families = code_reuse.pop("families", []) + else: + families = [] + + if not families and not extraction_info: + # Otherwise, boring! + continue + + if families and not any(family["reused_gene_count"] > 1 for family in families): + # Most likely a false positive + continue + + ### + # If we have gotten to this point, then the sub analysis is worth reporting + ### + + extraction_method = sub["source"].replace("_", " ") + + if extraction_method != "root": + sub_kv_section = ResultKeyValueSection( + f"Subanalysis report for {sub['sha256']}, extracted via {extraction_method}") + else: + sub_kv_section = ResultKeyValueSection(f"Subanalysis report for {sub['sha256']}") + + metadata = self.client.get_sub_analysis_metadata_by_id( + analysis_id, sub_analysis_id + ) + processed_subanalysis = self._process_details( + metadata.copy(), UNINTERESTING_SUBANALYSIS_KEYS + ) + sub_kv_section.update_items(processed_subanalysis) + parent_section.add_subsection(sub_kv_section) + + if code_reuse: + code_reuse_kv_section = ResultKeyValueSection( + "Code reuse detected" + ) + code_reuse_kv_section.update_items(code_reuse) + sub_kv_section.add_subsection(code_reuse_kv_section) + + sub_sha256 = sub["sha256"] + if families: + self._process_families(families, sub_sha256, 
file_verdict_map, sub_kv_section) + + if extraction_info: + self._process_extraction_info(extraction_info["processes"], process_path_set, command_line_set, so) + + # Setting a heuristic here or downloading the file would be redundant if the hash matched the original file + if sub_sha256 != sha256: + self._set_heuristic_by_verdict( + sub_kv_section, file_verdict_map.get(sub_sha256) + ) + + if can_we_download_files: + file_was_downloaded = self.client.download_file_by_sha256( + sub_sha256, self.working_directory + ) + if file_was_downloaded: + path = f"{self.working_directory}/{sub_sha256}.sample" + request.add_extracted( + path, + f"{sub_sha256}.sample", + f"Extracted via {extraction_method}", + ) + self.log.debug(f"Added {sub_sha256}.sample as an extracted file.") + else: + can_we_download_files = False + + process_tree_section = so.get_process_tree_result_section() + for process_path in process_path_set: + process_tree_section.add_tag("dynamic.process.file_name", process_path) + for command_line in command_line_set: + process_tree_section.add_tag("dynamic.process.command_line", command_line) + if process_tree_section.body: + parent_section.add_subsection(process_tree_section) + + def _process_families( + self, families: List[Dict[str, str]], + sub_sha256: str, file_verdict_map: Dict[str, str], + parent_section: ResultSection) -> None: + """ + This method handles the "families" list, cutting out boring details and assigning verdicts + :param families: A list of details for families + :param sub_sha256: The hash of the sub analysis file + :param file_verdict_map: A map of sha256s representing a file's + contents, and the verdict for that file + :param parent_section: The result section that the network + :return: None + """ + family_section = ResultTableSection("Family Details") + for family in families: + processed_family = self._process_details( + family.copy(), UNINTERESTING_FAMILY_KEYS + ) + family_section.add_row(TableRow(**processed_family)) + family_type = 
family["family_type"] + if family_type not in FAMILIES_TO_NOT_TAG: + family_section.add_tag("attribution.family", family["family_name"]) + + # Overwrite value if not malicious + if family_type in MALICIOUS_FAMILY_TYPES and ( + sub_sha256 not in file_verdict_map or file_verdict_map[sub_sha256] != Verdicts.MALICIOUS.value): + file_verdict_map[sub_sha256] = Verdicts.MALICIOUS.value + + # Only overwrite value if value is not already malicious + elif family_type in SUSPICIOUS_FAMILY_TYPES and (sub_sha256 not in file_verdict_map or file_verdict_map[sub_sha256] not in Verdicts.MALICIOUS_VERDICTS.value): + file_verdict_map[sub_sha256] = Verdicts.SUSPICIOUS.value + + if family_section.body: + parent_section.add_subsection(family_section) + + def _process_extraction_info( + self, processes: List[Dict[str, Any]], + process_path_set: Set[str], + command_line_set: Set[str], + so: SandboxOntology) -> None: + """ + This method handles the processing of the extraction info process details + :param processes: A list of processes + :param process_path_set: A set containing process paths + :param command_line_set: A set containing command lines + :param so: the sandbox ontology object + :return: None + """ + for item in processes: + p = so.create_process( + pid=item["process_id"], + image=item["process_path"], + ppid=item["parent_process_id"], + ) + process_path_set.add(item["process_path"]) + so.add_process(p) + + if item["process_path"] != item["module_path"]: + self.log.debug( + f"Investigate! 
process_path: {item['process_path']} != module_path: {item['module_path']}" + ) + process_path_set.add(item["module_path"]) + command_line = f"{item['process_path']} {item['module_path']}" + command_line_set.add(command_line) + so.update_process( + command_line=command_line, + pid=item["process_id"], + start_time=float("-inf") + ) diff --git a/intezer_static_client.py b/intezer_static_client.py deleted file mode 100644 index 3f6f7c9..0000000 --- a/intezer_static_client.py +++ /dev/null @@ -1,33 +0,0 @@ -import requests - -class IntezerStaticClient(): - def __init__(self, apikey=""): - self.apikey = apikey - self.headers = { "api_key": self.apikey } - self.base_url = "https://analyze.intezer.com/api/v2-0" - self.create_session() - - def create_session(self): - response = requests.post(self.base_url + "/get-access-token", json=self.headers) - self.session = requests.session() - self.session.headers["Authorization"] = self.session.headers["Authorization"] = "Bearer %s" % response.json()["result"] - - def get_hash_results(self, sha256): - response = self.session.get(self.base_url + "/files/" + sha256) - - if response.status_code == 200: - return response.json()["result"] - - def get_sub_analysis(self, analysis_id): - response = self.session.get(self.base_url + "/analyses/" + analysis_id + "/sub-analyses") - - return response.json() - - def get_code_reuse(self, analysis_id, sub_analysis_id): - response = self.session.get(self.base_url + "/analyses/" + analysis_id + "/sub-analyses/" + sub_analysis_id + "/code-reuse") - return response.json() - - def get_metadata(self, analysis_id, sub_analysis_id): - response = self.session.get(self.base_url + "/analyses/" + analysis_id + "/sub-analyses/" + sub_analysis_id + "/metadata") - - return response.json() \ No newline at end of file diff --git a/pipelines/azure-build.yaml b/pipelines/azure-build.yaml new file mode 100755 index 0000000..b17bd58 --- /dev/null +++ b/pipelines/azure-build.yaml @@ -0,0 +1,65 @@ +name: build + 
+variables: + - group: unittest-samples + - name: self_location + value: "self_location" + - name: full_self_location + value: "$(Agent.BuildDirectory)/$(self_location)" + - name: samples_location + value: "samples_location" + - name: full_samples_location + value: "$(Agent.BuildDirectory)/$(samples_location)" + +resources: + repositories: + - repository: unittest-samples + type: github + name: $(unittest_samples_repository) + ref: main + endpoint: github-repo-sa + trigger: none + +trigger: + tags: + include: ["v*"] +pr: none + +pool: + vmImage: "ubuntu-20.04" + +stages: + - stage: deploy + jobs: + - job: deploy + displayName: Deploy containers to dockerhub + variables: + - group: deployment-information + steps: + - task: Docker@2 + displayName: Login to docker hub + inputs: + command: login + containerRegistry: dockerhub + - checkout: self + fetchDepth: 1 + path: $(self_location) + - checkout: unittest-samples + fetchDepth: 1 + path: $(samples_location) + - script: | + export TAG=${BUILD_SOURCEBRANCH#"refs/tags/v"} + if [[ "$TAG" == *stable* ]]; then export BUILD_TYPE=stable; else export BUILD_TYPE=latest; fi + docker build --build-arg version=$TAG --build-arg branch=$BUILD_TYPE -t cccs/${BUILD_REPOSITORY_NAME##*/}:$TAG -t cccs/${BUILD_REPOSITORY_NAME##*/}:$BUILD_TYPE -f ./Dockerfile . + workingDirectory: $(full_self_location) + displayName: Build containers + - script: | + [ ! 
-d "$(pwd)/tests" ] && echo "No tests found" && exit + export TAG=${BUILD_SOURCEBRANCH#"refs/tags/v"} + if [[ "$TAG" == *stable* ]]; then export BUILD_TYPE=stable; else export BUILD_TYPE=latest; fi + [ -f "$(pwd)/tests/requirements.txt" ] && docker run -e FULL_SELF_LOCATION=/opt/al_service -e FULL_SAMPLES_LOCATION=/opt/samples -v /usr/share/ca-certificates/mozilla:/usr/share/ca-certificates/mozilla -v $(pwd)/tests/:/opt/al_service/tests/ -v ${FULL_SAMPLES_LOCATION}:/opt/samples cccs/${BUILD_REPOSITORY_NAME##*/}:$BUILD_TYPE bash -c 'pip install -U -r tests/requirements.txt; pytest -p no:cacheprovider -vv' && exit + docker run -e FULL_SELF_LOCATION=/opt/al_service -e FULL_SAMPLES_LOCATION=/opt/samples -v /usr/share/ca-certificates/mozilla:/usr/share/ca-certificates/mozilla -v $(pwd)/tests/:/opt/al_service/tests/ -v ${FULL_SAMPLES_LOCATION}:/opt/samples cccs/${BUILD_REPOSITORY_NAME##*/}:$BUILD_TYPE bash -c 'pytest -p no:cacheprovider -vv' + workingDirectory: $(full_self_location) + displayName: Test containers + - script: docker push cccs/${BUILD_REPOSITORY_NAME##*/} --all-tags + displayName: Deploy to Docker Hub diff --git a/pipelines/azure-tests.yaml b/pipelines/azure-tests.yaml new file mode 100755 index 0000000..a28426c --- /dev/null +++ b/pipelines/azure-tests.yaml @@ -0,0 +1,77 @@ +name: tests + +variables: + - group: unittest-samples + - name: self_location + value: "self_location" + - name: full_self_location + value: "$(Agent.BuildDirectory)/$(self_location)" + - name: samples_location + value: "samples_location" + - name: full_samples_location + value: "$(Agent.BuildDirectory)/$(samples_location)" + +resources: + repositories: + - repository: unittest-samples + type: github + name: $(unittest_samples_repository) + ref: main + endpoint: github-repo-sa + trigger: none + +trigger: ["*"] +pr: ["*"] + +pool: + vmImage: "ubuntu-20.04" + +jobs: + - job: run_test + strategy: + matrix: + Python3_9: + python.version: "3.9" + #Python3_10: + # python.version: "3.10" + 
#Python3_11: + # python.version: "3.11" + + timeoutInMinutes: 10 + + steps: + - task: UsePythonVersion@0 + displayName: Set python version + inputs: + versionSpec: "$(python.version)" + - checkout: self + fetchDepth: 1 + path: $(self_location) + - checkout: unittest-samples + fetchDepth: 1 + path: $(samples_location) + - script: | + [ ! -d "$(pwd)/tests" ] && echo "No tests found" && exit + sudo apt-get update + sudo apt-get install -y libfuzzy-dev libfuzzy2 + if [[ -f "$(pwd)/pkglist.txt" ]]; then + grep -vE '^#' "$(pwd)/pkglist.txt" | xargs sudo apt install -y + fi + sudo rm -rf /var/lib/apt/lists/* + sudo env "PATH=$PATH" python -m pip install -U --no-cache-dir assemblyline assemblyline_v4_service + [ -f $(pwd)/requirements.txt ] && sudo env "PATH=$PATH" python -m pip install -U --no-cache-dir -r $(pwd)/requirements.txt + [ -f $(pwd)/tests/requirements.txt ] && sudo env "PATH=$PATH" python -m pip install -U --no-cache-dir -r $(pwd)/tests/requirements.txt + sudo rm -rf /tmp/* /var/lib/apt/lists/* ~/.cache/pip + workingDirectory: $(full_self_location) + displayName: Setup environment + - script: | + [ ! -d "$(pwd)/tests" ] && echo "No tests found" && exit + export REPO_NAME=${BUILD_REPOSITORY_NAME##*/} + python -m pytest -p no:cacheprovider --durations=10 -rsx -vv --cov-report=xml --cov=${REPO_NAME/assemblyline-service-/} + workingDirectory: $(full_self_location) + displayName: Test + - script: | + [ ! 
-d "$(pwd)/tests" ] && echo "No tests found" && exit + python -m codecov + workingDirectory: $(full_self_location) + displayName: Upload Coverage diff --git a/service_manifest.yml b/service_manifest.yml index 63415d7..67726bc 100644 --- a/service_manifest.yml +++ b/service_manifest.yml @@ -2,6 +2,15 @@ name: IntezerStatic version: 4.2.0.stable1 description: Fetch the Intezer results of the submitted file's sha256 +# TODO: From the user guide +# The following file formats are currently supported: +# • Windows Executable Files (PE) – exe, .dll, .sys – native x86, native x64 and .NET. +# • Linux Executable Files (ELF) – native x86, native x64, ARM32, ARM64 +# • Compressed files that contain one file - Zip, RAR, TAR, 7-Zip +# • Android applications (APK)• Installers - msi, trusted installer, Inno setup... +# • Microsoft Office - doc, xls, ppt, etc. +# • PDF +# • Scripts - powershell, vbs, js accepts: .* rejects: empty|metadata/.* @@ -9,33 +18,81 @@ stage: CORE category: External file_required: true -timeout: 120 +timeout: 60 disable_cache: false enabled: false is_external: true licence_count: 0 -heuristics: - - heur_id: 1 - name: Intezer consider this file as malicious - score: 2000 - filetype: .* - description: Intezer consider this file as malicious - - heur_id: 2 - name: Intezer consider this file as suspicious - score: 1000 - filetype: .* - description: Intezer consider this file as suspicious +config: + # Don't forget the /api/ at the end of the URL! 
base_url: https://analyze.intezer.com/api/ + api_version: v2-0 + api_key: sample_api_key + private_only: false + is_on_premise: false submission_params: - default: "" - name: api_key + name: analysis_id type: str value: "" +heuristics: + - heur_id: 1 + name: File is malicious + score: 1000 + filetype: .* + description: Intezer considers this file malicious + - heur_id: 2 + name: File is suspicious + score: 500 + filetype: .* + description: Intezer considers this file suspicious + - heur_id: 4 + name: Generic signature raised + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 5 + name: Command And Control + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 6 + name: Credential Access + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 7 + name: Defense Evasion + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 8 + name: Discovery + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 9 + name: Execution + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 10 + name: Persistence + score: 0 + filetype: .* + description: Intezer raised a signature + - heur_id: 11 + name: Collection + score: 0 + filetype: .* + description: Intezer raised a signature + docker_config: - image: ghcr.io/nvisosecurity/assemblyline-service-intezer:main - cpu_cores: 1 - ram_mb: 256 allow_internet_access: true + image: ${REGISTRY}cccs/assemblyline-service-intezer-static:$SERVICE_TAG + cpu_cores: 0.5 + ram_mb: 256 diff --git a/signatures.py b/signatures.py new file mode 100644 index 0000000..5a9fc70 --- /dev/null +++ b/signatures.py @@ -0,0 +1,103 @@ +from typing import List, Optional + +GENERIC_HEURISTIC_ID = 4 + +SIGNATURE_ATTACK_IDS = { + "InjectionInterProcess": ["T1055"], + "process_creation_suspicious_location": ["T1106"], + "infostealer_browser": ["T1552.001"], + "recon_fingerprint": ["T1082"], +
"enumerates_running_processes": ["T1057"], + "process_interest": ["T1057"], + # Yes I understand that these two are essentially the same + "injection_createremotethread": ["T1055"], + "InjectionCreateRemoteThread": ["T1055"], + "stealth_hidden_extension": ["T1562.006"], + "dropper": ["T1129"], + "network_cnc_https_generic": ["T1573"], + "office_cve2017_11882": ["T1203"], + "powershell_network_connection": ["T1059.001"], + "wmi_create_process": ["T1047"], + "wmi_script_process": ["T1047"], + "powershell_variable_obfuscation": ["T1059.001"], + "office_macro_autoexecution": ["T1059.004"], + "persistence_autorun": ["T1547.001"], + "persistence_autorun_tasks": ["T1543"], + "office_martian_children": ["T1059"], + "office_cve2017_11882_network": ["T1203"], + "infostealer_ftp": ["T1552.001"], + "infostealer_im": ["T1005"], + "spawns_dev_util": ["T1127"], + "uses_windows_utilities_to_create_scheduled_task": ["T1053"], + "windows_defender_powershell": ["T1562.001"], + "pe_compile_timestomping": ["T1070.006"], + "creates_largekey": ["T1112"], + "registry_credential_store_access": ["T1003"], + "powershell_command_suspicious": ["T1059.001"], + "infostealer_cookies": ["T1539"], + "infostealer_mail": ["T1005"], + "recon_programs": ["T1518"], + "antivm_generic_cpu": ["T1012"], + "infostealer_bitcoin": ["T1005"], +} + +SIGNATURE_TO_CATEGORY = { + "InjectionInterProcess": "Defense Evasion", + "process_creation_suspicious_location": "Execution", + "infostealer_browser": "Credential Access", + "recon_fingerprint": "Discovery", + "enumerates_running_processes": "Discovery", + "process_interest": "Discovery", + # Yes I understand that these two are essentially the same + "injection_createremotethread": "Defense Evasion", + "InjectionCreateRemoteThread": "Defense Evasion", + "stealth_hidden_extension": "Defense Evasion", + "dropper": "Execution", + "network_cnc_https_generic": "Command And Control", + "office_cve2017_11882": "Execution", + "powershell_network_connection": "Execution", +
"wmi_create_process": "Execution", + "wmi_script_process": "Execution", + "powershell_variable_obfuscation": "Execution", + "office_macro_autoexecution": "Execution", + "persistence_autorun": "Persistence", + "persistence_autorun_tasks": "Persistence", + "office_martian_children": "Execution", + "office_cve2017_11882_network": "Execution", + "infostealer_ftp": "Credential Access", + "infostealer_im": "Collection", + "spawns_dev_util": "Defense Evasion", + "uses_windows_utilities_to_create_scheduled_task": "Execution", + "windows_defender_powershell": "Defense Evasion", + "pe_compile_timestomping": "Defense Evasion", + "creates_largekey": "Defense Evasion", + "registry_credential_store_access": "Credential Access", + "powershell_command_suspicious": "Execution", + "infostealer_cookies": "Credential Access", + "infostealer_mail": "Collection", + "recon_programs": "Discovery", + "antivm_generic_cpu": "Discovery", + "infostealer_bitcoin": "Collection", +} + +CATEGORY_TO_HEUR_ID = { + "Command And Control": 5, + "Credential Access": 6, + "Defense Evasion": 7, + "Discovery": 8, + "Execution": 9, + "Persistence": 10, + "Collection": 11, +} + + +def get_heur_id_for_signature_name(sig_name: str) -> Optional[int]: + category = SIGNATURE_TO_CATEGORY.get(sig_name) + if not category: + return GENERIC_HEURISTIC_ID + else: + return CATEGORY_TO_HEUR_ID[category] + + +def get_attack_ids_for_signature_name(sig_name: str) -> List[str]: + return SIGNATURE_ATTACK_IDS.get(sig_name, []) diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100755 index 0000000..416eb8e --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,5 @@ +pytest +pytest-mock +pytest-cov +codecov +intezer-sdk diff --git a/tests/results/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8.json b/tests/results/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8.json new file mode 100755 index 0000000..3da5403 --- /dev/null +++ 
b/tests/results/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8.json @@ -0,0 +1 @@ +{"classification": "TLP:W", "response": {"milestones": {"service_started": null, "service_completed": null}, "service_version": null, "service_name": "intezer_static", "service_tool_version": null, "supplementary": [], "extracted": [], "service_context": null, "service_debug_info": null}, "result": {"score": 0, "sections": []}, "sha256": "dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8", "drop_file": false, "temp_submission_data": {}} diff --git a/tests/samples/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8 b/tests/samples/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8 new file mode 100755 index 0000000..e068544 --- /dev/null +++ b/tests/samples/dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8 @@ -0,0 +1 @@ +this is a text file \ No newline at end of file diff --git a/tests/test_intezer_static.py b/tests/test_intezer_static.py new file mode 100755 index 0000000..a966c49 --- /dev/null +++ b/tests/test_intezer_static.py @@ -0,0 +1,770 @@ +import os +import pytest +import shutil + +# Getting absolute paths, names and regexes +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +ROOT_DIR = os.path.dirname(TEST_DIR) +SERVICE_CONFIG_NAME = "service_manifest.yml" +SERVICE_CONFIG_PATH = os.path.join(ROOT_DIR, SERVICE_CONFIG_NAME) +TEMP_SERVICE_CONFIG_PATH = os.path.join("/tmp", SERVICE_CONFIG_NAME) + +# Samples that we will be sending to the service +samples = [ + dict( + sid=1, + metadata={}, + service_name='intezer_static', + service_config={}, + fileinfo=dict( + magic='ASCII text, with no line terminators', + md5='fda4e701258ba56f465e3636e60d36ec', + mime='text/plain', + sha1='af2c2618032c679333bebf745e75f9088748d737', + sha256='dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8', + size=19, + type='unknown', + ), + 
filename='dadc624d4454e10293dbd1b701b9ee9f99ef83b4cd07b695111d37eb95abcff8', + min_classification='TLP:WHITE', + max_files=501, # TODO: get the actual value + ttl=3600, + safelist_config={ + "enabled": False, + "hash_types": ['sha1', 'sha256'], + "enforce_safelist_service": False + } + ), +] + + +def check_section_equality(this, that) -> bool: + # Recursive method to check equality of result section and nested sections + + # Heuristics also need their own equality checks + if this.heuristic and that.heuristic: + result_heuristic_equality = this.heuristic.attack_ids == that.heuristic.attack_ids and \ + this.heuristic.frequency == that.heuristic.frequency and \ + this.heuristic.heur_id == that.heuristic.heur_id and \ + this.heuristic.score == that.heuristic.score and \ + this.heuristic.score_map == that.heuristic.score_map and \ + this.heuristic.signatures == that.heuristic.signatures + + if not result_heuristic_equality: + print("The heuristics are not equal:") + if this.heuristic.attack_ids != that.heuristic.attack_ids: + print("The attack_ids are different:") + print(f"{this.heuristic.attack_ids}") + print(f"{that.heuristic.attack_ids}") + if this.heuristic.frequency != that.heuristic.frequency: + print("The frequencies are different:") + print(f"{this.heuristic.frequency}") + print(f"{that.heuristic.frequency}") + if this.heuristic.heur_id != that.heuristic.heur_id: + print("The heur_ids are different:") + print(f"{this.heuristic.heur_id}") + print(f"{that.heuristic.heur_id}") + if this.heuristic.score != that.heuristic.score: + print("The scores are different:") + print(f"{this.heuristic.score}") + print(f"{that.heuristic.score}") + if this.heuristic.score_map != that.heuristic.score_map: + print("The score_maps are different:") + print(f"{this.heuristic.score_map}") + print(f"{that.heuristic.score_map}") + if this.heuristic.signatures != that.heuristic.signatures: + print("The signatures are different:") + print(f"{this.heuristic.signatures}") + 
print(f"{that.heuristic.signatures}") + + elif not this.heuristic and not that.heuristic: + result_heuristic_equality = True + else: + print("The heuristics are not equal:") + if this.heuristic: + print(f"{this.heuristic.__dict__}") + else: + print("this.heuristic is None") + if that.heuristic: + print(f"{that.heuristic.__dict__}") + else: + print("that.heuristic is None") + result_heuristic_equality = False + + # Assuming we are given the "root section" at all times, it is safe to say that we don't need to confirm parent + current_section_equality = result_heuristic_equality and \ + this.body == that.body and \ + this.body_format == that.body_format and \ + this.classification == that.classification and \ + this.depth == that.depth and \ + len(this.subsections) == len(that.subsections) and \ + this.title_text == that.title_text and \ + this.tags == that.tags and \ + this.auto_collapse == that.auto_collapse + + if not current_section_equality: + print("The current sections are not equal:") + if not result_heuristic_equality: + print("The result heuristics are not equal") + if this.body != that.body: + print("The bodies are different:") + print(f"{this.body}") + print(f"{that.body}") + if this.body_format != that.body_format: + print("The body formats are different:") + print(f"{this.body_format}") + print(f"{that.body_format}") + if this.classification != that.classification: + print("The classifications are different:") + print(f"{this.classification}") + print(f"{that.classification}") + if this.depth != that.depth: + print("The depths are different:") + print(f"{this.depth}") + print(f"{that.depth}") + if len(this.subsections) != len(that.subsections): + print("The number of subsections are different:") + print(f"{len(this.subsections)}") + print(f"{len(that.subsections)}") + if this.title_text != that.title_text: + print("The title texts are different:") + print(f"{this.title_text}") + print(f"{that.title_text}") + if this.tags != that.tags: + print("The 
tags are different:") + print(f"{this.tags}") + print(f"{that.tags}") + if this.auto_collapse != that.auto_collapse: + print("The auto_collapse settings are different:") + print(f"{this.auto_collapse}") + print(f"{that.auto_collapse}") + return False + + for index, subsection in enumerate(this.subsections): + subsection_equality = check_section_equality(subsection, that.subsections[index]) + if not subsection_equality: + return False + + return True + + +def create_tmp_manifest(): + temp_service_config_path = os.path.join("/tmp", SERVICE_CONFIG_NAME) + if not os.path.exists(temp_service_config_path): + # Placing the service_manifest.yml in the tmp directory + shutil.copyfile(SERVICE_CONFIG_PATH, temp_service_config_path) + + +def remove_tmp_manifest(): + temp_service_config_path = os.path.join("/tmp", SERVICE_CONFIG_NAME) + if os.path.exists(temp_service_config_path): + os.remove(temp_service_config_path) + + +@pytest.fixture +def intezer_static_class_instance(): + create_tmp_manifest() + try: + from intezer_static import IntezerStatic + yield IntezerStatic() + finally: + remove_tmp_manifest() + + +@pytest.fixture +def dummy_completed_process_instance(): + class DummyCompletedProcess: + def __init__(self): + self.stdout = b"blah\nblah" + yield DummyCompletedProcess() + + +@pytest.fixture +def dummy_get_response_class(): + class DummyGetResponse: + def __init__(self, text): + self.text = text + + def json(self): + return {"status": self.text} + yield DummyGetResponse + + +@pytest.fixture +def dummy_api_interface_class(): + class DummyApiInterface: + @staticmethod + def get_safelist(): + return [] + return DummyApiInterface + + +@pytest.fixture +def dummy_request_class(): + + class DummyRequest(): + def __init__(self): + self.file_path = "blah" + self.file_name = "blah" + self.extracted = [] + + def add_extracted(self, path, name, description): + self.extracted.append({"path": path, "name": name, "description": description}) + + yield DummyRequest + + +class 
TestIntezerStatic: + @classmethod + def setup_class(cls): + # Placing the samples in the tmp directory + samples_path = os.path.join(TEST_DIR, "samples") + for sample in os.listdir(samples_path): + sample_path = os.path.join(samples_path, sample) + shutil.copyfile(sample_path, os.path.join("/tmp", sample)) + + @classmethod + def teardown_class(cls): + # Cleaning up the tmp directory + samples_path = os.path.join(TEST_DIR, "samples") + for sample in os.listdir(samples_path): + temp_sample_path = os.path.join("/tmp", sample) + os.remove(temp_sample_path) + + @staticmethod + def test_init(intezer_static_class_instance): + assert intezer_static_class_instance.client is None + + @staticmethod + def test_start(intezer_static_class_instance, dummy_api_interface_class, mocker): + from intezer_static import ALIntezerApi + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + assert isinstance(intezer_static_class_instance.client, ALIntezerApi) + assert True + + @staticmethod + def test_stop(intezer_static_class_instance): + intezer_static_class_instance.stop() + assert True + + @staticmethod + @pytest.mark.parametrize("sample", samples) + def test_execute(sample, intezer_static_class_instance, dummy_api_interface_class, dummy_get_response_class, mocker): + from assemblyline_v4_service.common.task import Task + from assemblyline.odm.messages.task import Task as ServiceTask + from assemblyline_v4_service.common.request import ServiceRequest + from json import loads + from intezer_static import ALIntezerApi + + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + + service_task = ServiceTask(sample) + task = Task(service_task) + task.service_config = { + "analysis_id": "", + } + intezer_static_class_instance._task = task + service_request = ServiceRequest(task) + 
intezer_static_class_instance.config["private_only"] = False + + mocker.patch.object(ALIntezerApi, "get_latest_analysis", return_value={"analysis_id": "blah"}) + mocker.patch.object(ALIntezerApi, "analyze_by_file", return_value="blah") + mocker.patch.object(ALIntezerApi, "get_iocs", return_value={"files": [], "network": []}) + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", return_value=[]) + mocker.patch.object(ALIntezerApi, "get_sub_analyses_by_id", return_value=[]) + + # Actually executing the sample + intezer_static_class_instance.execute(service_request) + + # Get the result of execute() from the test method + test_result = task.get_service_result() + + # Get the assumed "correct" result of the sample + correct_result_path = os.path.join(TEST_DIR, "results", task.file_name + ".json") + with open(correct_result_path, "r") as f: + correct_result = loads(f.read()) + f.close() + + # Assert that the appropriate sections of the dict are equal + + # Avoiding unique items in the response + test_result_response = test_result.pop("response") + correct_result_response = correct_result.pop("response") + assert test_result == correct_result + + # Comparing everything in the response except for the service_completed and the output.json supplementary + test_result_response["milestones"].pop("service_completed") + correct_result_response["milestones"].pop("service_completed") + correct_result_response.pop("supplementary") + test_result_response.pop("supplementary") + correct_result_response.pop("service_context") + test_result_response.pop("service_context") + assert test_result_response == correct_result_response + + # Code coverage + task.service_config = { + "analysis_id": "blah", + } + intezer_static_class_instance._task = task + service_request = ServiceRequest(task) + intezer_static_class_instance.execute(service_request) + + task.service_config = {"analysis_id": ""} + intezer_static_class_instance.config["is_on_premise"] = False + 
mocker.patch.object(ALIntezerApi, "get_latest_analysis", return_value={"verdict": "not_supported"}) + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", return_value=[]) + intezer_static_class_instance.execute(service_request) + + mocker.patch.object(ALIntezerApi, "get_latest_analysis", return_value={"verdict": "failed"}) + intezer_static_class_instance.execute(service_request) + + @staticmethod + def test_get_analysis_metadata(intezer_static_class_instance, dummy_api_interface_class, mocker): + from intezer_static import ALIntezerApi + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + + analysis_metadata = {"analysis_id": "blah", "verdict": "malicious"} + mocker.patch.object(ALIntezerApi, "get_latest_analysis", return_value=analysis_metadata) + assert intezer_static_class_instance._get_analysis_metadata("", "blah") == analysis_metadata + assert intezer_static_class_instance._get_analysis_metadata( + "blah", "blah") == {"analysis_id": "blah", "verdict": None} + + @staticmethod + @pytest.mark.parametrize("details, uninteresting_keys, expected_output", + [ + ({}, [], {}), + ({"a": "b"}, [], {"a": "b"}), + ({"a": "b"}, ["a"], {}), + ] + ) + def test_process_details(details, uninteresting_keys, expected_output): + from intezer_static import IntezerStatic + assert IntezerStatic._process_details(details, uninteresting_keys) == expected_output + + @staticmethod + def test_set_heuristic_by_verdict(intezer_static_class_instance): + from assemblyline_v4_service.common.result import ResultSection + result_section = ResultSection("blah") + intezer_static_class_instance._set_heuristic_by_verdict(result_section, None) + assert result_section.heuristic is None + + intezer_static_class_instance._set_heuristic_by_verdict(result_section, "blah") + assert result_section.heuristic is None + + intezer_static_class_instance._set_heuristic_by_verdict(result_section, "trusted") + 
assert result_section.heuristic is None + + intezer_static_class_instance._set_heuristic_by_verdict(result_section, "malicious") + assert result_section.heuristic.heur_id == 1 + + result_section = ResultSection("blah") + intezer_static_class_instance._set_heuristic_by_verdict(result_section, "known_malicious") + assert result_section.heuristic.heur_id == 1 + + result_section = ResultSection("blah") + intezer_static_class_instance._set_heuristic_by_verdict(result_section, "suspicious") + assert result_section.heuristic.heur_id == 2 + + @staticmethod + def test_process_iocs(intezer_static_class_instance, dummy_api_interface_class, mocker): + from intezer_static import ALIntezerApi + from intezer_sdk.api import IntezerApi + from assemblyline_v4_service.common.result import ResultSection + from requests import HTTPError + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + parent_res_sec = ResultSection("blah") + file_verdict_map = {} + + mocker.patch.object(ALIntezerApi, "get_iocs", return_value={"files": [], "network": []}) + intezer_static_class_instance._process_iocs("blah", file_verdict_map, parent_res_sec) + assert parent_res_sec.subsections == [] + assert file_verdict_map == {} + + mocker.patch.object(IntezerApi, "get_iocs", side_effect=HTTPError("FORBIDDEN")) + intezer_static_class_instance._process_iocs("blah", file_verdict_map, parent_res_sec) + assert parent_res_sec.subsections == [] + assert file_verdict_map == {} + + mocker.patch.object( + ALIntezerApi, "get_iocs", + return_value={"files": [{"sha256": "blah", "verdict": "malicious"}], + "network": [{"ioc": "1.1.1.1", "type": "ip"}, + {"ioc": "blah.com", "type": "domain"}]}) + intezer_static_class_instance._process_iocs("blah", file_verdict_map, parent_res_sec) + correct_res_sec = ResultSection("Network Communication Observed") + correct_res_sec.add_tag("network.dynamic.ip", "1.1.1.1") + 
correct_res_sec.add_tag("network.dynamic.domain", "blah.com") + correct_res_sec.add_line("IOC: 1.1.1.1") + correct_res_sec.add_line("IOC: blah.com") + assert check_section_equality(parent_res_sec.subsections[0], correct_res_sec) + assert file_verdict_map == {"blah": "malicious"} + + @staticmethod + def test_process_ttps(intezer_static_class_instance, dummy_api_interface_class, mocker): + from intezer_static import ALIntezerApi + from intezer_sdk.api import IntezerApi + from intezer_sdk.errors import UnsupportedOnPremiseVersion + from assemblyline_v4_service.common.result import ResultSection, ResultTableSection, TableRow + from requests import HTTPError + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + parent_res_sec = ResultSection("blah") + + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", return_value=[]) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + assert parent_res_sec.subsections == [] + + mocker.patch.object(IntezerApi, "get_dynamic_ttps", side_effect=HTTPError("FORBIDDEN")) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + assert parent_res_sec.subsections == [] + + mocker.patch.object(IntezerApi, "get_dynamic_ttps", side_effect=UnsupportedOnPremiseVersion()) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + assert parent_res_sec.subsections == [] + + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", + return_value=[{"name": "blah", "description": "blah", "data": [], "severity": 1}] + ) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + correct_res_sec = ResultSection("Signature: blah", "blah") + correct_res_sec.set_heuristic(4) + correct_res_sec.heuristic.add_signature_id("blah", 10) + assert check_section_equality(parent_res_sec.subsections[0].subsections[0], correct_res_sec) + + parent_res_sec = ResultSection("blah") + 
mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", return_value=[ + {"name": "InjectionInterProcess", "description": "blah", "data": [], "severity": 1}]) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + correct_res_sec = ResultSection("Signature: InjectionInterProcess", "blah") + correct_res_sec.set_heuristic(7) + correct_res_sec.heuristic.add_signature_id("InjectionInterProcess", 10) + correct_res_sec.heuristic.add_attack_id("T1055") + assert check_section_equality(parent_res_sec.subsections[0].subsections[0], correct_res_sec) + + parent_res_sec = ResultSection("blah") + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", return_value=[ + {"name": "enumerates_running_processes", "description": "blah", "data": [{"wow": "print me!"}], "severity": 1}]) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + correct_res_sec = ResultSection("Signature: enumerates_running_processes", "blah") + correct_res_sec.set_heuristic(8) + correct_res_sec.heuristic.add_signature_id("enumerates_running_processes", 10) + correct_res_sec.heuristic.add_attack_id("T1057") + assert check_section_equality(parent_res_sec.subsections[0].subsections[0], correct_res_sec) + + parent_res_sec = ResultSection("blah") + mocker.patch.object(ALIntezerApi, "get_dynamic_ttps", + return_value=[ + { + "name": "blah", + "description": "blah", + "data": + [ + {"IP": "blah 2.2.2.2 blah"}, + ], + "severity": 1 + } + ] + ) + intezer_static_class_instance._process_ttps("blah", parent_res_sec) + correct_res_sec = ResultSection("Signature: blah", "blah") + correct_res_sec.add_line("\tIP: blah 2.2.2.2 blah") + correct_res_sec.set_heuristic(4) + correct_res_sec.heuristic.add_signature_id("blah", 10) + correct_ioc_res_sec = ResultTableSection("IOCs found in signature marks") + correct_ioc_res_sec.add_row(TableRow(ioc_type="ip", ioc="2.2.2.2")) + correct_ioc_res_sec.add_tag("network.dynamic.ip", "2.2.2.2") + correct_res_sec.add_subsection(correct_ioc_res_sec) + assert 
check_section_equality(parent_res_sec.subsections[0].subsections[0], correct_res_sec) + + @staticmethod + def test_process_ttp_data(intezer_static_class_instance): + from assemblyline_v4_service.common.result import ResultSection, ResultTableSection, TableRow + sig_res = ResultSection("blah") + ioc_table = ResultTableSection("blah") + + intezer_static_class_instance._process_ttp_data( + [ + {"wow": "print me!"}, + {"a": ""}, + {"IP": "1.1.1.1"}, + {"IP": "blah 2.2.2.2 blah"}, + {"command": "do bad thing"}, + {"DeletedFile": "blah.exe"}, + {"key": "HKEY\\Registry\\Key\\Path"}, + {"http_request": "http://blah.com/blah"}, + {"domain": "blah.ca"}, + {"domain": "blah.ca"}, + {"b": "blah"*150}, + ], sig_res, ioc_table, + ) + correct_res_sec = ResultSection("blah") + correct_res_sec.add_lines( + ["\twow: print me!", "\tIP: 1.1.1.1", "\tIP: blah 2.2.2.2 blah", "\tcommand: do bad thing", + "\tDeletedFile: blah.exe", "\tkey: HKEY\\Registry\\Key\\Path", "\thttp_request: http://blah.com/blah", + "\tdomain: blah.ca", + "\tb: blahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblahblah..."]) + correct_res_sec.add_tag("network.dynamic.ip", "1.1.1.1") + correct_res_sec.add_tag("dynamic.process.command_line", "do bad thing") + correct_res_sec.add_tag("dynamic.process.file_name", "blah.exe") + correct_res_sec.add_tag("dynamic.registry_key", "HKEY\\Registry\\Key\\Path") + correct_res_sec.add_tag("network.dynamic.domain", "blah.ca") + correct_ioc_res_sec = ResultTableSection("blah") + correct_ioc_res_sec.add_row(TableRow(ioc_type="ip", 
ioc="2.2.2.2")) + correct_ioc_res_sec.add_row(TableRow(ioc_type="domain", ioc="blah.com")) + correct_ioc_res_sec.add_row(TableRow(ioc_type="uri", ioc="http://blah.com/blah")) + correct_ioc_res_sec.add_row(TableRow(ioc_type="uri_path", ioc="/blah")) + correct_ioc_res_sec.add_tag("network.dynamic.ip", "2.2.2.2") + correct_ioc_res_sec.add_tag("network.dynamic.domain", "blah.com") + correct_ioc_res_sec.add_tag("network.dynamic.uri", "http://blah.com/blah") + correct_ioc_res_sec.add_tag("network.dynamic.uri_path", "/blah") + assert check_section_equality(sig_res, correct_res_sec) + assert check_section_equality(ioc_table, correct_ioc_res_sec) + + @staticmethod + def test_handle_subanalyses(intezer_static_class_instance, dummy_request_class, dummy_api_interface_class, mocker): + from assemblyline_v4_service.common.result import ResultSection, ResultKeyValueSection, ResultProcessTreeSection, ProcessItem + mocker.patch.object(intezer_static_class_instance, "get_api_interface", return_value=dummy_api_interface_class) + intezer_static_class_instance.start() + + mocker.patch.object(intezer_static_class_instance.client, "get_sub_analyses_by_id", return_value=[]) + parent_result_section = ResultSection("blah") + intezer_static_class_instance._handle_subanalyses( + dummy_request_class(), "blah", "blah", {}, parent_result_section) + assert parent_result_section.subsections == [] + + mocker.patch.object( + intezer_static_class_instance.client, + "get_sub_analyses_by_id", + return_value=[ + { + "sub_analysis_id": "blah", + "extraction_info": { + "processes": [ + { + "process_id": 124, + "process_path": "blah2.exe", + "parent_process_id": 321, + "module_path": "blah2.exe" + }, + ] + }, + "source": "blah_blah", + "sha256": "blah2" + } + ] + ) + mocker.patch.object( + intezer_static_class_instance.client, + "get_sub_analysis_code_reuse_by_id", + return_value={ + "families": [{"reused_gene_count": 2}], + "blah": "blah" + } + ) + mocker.patch.object( + 
intezer_static_class_instance.client, + "get_sub_analysis_metadata_by_id", + return_value={ + "source": "blah", + "blah": "blah" + } + ) + mocker.patch.object(intezer_static_class_instance, "_process_families") + mocker.patch.object(intezer_static_class_instance.client, "download_file_by_sha256", return_value=True) + correct_result_section = ResultKeyValueSection("Subanalysis report for blah2, extracted via blah blah") + correct_result_section.update_items({"blah": "blah"}) + correct_code_reuse = ResultKeyValueSection("Code reuse detected") + correct_code_reuse.update_items({"blah": "blah"}) + correct_result_section.add_subsection(correct_code_reuse) + correct_process_tree = ResultProcessTreeSection("Spawned Process Tree") + correct_process_tree.add_process(ProcessItem(pid=124, name="blah2.exe", cmd=None)) + correct_process_tree.add_tag("dynamic.processtree_id", "blah2.exe") + correct_process_tree.add_tag("dynamic.process.file_name", "blah2.exe") + dummy_request_class_instance = dummy_request_class() + intezer_static_class_instance._handle_subanalyses( + dummy_request_class_instance, "blah", "blah", {}, parent_result_section) + assert check_section_equality(parent_result_section.subsections[0], correct_result_section) + assert check_section_equality(parent_result_section.subsections[1], correct_process_tree) + assert dummy_request_class_instance.extracted[0]["description"] == "Extracted via blah blah" + assert dummy_request_class_instance.extracted[0]["name"] == "blah2.sample" + + @staticmethod + @pytest.mark.parametrize("families, file_verdict_map, correct_tags, correct_fvp", + [([], + {}, + [], + {}), + ([{"blah": "blah", "family_type": "blah", "family_name": "blah"}], + {}, + [("attribution.family", "blah")], + {}), + ([{"family_id": "blah", "family_type": "blah", "family_name": "blah"}], + {}, + [("attribution.family", "blah")], + {}), + ([{"family_id": "blah", "family_type": "application", "family_name": "blah"}], + {}, + [], + {}), + ([{"family_id": "blah", 
"family_type": "malware", "family_name": "blah"}], + {}, + [("attribution.family", "blah")], + {"blah": "malicious"}), + ([{"family_id": "blah", "family_type": "malware", "family_name": "blah"}], + {"blah": "blah"}, + [("attribution.family", "blah")], + {"blah": "malicious"}), + ([{"family_id": "blah", "family_type": "malware", "family_name": "blah"}], + {"blah": "malicious"}, + [("attribution.family", "blah")], + {"blah": "malicious"}), + ([{"family_id": "blah", "family_type": "packer", "family_name": "blah"}], + {}, + [("attribution.family", "blah")], + {"blah": "suspicious"}), + ([{"family_id": "blah", "family_type": "packer", "family_name": "blah"}], + {"blah": "malicious"}, + [("attribution.family", "blah")], + {"blah": "malicious"}), ]) + def test_process_families(families, file_verdict_map, correct_tags, correct_fvp, intezer_static_class_instance): + from assemblyline_v4_service.common.result import ResultSection, ResultTableSection, TableRow + + parent_section = ResultSection("blah") + intezer_static_class_instance._process_families(families, "blah", file_verdict_map, parent_section) + + if not families: + assert parent_section.subsections == [] + else: + correct_result_section = ResultTableSection("Family Details") + for family in families: + if "family_id" in family: + family.pop("family_id") + correct_result_section.add_row(TableRow(**family)) + for tag in correct_tags: + correct_result_section.add_tag(tag[0], tag[1]) + + assert check_section_equality(parent_section.subsections[0], correct_result_section) + assert file_verdict_map == correct_fvp + + @staticmethod + def test_process_extraction_info(intezer_static_class_instance): + from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology + so = SandboxOntology() + + processes = [ + { + "process_id": 123, + "process_path": "blah.exe", + "parent_process_id": 321, + "module_path": "blah.exe" + }, + { + "process_id": 124, + "process_path": "blah2.exe", + "parent_process_id": 321, + 
"module_path": "blah2.dll,blah" + }, + { + "process_id": 123, + "process_path": "blah.exe", + "parent_process_id": 321, + "module_path": "blah.dll,blah" + }, + { + "process_id": 321, + "process_path": "blah3.exe", + "parent_process_id": 322, + "module_path": "blah3.exe" + }, + ] + process_path_set = set() + command_line_set = set() + correct_processes = [ + { + "start_time": float("-inf"), + "end_time": float("inf"), + "objectid": { + "tag": "blah.exe", + "treeid": None, + "processtree": None, + "time_observed": float("-inf") + }, + "pobjectid": { + "tag": "blah3.exe", + "treeid": None, + "processtree": None, + "time_observed": float("-inf") + }, + "pimage": "blah3.exe", + "pcommand_line": None, + "ppid": 321, + "pid": 123, + "image": "blah.exe", + "command_line": "blah.exe blah.dll,blah", + "integrity_level": None, + "image_hash": None, + "original_file_name": None, + }, + { + "start_time": float("-inf"), + "end_time": float("inf"), + "objectid": { + "tag": "blah2.exe", + "treeid": None, + "processtree": None, + "time_observed": float("-inf") + }, + "pobjectid": { + "tag": "blah3.exe", + "treeid": None, + "processtree": None, + "time_observed": float("-inf") + }, + "pimage": "blah3.exe", + "pcommand_line": None, + "ppid": 321, + "pid": 124, + "image": "blah2.exe", + "command_line": "blah2.exe blah2.dll,blah", + "integrity_level": None, + "image_hash": None, + "original_file_name": None, + }, + { + "start_time": float("-inf"), + "end_time": float("inf"), + "objectid": { + "tag": "blah3.exe", + "treeid": None, + "processtree": None, + "time_observed": float("-inf") + }, + "pobjectid": { + "tag": None, + "treeid": None, + "processtree": None, + "time_observed": None + }, + "pimage": None, + "pcommand_line": None, + "ppid": 322, + "pid": 321, + "image": "blah3.exe", + "command_line": None, + "integrity_level": None, + "image_hash": None, + "original_file_name": None, + }, + ] + intezer_static_class_instance._process_extraction_info(processes, process_path_set, 
command_line_set, so) + for index, process in enumerate(so.get_processes()): + process_as_primitives = process.as_primitives() + process_as_primitives["objectid"].pop("guid") + process_as_primitives["pobjectid"].pop("guid") + assert process_as_primitives == correct_processes[index] + assert process_path_set == {"blah.dll,blah", "blah2.dll,blah", "blah2.exe", "blah.exe", "blah3.exe"} + assert command_line_set == {"blah.exe blah.dll,blah", "blah2.exe blah2.dll,blah"}