diff --git a/drakrun/drakrun/lib/postprocessing/__init__.py b/drakrun/drakrun/lib/postprocessing/__init__.py
index a6f961d2..ee2ba3b6 100644
--- a/drakrun/drakrun/lib/postprocessing/__init__.py
+++ b/drakrun/drakrun/lib/postprocessing/__init__.py
@@ -6,6 +6,7 @@
 from .compress_ipt import compress_ipt
 from .crop_dumps import crop_dumps
 from .generate_graphs import generate_graphs
+from .generate_report import build_report
 from .generate_wireshark_key_file import generate_wireshark_key_file
 from .index_logs import index_logs
 from .process_apimon_log import process_apimon_log
@@ -53,6 +54,11 @@ class PostprocessPlugin(NamedTuple):
         ],
         generates=["ttps.json"],
     ),
+    PostprocessPlugin(
+        function=build_report,
+        requires=[],
+        generates=["report.json"],
+    ),
     PostprocessPlugin(function=crop_dumps, requires=["dumps"], generates=["dumps.zip"]),
     PostprocessPlugin(function=compress_ipt, requires=["ipt"], generates=["ipt.zip"]),
     PostprocessPlugin(function=index_logs, requires=[], generates=["index"]),
diff --git a/drakrun/drakrun/lib/postprocessing/generate_report.py b/drakrun/drakrun/lib/postprocessing/generate_report.py
new file mode 100644
index 00000000..644735ff
--- /dev/null
+++ b/drakrun/drakrun/lib/postprocessing/generate_report.py
@@ -0,0 +1,188 @@
+import itertools
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Dict, Iterator, List, Optional
+
+import orjson
+
+
+def epoch_to_timestring(unix_time: Optional[float]) -> Optional[str]:
+    # This function converts a unix epoch time into a formatted time string.
+    # Example:
+    #   Input: 1716998460.000
+    #   Return: '2024-05-29T16:01:00+00:00'
+    if not unix_time:
+        # Sometimes the time in the logs is zero or None
+        return None
+
+    time = datetime.fromtimestamp(unix_time, tz=timezone.utc)
+    return time.isoformat()
+
+
+def parse_metadata(metadata_file: Path) -> Dict:
+    # This function parses the metadata.json file.
+    # Unix epoch timestamps are converted to printable time strings as well.
+    with metadata_file.open("r") as f:
+        metadata = orjson.loads(f.read())
+
+    metadata["time_started"] = epoch_to_timestring(metadata["time_started"])
+    metadata["time_finished"] = epoch_to_timestring(metadata["time_finished"])
+
+    return metadata
+
+
+def process_key(ppid: int, pid: int) -> str:
+    # This function defines how we address and differentiate between processes.
+    # The convention used is ppid_pid.
+    return f"{ppid}_{pid}"
+
+
+def parse_apicall(apicall: Dict) -> Dict:
+    # This function extracts the necessary information from a single apimon entry.
+    # Unix epoch times are converted to printable time strings.
+    return {
+        "TimeStamp": epoch_to_timestring(float(apicall["TimeStamp"])),
+        "CalledFrom": apicall["CalledFrom"],
+        "Method": apicall["Method"],
+        "ReturnValue": apicall["ReturnValue"],
+        "Argument": [arg.split("=", maxsplit=1)[1] for arg in apicall["Arguments"]],
+    }
+
+
+def parse_apimon(processes: Dict, apimon_file: Path) -> None:
+    # This function appends each api_called entry of apimon.log to the appropriate
+    # process, then collapses runs of identical calls into single "Repeated" entries.
+    with apimon_file.open("r", errors="ignore") as f:
+        for line in f:
+            call = orjson.loads(line)
+            if call["Event"] == "api_called":
+                pkey = process_key(call["PPID"], call["PID"])
+                processes[pkey]["api_calls"].append(parse_apicall(call))
+
+    for pkey, process in processes.items():
+        grouped_api_calls = [
+            list(group)
+            for _, group in itertools.groupby(
+                process["api_calls"],
+                key=lambda call: (
+                    call["CalledFrom"],
+                    call["Method"],
+                    call["ReturnValue"],
+                    call["Argument"],
+                ),
+            )
+        ]
+        api_calls = list()
+        for calls_group in grouped_api_calls:
+            api_call = dict()
+            api_call.update(calls_group[0] | {"Repeated": len(calls_group) - 1})
+            api_calls.append(api_call)
+        process["api_calls"] = api_calls
+
+
+def parse_ttps(processes: Dict, ttps_file: Path) -> None:
+    # This function parses the TTPs in the ttps.json file and appends
+    # each one to the appropriate process in the report.
+    with ttps_file.open("r") as f:
+        for line in f:
+            ttp: Dict = orjson.loads(line)
+            occurrences = ttp.pop("occurrences")
+            for occurrence in occurrences:
+                pkey = process_key(occurrence["ppid"], occurrence["pid"])
+                processes[pkey]["ttps"].append(ttp)
+
+
+def parse_memdumps(processes: Dict, memdumps_file: Path) -> None:
+    # This function parses the memdump.log file and appends all memory dump
+    # information to the appropriate process in the report.
+    with memdumps_file.open("r") as f:
+        for line in f:
+            memdump: Dict = orjson.loads(line)
+            pkey = process_key(memdump["PPID"], memdump["PID"])
+            processes[pkey]["memdumps"].append(
+                {
+                    "reason": memdump["DumpReason"],
+                    "addr": memdump["DumpAddr"],
+                    "size": memdump["DumpSize"],
+                    "filename": memdump["DumpFilename"],
+                    "count": memdump["DumpsCount"],
+                }
+            )
+
+
+def parse_processtree(processtree_file: Path) -> Dict[str, Dict]:
+    # This function extracts all the processes and their associated information
+    # from the process_tree.json file.
+    def rec(processes: List[Dict], parent=0) -> Iterator[Dict]:
+        # This recursive helper flattens the process tree into per-process records
+        for process in processes:
+            yield {
+                "pid": process["pid"],
+                "ppid": parent,
+                "procname": process["procname"],
+                "args": process["args"],
+                "ts_from": epoch_to_timestring(process["ts_from"]),
+                "ts_to": epoch_to_timestring(process["ts_to"]),
+                "children": [
+                    process_key(process["pid"], child["pid"])
+                    for child in process["children"]
+                ],
+                "api_calls": [],  # to be filled later by parse_apimon()
+                "ttps": [],  # to be filled later by parse_ttps()
+                "memdumps": [],  # to be filled later by parse_memdumps()
+            }
+            yield from rec(process["children"], parent=process["pid"])
+
+    with processtree_file.open("r") as f:
+        processtree = orjson.loads(f.read())
+
+    return {
+        process_key(process["ppid"], process["pid"]): process
+        for process in rec(processtree)
+    }
+
+
+def get_metadata(analysis_dir: Path) -> Dict:
+    # Currently, all metadata is contained in the metadata.json file
+    return parse_metadata(analysis_dir / "metadata.json")
+
+
+def get_processes(analysis_dir: Path) -> Dict:
+    # generate a dictionary of indexed processes
+    processes = parse_processtree(analysis_dir / "process_tree.json")
+    # parse api calls into the indexed process dictionary
+    if (analysis_dir / "apimon.log").is_file():
+        parse_apimon(processes, analysis_dir / "apimon.log")
+    # parse ttps into the indexed process dictionary
+    if (analysis_dir / "ttps.json").is_file():
+        parse_ttps(processes, analysis_dir / "ttps.json")
+    # parse memory dumps log into the indexed process dictionary
+    if (analysis_dir / "memdump.log").is_file():
+        parse_memdumps(processes, analysis_dir / "memdump.log")
+
+    return processes
+
+
+def build_report(analysis_dir: Path) -> None:
+    report = {
+        "info": get_metadata(analysis_dir),
+        "processes": get_processes(analysis_dir),
+    }
+
+    with (analysis_dir / "report.json").open("wb") as f:
+        f.write(orjson.dumps(report, option=orjson.OPT_INDENT_2))
+
+
+if __name__ == "__main__":
+    from sys import argv
+
+    if len(argv) < 2:
+        print("missing analysis directory")
+        exit(1)
+
+    analysis_dir = Path(argv[1])
+    if not analysis_dir.is_dir() or not any(analysis_dir.iterdir()):
+        print("analysis directory is empty or non-existent")
+        exit(1)
+
+    build_report(analysis_dir)
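
Usage sketch: a minimal, self-contained way to exercise build_report() end to end. The metadata.json and process_tree.json payloads below are fabricated stand-ins that carry only the fields the parsers above read; real analysis directories are produced by a drakrun run. The optional apimon.log, ttps.json and memdump.log inputs are omitted here, so the report's api_calls, ttps and memdumps lists stay empty.

    import orjson
    from pathlib import Path
    from tempfile import TemporaryDirectory

    from drakrun.lib.postprocessing.generate_report import build_report

    with TemporaryDirectory() as tmp:
        analysis_dir = Path(tmp)
        # Fabricated timestamps; parse_metadata() rewrites these two fields.
        (analysis_dir / "metadata.json").write_bytes(
            orjson.dumps({"time_started": 1716998460.0, "time_finished": 1716998760.0})
        )
        # A single-process tree; rec() keys this process as "0_1234".
        (analysis_dir / "process_tree.json").write_bytes(
            orjson.dumps(
                [
                    {
                        "pid": 1234,
                        "procname": "explorer.exe",
                        "args": "",
                        "ts_from": 1716998460.0,
                        "ts_to": None,
                        "children": [],
                    }
                ]
            )
        )
        build_report(analysis_dir)
        print((analysis_dir / "report.json").read_text())

The same directory layout is what the __main__ guard expects when the module is invoked directly, e.g. python -m drakrun.lib.postprocessing.generate_report <analysis_dir>.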