diff --git a/examples/exampledata/rules/labeler/generic/example_rule.yml b/examples/exampledata/rules/labeler/generic/example_rule.yml
index 1bee7c0b6..a053c0b96 100644
--- a/examples/exampledata/rules/labeler/generic/example_rule.yml
+++ b/examples/exampledata/rules/labeler/generic/example_rule.yml
@@ -1,7 +1,6 @@
-filter: "test_label: execute"
+filter: 'winlog.event_data.param2: "stop"'
 labeler:
-  id: labeler-1352bc0a-53ae-4740-bb9e-1e865f63375f
   label:
     action:
-      - execute
+      - terminate
 description: "..."
diff --git a/examples/exampledata/rules/pre_detector/generic/example_rule.yml b/examples/exampledata/rules/pre_detector/generic/example_rule.yml
index 22974b4f6..eb477620e 100644
--- a/examples/exampledata/rules/pre_detector/generic/example_rule.yml
+++ b/examples/exampledata/rules/pre_detector/generic/example_rule.yml
@@ -1,10 +1,9 @@
-filter: "test_pre_detector"
+filter: 'tags: "1" AND inp.message: "1"'
 pre_detector:
-  id: RULE_ONE_ID
+  id: RULE_ONE_ID_1
   title: RULE_ONE
   severity: critical
   mitre:
     - attack.test1
-    - attack.test2
   case_condition: directly
-description: "..."
+
diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index b1980baed..4f3ef6b12 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -120,7 +120,6 @@ class Config(Component.Config):
     __slots__ = [
         "rule_class",
-        "has_custom_tests",
        "_event",
        "_specific_tree",
        "_generic_tree",
@@ -130,7 +129,6 @@ class Config(Component.Config):
    ]

    rule_class: "Rule"
-    has_custom_tests: bool
    _event: dict
    _specific_tree: RuleTree
    _generic_tree: RuleTree
@@ -155,7 +153,6 @@ def __init__(self, name: str, configuration: "Processor.Config"):
            generic_rules_targets=self._config.generic_rules,
            specific_rules_targets=self._config.specific_rules,
        )
-        self.has_custom_tests = False
        self.result = None
        self._bypass_rule_tree = False
        if os.environ.get("LOGPREP_BYPASS_RULE_TREE"):
diff --git a/logprep/processor/clusterer/processor.py b/logprep/processor/clusterer/processor.py
index cc5e65162..914a8c1a5 100644
--- a/logprep/processor/clusterer/processor.py
+++ b/logprep/processor/clusterer/processor.py
@@ -75,7 +75,6 @@ class Config(Processor.Config):
    def __init__(self, name: str, configuration: Processor.Config):
        super().__init__(name=name, configuration=configuration)
        self.sps = SignaturePhaseStreaming()
-        self.has_custom_tests = True
        self._last_rule_id = math.inf
        self._last_non_extracted_signature = None
diff --git a/logprep/util/auto_rule_tester/auto_rule_tester.py b/logprep/util/auto_rule_tester/auto_rule_tester.py
index 99aeaf45e..c1837ae3b 100644
--- a/logprep/util/auto_rule_tester/auto_rule_tester.py
+++ b/logprep/util/auto_rule_tester/auto_rule_tester.py
@@ -16,7 +16,7 @@
 `raw` contains an input log message and `processed` the corresponding processed result.

 When using multi-rules it may be necessary to restrict tests to specific rules in the file.
-This can be achieved by the field `target_rule_idx`.
+This can be achieved by the field `target_rule_idx`, which is now mandatory when a test file
+contains more than one test for a multi-rule.
 The value of that field corresponds to the index of the rule in the JSON list of multi-rules
 (starting with 0).
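+For example, a test entry that is pinned to the first rule of a multi-rule file could
+look like this (the field values are illustrative and mirror the dummy test data added
+further below in this change set):
+
+    [
+        {
+            "target_rule_idx": 0,
+            "raw": {"winlog": {"event_data": {"param2": "pause"}}},
+            "processed": {
+                "label": {"action": ["terminate"]},
+                "winlog": {"event_data": {"param2": "pause"}}
+            }
+        }
+    ]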
@@ -50,17 +50,17 @@
 import re
 import sys
 import tempfile
-import traceback
 from collections import OrderedDict, defaultdict
-from contextlib import redirect_stdout
+from collections.abc import Iterable
 from difflib import ndiff
-from io import StringIO
 from logging import getLogger
-from os import path, walk
+from os import path
+from pathlib import Path
 from pprint import pprint
-from typing import TYPE_CHECKING, TextIO, Tuple
+from typing import TYPE_CHECKING, Union

 from colorama import Fore
+from more_itertools import nth
 from ruamel.yaml import YAML, YAMLError

 from logprep.factory import Factory
@@ -84,11 +84,11 @@ def __init__(self, message: str):
         super().__init__(f"AutoRuleTester ({message}): ")


-class PreDetectionExtraHandler:
-    """Used to handle special demands for PreDetector auto-tests."""
+class ProcessorExtensions:
+    """Helpers for rule auto-tests, including special handling for PreDetector output."""

     @staticmethod
-    def _get_errors(processor: "Processor", extra_output: tuple):
+    def _get_errors(processor: "Processor", extra_output: list):
         pd_errors = []
         pd_warnings = []
         if isinstance(processor, PreDetector):
@@ -119,9 +119,7 @@ def _get_errors(processor: "Processor", extra_output: tuple):
             )
         return pd_errors, pd_warnings

-    def update_errors(
-        self, processor: PreDetector, extra_output: tuple, errors: list, warnings: list
-    ):
-        """Create aggregating logger.
+    def update_errors(self, processor: PreDetector, extra_output: list, problems: dict):
+        """Collect errors and warnings from the PreDetector extra output.

         Parameters
         ----------
@@ -130,22 +128,116 @@ def update_errors(
         processor : PreDetector
             Processor that should be of type PreDetector.
-        extra_output : dict
+        extra_output : list
             Extra output containing MITRE information coming from PreDetector.
-        errors : list
-            List of errors.
-        warnings : list
-            List of warnings.
+        problems : dict
+            Aggregated warnings and errors.
         """
         mitre_errors, id_warnings = self._get_errors(processor, extra_output)
-        errors += mitre_errors
-        warnings += id_warnings
+        problems["errors"].extend(mitre_errors)
+        problems["warnings"].extend(id_warnings)
+
+    def print_rules(self, rules, t_idx=None):
+        """Iterate over all printables and delegate each one to coloured printing.
+
+        Parameters
+        ----------
+        rules : dict
+            Mapping of keys to printable rules.
+        t_idx : int, optional
+            Index of the element to print, by default None
+        """
+        print()
+        for key, rule in rules.items():
+            self.print_diff_test(key, rule, t_idx)
+
+    @staticmethod
+    def print_diff_test(key, rule, t_idx=None):
+        """Choose the proper printing for a printable: a non-iterable, an indexed
+        element of an iterable, or every element of an iterable.
+
+        Parameters
+        ----------
+        key : str
+            Kind of message.
+        rule : str or list
+            Printable message.
+        t_idx : int, optional
+            Index associated with the printable, by default None
+        """
+        if not isinstance(rule, Iterable):
+            diff = f"{key}: {rule}"
+            ProcessorExtensions.color_based_print(diff)
+        else:
+            if t_idx is not None:
+                diff = f"{key}: {rule[t_idx]}"
+                ProcessorExtensions.color_based_print(diff)
+            else:
+                for item in rule:
+                    diff = f"{key}: {item}"
+                    ProcessorExtensions.color_based_print(diff)
+
+    @staticmethod
+    def color_based_print(item):
+        """Print a status message, coloured according to its leading tokens.
+
+        Parameters
+        ----------
+        item : str
+            Status message.
+        """
+        item = item.replace("]", "").replace("[", "")
+        if (
+            item.startswith((": - ", "- "))
+            or item.startswith("error")
+            or item.startswith("without tests")
+        ):
+            print_fcolor(Fore.RED, item)
+        elif item.startswith((": + ", "+ ")) or item.startswith("with tests"):
+            print_fcolor(Fore.GREEN, item)
")): + print_fcolor(Fore.WHITE, "\n" + item) + elif item.startswith("> "): + print_fcolor(Fore.MAGENTA, "\n" + item) + elif item.lstrip().startswith("~ ") or item.startswith("warning"): + print_fcolor(Fore.YELLOW, item) + else: + print_fcolor(Fore.CYAN, item) + + def load_json_or_yaml(self, file_path) -> Union[list, dict]: + """load json or yaml depending on suffix + + Parameters + ---------- + file_path : str + path to file + + Returns + ------- + Union[list, dict] + wether json or yaml + + Raises + ------ + ValueError + error when file cant be decoded + """ + try: + with open(file_path, "r", encoding="utf-8") as file: + if file_path.endswith(".yml"): + return list(yaml.load_all(file)) + else: + return json.load(file) + + except (json.JSONDecodeError, YAMLError) as error: + raise ValueError(f"Error decoding {file_path}: {str(error)}") from error class AutoRuleTester: """Used to perform auto-tests for rules.""" - def __init__(self, config): - with open(config, "r", encoding="utf8") as yaml_file: + def __init__(self, config_path: str): + with open(config_path, "r", encoding="utf-8") as yaml_file: self._config_yml = yaml.load(yaml_file) self._empty_rules_dirs = [tempfile.mkdtemp()] @@ -158,105 +250,130 @@ def __init__(self, config): self._success = True - self._successful_rule_tests_cnt = 0 - self._failed_rule_tests_cnt = 0 - self._warning_cnt = 0 - - self._pd_extra = PreDetectionExtraHandler() - - self._filename_printed = False + self._result = { + "+ Successful Tests": 0, + "- Failed Tests": 0, + "~ Warning": 0, + "Rule Test Coverage": 0.0, + "Total Tests": 0, + } + self._problems = {"warnings": [], "errors": []} + self._pd_extra = ProcessorExtensions() self._gpr = GrokPatternReplacer(self._config_yml) - self._custom_tests_output = "" - self._custom_tests = [] - self._missing_custom_tests = [] + self._filename_printed = False + self._rule_cnt = 0 self._logger = getLogger() self._logger.disabled = True def run(self): - """Perform auto-tests.""" + """Perform auto-tests. Main entry""" rules_dirs = self._get_rule_dirs_by_processor_name() rules_pn = self._get_rules_per_processor_name(rules_dirs) self._run_if_any_rules_exist(rules_pn) - def _run_if_any_rules_exist(self, rules_pn: dict): - if not self._has_rules(rules_pn): - print_fcolor(Fore.YELLOW, "\nThere are no rules within any of the rules directories!") - else: - self._run_tests_for_rules(rules_pn) + def _run_if_any_rules_exist(self, rules_pn: dict) -> None: + """Check if any rules exist in given path, then start rule tests depending on that. - def _run_tests_for_rules(self, rules_pn: dict): - rule_test_coverage = self._check_which_rule_files_miss_tests(rules_pn) - self._set_rules_dirs_to_empty() + Parameters + ---------- + rules_pn : dict + accumulated rules for each processor to operate on + """ + if any(processor_test_cfg["rules"] for processor_test_cfg in rules_pn.values()): + self._run_tests_for_rules(rules_pn) + else: + print_fcolor(Fore.YELLOW, "~\nThere are no rules within any of the rules directories!") - processors_ct, processors_no_ct = self._get_processors_split_by_custom_tests_existence() + def check_run_rule_tests(self, processor_cont, rules_pn) -> None: + """Verify dependencies for every preproccessor and if fullfilled, start the real rule tests. 
-        for processor, processor_name in processors_ct.items():
+
+        Parameters
+        ----------
+        processor_cont : dict
+            Mapping of processor objects to their names.
+        rules_pn : dict
+            Accumulated rules for each processor to operate on.
+        """
+        for processor, processor_name in processor_cont.items():
             for rule_test in rules_pn[processor_name]["rules"]:
-                if processor and rule_test["tests"] or processor.has_custom_tests:
-                    self._run_custom_rule_tests(processor, rule_test)
-
-        if self._custom_tests:
-            print_fcolor(Fore.GREEN, "\nRULES WITH CUSTOM TESTS:")
-            for file_name in self._custom_tests:
-                print_fcolor(Fore.GREEN, file_name)
-
-        if self._missing_custom_tests:
-            print_fcolor(Fore.RED, "\nRULES WITHOUT CUSTOM TESTS:")
-            for file_name in self._missing_custom_tests:
-                print_fcolor(Fore.RED, file_name)
+                if processor and rule_test["tests"]:
+                    self._run_rule_tests(processor, rule_test)

-        print(self._custom_tests_output)
+    def _run_tests_for_rules(self, rules_pn: dict) -> None:
+        """Run all checks, collect warnings and exit with status 1 if not successful.

-        for processor, processor_name in processors_no_ct.items():
-            for rule_test in rules_pn[processor_name]["rules"]:
-                if processor and rule_test["tests"]:
-                    self._run_file_rule_tests(processor, rule_test)
-
-        print_fcolor(Fore.WHITE, "\nResults:")
-        print_fcolor(Fore.RED, f"Failed tests: {self._failed_rule_tests_cnt}")
-        print_fcolor(Fore.GREEN, f"Successful tests: {self._successful_rule_tests_cnt}")
-        print_fcolor(
-            Fore.CYAN,
-            f"Total tests: " f"{self._successful_rule_tests_cnt + self._failed_rule_tests_cnt}",
-        )
-        print_fcolor(Fore.BLUE, f"Rule Test Coverage: {rule_test_coverage:.2f}%")
-        print_fcolor(Fore.YELLOW, f"Warnings: {self._warning_cnt}")
+        Parameters
+        ----------
+        rules_pn : dict
+            Accumulated rules for each processor to operate on.
+        """
+        self._check_which_rule_files_miss_tests(rules_pn)
+        processors_no_ct = self._get_processors()
+        self.check_run_rule_tests(processors_no_ct, rules_pn)
+        self._result["~ Warning"] += len(self._problems.get("warnings"))
+        self._pd_extra.print_rules(self._result)

         if not self._success:
             sys.exit(1)

-    @staticmethod
-    def _has_rules(rules_pn: dict) -> bool:
-        for processor_test_cfg in rules_pn.values():
-            if processor_test_cfg["rules"]:
-                return True
-        return False
-
-    def _get_processors_split_by_custom_tests_existence(self) -> Tuple[OrderedDict, OrderedDict]:
-        processors_with_custom_test = OrderedDict()
-        processors_without_custom_test = OrderedDict()
-        for processor_in_pipeline in self._config_yml["pipeline"]:
-            name, processor_cfg = next(iter(processor_in_pipeline.items()))
-            processor = self._get_processor_instance(name, processor_cfg, self._logger)
-            if processor.has_custom_tests:
-                processors_with_custom_test[processor] = name
-            else:
-                processors_without_custom_test[processor] = name
-        return processors_with_custom_test, processors_without_custom_test
+    def _run_rule_tests(self, processor: "Processor", rule_test: dict):
+        """Run all evaluations for the given rules.

-    def _get_custom_test_mapping(self) -> dict:
-        processor_uses_own_tests = {}
+        Parameters
+        ----------
+        processor : Processor
+            Processor instance under test.
+        rule_test : dict
+            The rules to test.
+        """
+        temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json")
+        rules = self._get_rules(processor, rule_test)
+
+        for rule_type, rules in rules.items():
+            for idx, rule_dict in enumerate(rules):
+                self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path)
+                self._eval_file_rule_test(rule_test, processor, idx)
+            remove_file_if_exists(temp_rule_path)
+
+    def _get_processors(self) -> OrderedDict:
+        """Get the processors of the pipeline as key/value pairs.
+
+        Returns
+        -------
+        OrderedDict
+            Processor instances mapped to their names.
+        """
+        processors_without_custom_test = OrderedDict()
         for processor_in_pipeline in self._config_yml["pipeline"]:
             name, processor_cfg = next(iter(processor_in_pipeline.items()))
-            processor = self._get_processor_instance(name, processor_cfg, self._logger)
-            processor_uses_own_tests[processor_cfg["type"]] = processor.has_custom_tests
-        return processor_uses_own_tests
+            processor = self._get_processor_instance(name, processor_cfg)
+            processors_without_custom_test[processor] = name
+        return processors_without_custom_test

     @staticmethod
     def _get_rules(processor: "Processor", rule_test: dict) -> dict:
+        """Collect each type of rule from the rule test definition.
+
+        Parameters
+        ----------
+        processor : Processor
+            Processor instance.
+        rule_test : dict
+            Rule test definition containing the unassigned rules.
+
+        Returns
+        -------
+        dict
+            The rules, grouped by rule type.
+
+        Raises
+        ------
+        AutoRuleTesterException
+            Raised if the rule set is empty.
+        """
         if rule_test.get("rules"):
             return {"rules": rule_test.get("rules", [])}
         if rule_test.get("specific_rules") or rule_test.get("generic_rules"):
@@ -271,6 +388,15 @@ def _get_rules(processor: "Processor", rule_test: dict) -> dict:
         )

     def _load_rules(self, processor: "Processor", rule_type: str):
+        """Load the rules of the given type into the processor and set it up.
+
+        Parameters
+        ----------
+        processor : Processor
+            Processor instance.
+        rule_type : str
+            Type of the rules.
+        """
         if rule_type == "rules":
             processor.load_rules(self._empty_rules_dirs)
         elif rule_type == "specific_rules":
@@ -281,200 +407,167 @@ def _load_rules(self, processor: "Processor", rule_type: str):

     def _prepare_test_eval(
         self, processor: "Processor", rule_dict: dict, rule_type: str, temp_rule_path: str
-    ):
+    ) -> None:
+        """Prepare the test evaluation: create the rule file, reset the processor's
+        rule trees and load the rules into the processor.
+
+        Parameters
+        ----------
+        processor : Processor
+            Processor instance.
+        rule_dict : dict
+            Rules for the processor.
+        rule_type : str
+            Type of the rules.
+        temp_rule_path : str
+            Temporary path to the rules.
+        """
         self._create_rule_file(rule_dict, temp_rule_path)
-        self._reset_trees(processor)
-        self._clear_rules(processor)
+        self._reset(processor)
         self._load_rules(processor, rule_type)

-    def _run_custom_rule_tests(self, processor: "Processor", rule_test: dict):
-        temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json")
-        rules = self._get_rules(processor, rule_test)
-
-        for rule_type, rules in rules.items():
-            for rule_dict in rules:
-                self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path)
-                self._eval_custom_rule_test(rule_test, processor)
-            remove_file_if_exists(temp_rule_path)
-
-    def _run_file_rule_tests(self, processor: "Processor", rule_test: dict):
-        temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json")
-        rules = self._get_rules(processor, rule_test)
-
-        for rule_type, rules in rules.items():
-            for idx, rule_dict in enumerate(rules):
-                self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path)
-                self._eval_file_rule_test(rule_test, processor, idx)
-            remove_file_if_exists(temp_rule_path)
-
-    @staticmethod
-    def _clear_rules(processor: "Processor"):
-        if hasattr(processor, "_rules"):
-            processor.rules.clear()
-
-    @staticmethod
-    def _reset_trees(processor: "Processor"):
-        if hasattr(processor, "_tree"):
-            processor._tree = RuleTree()
-        if hasattr(processor, "_specific_tree"):
-            processor._specific_tree = RuleTree()
-        if hasattr(processor, "_generic_tree"):
-            processor._generic_tree = RuleTree()
-
-    @staticmethod
-    def _create_rule_file(rule_dict: dict, rule_path: str):
-        with open(rule_path, "w", encoding="utf8") as temp_file:
-            json.dump([rule_dict], temp_file)
-
-    def _print_error_on_exception(self, error: BaseException, rule_test: dict, t_idx: int):
-        self._print_filename(rule_test)
-        print_fcolor(Fore.MAGENTA, f"RULE {t_idx}:")
-        print_fcolor(Fore.RED, f"Exception: {error}")
-        self._print_stack_trace(error)
-
-    def _print_stack_trace(self, error: BaseException):
-        if self._enable_print_stack_trace:
-            print("Stack Trace:")
-            tbk = traceback.format_tb(error.__traceback__)
-            for line in tbk:
-                print(line)
-
-    def _print_filename(self, rule_test: dict):
-        if not self._filename_printed:
-            print_fcolor(Fore.LIGHTMAGENTA_EX, f'\nRULE FILE {rule_test["file"]}')
-            self._filename_printed = True
-
-    def _eval_custom_rule_test(self, rule_test: dict, processor: "Processor"):
-        self._filename_printed = False
-        with StringIO() as buf, redirect_stdout(buf):
-            self._run_custom_tests(processor, rule_test)
-            self._custom_tests_output += buf.getvalue()

     def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: int):
-        self._filename_printed = False
+        """Check every test of a rule file: process the raw input, compare the result
+        with the expected output and print the outcome.
+
+        Parameters
+        ----------
+        rule_test : dict
+            Rules to test.
+        processor : Processor
+            Processor used to process the test input.
+        r_idx : int
+            Rule index in the file.
+        """
+        self._filename_printed = False
         for t_idx, test in enumerate(rule_test["tests"]):
+            if test.get("target_rule_idx") is not None and test.get("target_rule_idx") != r_idx:
                 continue
-
             try:
                 result = processor.process(test["raw"])
-            except BaseException as error:
-                self._print_error_on_exception(error, rule_test, t_idx)
+                if not result and processor.name == "pre_detector":
+                    self._pd_extra.color_based_print(
+                        f"- Can't process RULE FILE {rule_test['file']}. No extra output generated"
+                    )
+                    sys.exit(1)
+            except Exception:
                 self._success = False
-                self._failed_rule_tests_cnt += 1
-                return
+                self._result["- Failed Tests"] += 1
+                continue

             diff = self._get_diff_raw_test(test)
             print_diff = self._check_if_different(diff)

-            errors = []
-            warnings = []
             if isinstance(processor, PreDetector):
-                self._pd_extra.update_errors(processor, result.data, errors, warnings)
+                self._pd_extra.update_errors(processor, result.data, self._problems)

-            if print_diff or warnings or errors:
-                self._print_filename(rule_test)
-                print_fcolor(Fore.MAGENTA, f"RULE {t_idx}:")
+            if (
+                print_diff
+                or nth(self._problems.get("warnings"), self._rule_cnt) is not None
+                or nth(self._problems.get("errors"), self._rule_cnt) is not None
+            ):
+                self._pd_extra.color_based_print(
+                    f"> RULE FILE {rule_test['file']} & "
+                    f"RULE TEST {t_idx + 1}/{len(rule_test['tests'])}:"
+                )
+                if nth(self._problems.get("warnings"), self._rule_cnt) is not None:
+                    self._pd_extra.color_based_print(
+                        f"~ {self._problems.get('warnings')[self._result['~ Warning']]}"
+                    )

-            if print_diff:
-                self._print_filename(rule_test)
-                self._print_diff_test(diff)
+            if print_diff or nth(self._problems.get("errors"), self._rule_cnt) is not None:
+                if nth(self._problems.get("errors"), self._rule_cnt) is not None:
+                    self._pd_extra.color_based_print(
+                        f"- {self._problems.get('errors')[self._result['- Failed Tests']]}"
+                    )
+                self._pd_extra.print_diff_test("", diff)
+                self._success = False
+                self._result["- Failed Tests"] += 1

-            if print_diff or errors:
-                self._success = False
-                self._failed_rule_tests_cnt += 1
             else:
-                self._successful_rule_tests_cnt += 1
+                self._result["+ Successful Tests"] += 1

-            self._warning_cnt += len(warnings)
+            self._rule_cnt += 1
+        self._result["Total Tests"] = (
+            self._result["+ Successful Tests"] + self._result["- Failed Tests"]
+        )

-            self._print_errors_and_warnings(errors, warnings)
+    @staticmethod
+    def _reset(processor: "Processor"):
+        """Reset the rules and rule trees of the processor.

-    def _run_custom_tests(self, processor, rule_test):
-        results_for_all_rules = processor.test_rules()
-        results = results_for_all_rules.get(processor.rules[0].__repr__(), [])
-        if not results:
-            self._missing_custom_tests.append(rule_test["file"])
-        else:
-            self._custom_tests.append(rule_test["file"])
-        for idx, result in enumerate(results):
-            diff = list(ndiff([result[0]], [result[1]]))
-            if self._check_if_different(diff):
-                if not self._filename_printed:
-                    self._print_filename(rule_test)
-                print(f"{processor.__class__.__name__.upper()} SPECIFIC TEST #{idx}:")
-                self._print_diff_test(diff)
-                self._failed_rule_tests_cnt += 1
-                self._success = False
-            else:
-                self._successful_rule_tests_cnt += 1
+        Parameters
+        ----------
+        processor : Processor
+            Processor whose rules and rule trees are reset.
+        """
+        if hasattr(processor, "_rules"):
+            processor.rules.clear()
+        if hasattr(processor, "_tree"):
+            processor._tree = RuleTree()
+        if hasattr(processor, "_specific_tree"):
+            processor._specific_tree = RuleTree()
+        if hasattr(processor, "_generic_tree"):
+            processor._generic_tree = RuleTree()

     @staticmethod
-    def _print_errors_and_warnings(errors, warnings):
-        for error in errors:
-            print_fcolor(Fore.RED, error)
-
-        for warning in warnings:
-            print_fcolor(Fore.YELLOW, warning)
+    def _create_rule_file(rule_dict: dict, rule_path: str):
+        with open(rule_path, "w", encoding="utf8") as temp_file:
+            json.dump([rule_dict], temp_file)

     @staticmethod
     def _check_if_different(diff):
+        """Check whether the comparison result (diff) contains any changes.
+
+        Parameters
+        ----------
+        diff : list
+            Result of the comparison.
+
+        Returns
+        -------
+        bool
+            True if the diff contains any added, removed or changed lines.
+        """
         return any((item for item in diff if item.startswith(("+", "-", "?"))))

-    def _check_which_rule_files_miss_tests(self, rules_pn):
-        custom_test_mapping = self._get_custom_test_mapping()
-        rules_with_tests = []
-        rules_without_tests = []
+    @staticmethod
+    def _get_processor_instance(name, processor_cfg):
+        cfg = {name: processor_cfg}
+        processor = Factory.create(cfg)
+        return processor
+
+    def _check_which_rule_files_miss_tests(self, rules_pn) -> None:
+        """Calculate the test coverage of the processors' rule files.
+
+        Parameters
+        ----------
+        rules_pn : dict
+            Accumulated rules for each processor to operate on.
+        """
+        rule_tests = {"with tests": [], "without tests": []}
         for _, processor_test_cfg in rules_pn.items():
-            processor_type = processor_test_cfg["type"]
             rules = processor_test_cfg["rules"]
-            has_custom_tests = custom_test_mapping.get(processor_type, False)
-            if has_custom_tests:
-                continue
-
             for rule in rules:
                 if rule["tests"]:
-                    rules_with_tests.append(rule["file"])
+                    rule_tests["with tests"].append(rule["file"])
                 else:
-                    rules_without_tests.append(rule["file"])
+                    rule_tests["without tests"].append(rule["file"])

-        rule_test_coverage = (
-            len(rules_with_tests) / (len(rules_with_tests) + len(rules_without_tests)) * 100
+        self._result["Rule Test Coverage"] = (
+            len(rule_tests["with tests"])
+            / (len(rule_tests["without tests"]) + len(rule_tests["with tests"]))
+            * 100
         )
-        print_fcolor(Fore.LIGHTGREEN_EX, "\nRULES WITH TESTS:")
-        for rule in rules_with_tests:
-            print_fcolor(Fore.LIGHTGREEN_EX, f"  {rule}")
-        if not rules_with_tests:
-            print_fcolor(Fore.LIGHTGREEN_EX, "None")

-        print_fcolor(Fore.LIGHTRED_EX, "\nRULES WITHOUT TESTS:")
-        for rule in rules_without_tests:
-            print_fcolor(Fore.LIGHTRED_EX, f"  {rule}")
-        if not rules_without_tests:
-            print_fcolor(Fore.LIGHTRED_EX, "None")
-
-        return rule_test_coverage
-
-    @staticmethod
-    def _get_processor_instance(name, processor_cfg, logger_):
-        cfg = {name: processor_cfg}
-        processor = Factory.create(cfg)
-        return processor
-
-    @staticmethod
-    def _print_diff_test(diff):
-        for item in diff:
-            if item.startswith("- "):
-                print_fcolor(Fore.RED, item)
-            elif item.startswith("+ "):
-                print_fcolor(Fore.GREEN, item)
"): - print_fcolor(Fore.WHITE, item) - else: - print_fcolor(Fore.CYAN, item) + self._pd_extra.print_rules(rule_tests) def _sort_lists_in_nested_dict(self, nested_dict): for key, value in nested_dict.items(): @@ -484,6 +577,18 @@ def _sort_lists_in_nested_dict(self, nested_dict): nested_dict[key] = sorted(nested_dict[key]) def _get_diff_raw_test(self, test: dict) -> list: + """Compare tests + + Parameters + ---------- + test : dict + each test in rule file + + Returns + ------- + list + found differences + """ self._gpr.replace_grok_keywords(test["processed"], test) self._sort_lists_in_nested_dict(test) @@ -494,7 +599,8 @@ def _get_diff_raw_test(self, test: dict) -> list: diff = ndiff(raw.splitlines(), processed.splitlines()) return list(diff) - def _set_rules_dirs_to_empty(self): + def _set_rules_dirs_to_empty(self) -> None: + """Set each rule type to empty""" for processor in self._config_yml["pipeline"]: processor_cfg = next(iter(processor.values())) @@ -504,123 +610,101 @@ def _set_rules_dirs_to_empty(self): processor_cfg["generic_rules"] = self._empty_rules_dirs processor_cfg["specific_rules"] = self._empty_rules_dirs - @staticmethod - def _check_test_validity(errors: list, rule_tests: list, test_file: TextIO) -> bool: - has_errors = False - for rule_test in rule_tests: - rule_keys = set(rule_test.keys()) - valid_keys = {"raw", "processed", "target_rule_idx"} - required_keys = {"raw", "processed"} - invalid_keys = rule_keys.difference(valid_keys) - has_error = False - - if invalid_keys.difference({"target_rule_idx"}): - errors.append( - f'Schema error in test "{test_file.name}": "Remove keys: {invalid_keys}"' - ) - has_error = True - - available_required_keys = rule_keys.intersection(required_keys) - if available_required_keys != required_keys: - errors.append( - f'Schema error in test "{test_file.name}": "The following required keys are ' - f'missing: {required_keys.difference(available_required_keys)}"' - ) - has_error = True - - if not has_error: - if not isinstance(rule_test.get("raw"), dict) or not isinstance( - rule_test.get("processed"), dict - ): - errors.append( - f'Schema error in test "{test_file.name}": "Values of raw and processed ' - f'must be dictionaries"' - ) - has_error = True - if {"target_rule_idx"}.intersection(rule_keys): - if not isinstance(rule_test.get("target_rule_idx"), int): - errors.append( - f'Schema error in test "{test_file.name}": "Value of target_rule_idx ' - f'must be an integer"' - ) - has_error = True - has_errors = has_errors or has_error - return has_errors - def _get_rules_per_processor_name(self, rules_dirs: dict) -> defaultdict: - print_fcolor(Fore.YELLOW, "\nRULES DIRECTORIES:") rules_pn = defaultdict(dict) - errors = [] for processor_name, proc_rules_dirs in rules_dirs.items(): - self._get_rules_for_processor(processor_name, proc_rules_dirs, rules_pn, errors) - if errors: - for error in errors: - print_fcolor(Fore.RED, error) + self._get_rules_for_processor(processor_name, proc_rules_dirs, rules_pn) + if self._problems["errors"]: + self._pd_extra.print_rules(self._problems["errors"]) sys.exit(1) return rules_pn - def _get_rules_for_processor(self, processor_name, proc_rules_dirs, rules_pn, errors): + def _get_rules_for_processor(self, processor_name, proc_rules_dirs, rules_pn): + """Read out rules and populate dict with processor: rules + + Parameters + ---------- + processor_name : str + name of proc + proc_rules_dirs : dict + all directories for proc + rules_pn : dict + accumulated rules for each processor to operate on + """ if not 
         if not rules_pn[processor_name]:
             rules_pn[processor_name] = defaultdict(dict)

         processor_type = proc_rules_dirs["type"]
         rules_pn[processor_name]["type"] = processor_type
         rules_pn[processor_name]["rules"] = []
-        print_fcolor(Fore.YELLOW, f"  {processor_name} ({processor_type}):")
-        for rule_dirs_type, rules_dirs_by_type in proc_rules_dirs["rule_dirs"].items():
-            print_fcolor(Fore.YELLOW, f"    {rule_dirs_type}:")
-            for rules_dir in rules_dirs_by_type:
-                print_fcolor(Fore.YELLOW, f"      {rules_dir}:")
-                for root, _, files in walk(rules_dir):
-                    rule_files = [file for file in files if self._is_valid_rule_name(file)]
-                    for file in rule_files:
-                        multi_rule = self._get_multi_rule_dict(file, root)
-                        test_path = path.join(
-                            root, "".join([file.rsplit(".", maxsplit=1)[0], "_test.json"])
-                        )
-                        if path.isfile(test_path):
-                            with open(test_path, "r", encoding="utf8") as test_file:
-                                try:
-                                    rule_tests = json.load(test_file)
-                                except json.decoder.JSONDecodeError as error:
-                                    errors.append(
-                                        f"JSON decoder error in test "
-                                        f'"{test_file.name}": "{str(error)}" '
-                                    )
-                                    continue
-                                has_errors = self._check_test_validity(
-                                    errors, rule_tests, test_file
-                                )
-                            if has_errors:
-                                continue
-                        else:
-                            rule_tests = []
-                        rules_pn[processor_name]["rules"].append(
-                            {
-                                rule_dirs_type: multi_rule,
-                                "tests": rule_tests,
-                                "file": path.join(root, file),
-                            }
-                        )
+        directories = {"Rules Directory": [f"{processor_name} ({processor_type}):"], "Path": []}
+
+        for rule_type, rules_dir in proc_rules_dirs["rule_dirs"].items():
+            directories["Path"].append(f" - {rule_type}")
+            for rule_path in Path(rules_dir).rglob("*"):
+                if rule_path.is_file() and self._is_valid_rule_name(rule_path.name):
+                    self._get_rule_dict(
+                        rule_path.name, str(rule_path.parent), processor_name, rules_pn, rule_type
+                    )

-    @staticmethod
-    def _get_multi_rule_dict(file, root):
-        with open(path.join(root, file), "r", encoding="utf8") as rules_file:
+        self._pd_extra.print_rules(directories)
+
+    def _get_rule_dict(self, file, root, processor_name, rules_pn, rule_dirs_type) -> None:
+        """Read (multi-)rules and their tests and map them to the processor.
+
+        Parameters
+        ----------
+        file : str
+            Name of the rule file.
+        root : str
+            Base path of the rule file.
+        processor_name : str
+            Name of the processor.
+        rules_pn : dict
+            Mapping of processors to rules.
+        rule_dirs_type : str
+            Type of the rules.
+
+        Notes
+        -----
+        `target_rule_idx` is mandatory for pre_detector multi-rules with more than one
+        test; if it is missing, an error is printed and the tester exits.
+        """
+        rule_tests = []
+        test_path = path.join(root, "".join([file.rsplit(".", maxsplit=1)[0], "_test.json"]))
+
+        if path.isfile(test_path):
             try:
-                multi_rule = (
-                    list(yaml.load_all(rules_file))
-                    if file.endswith(".yml")
-                    else json.load(rules_file)
+                rule_tests = self._pd_extra.load_json_or_yaml(test_path)
+            except ValueError as error:
+                self._problems["errors"].append(str(error))
+                return
+
+        file_path = path.join(root, file)
+        try:
+            multi_rule = self._pd_extra.load_json_or_yaml(file_path)
+            if (
+                processor_name == "pre_detector"
+                and not all(d.get("target_rule_idx") is not None for d in rule_tests)
+                and len(rule_tests) > 1
+            ):
+                self._pd_extra.color_based_print(
+                    f"- Not all dictionaries in {file_path} "
+                    f"contain the mandatory key target_rule_idx: "
+                    f"Can't build correct test set for rules."
+                )
-                except json.decoder.JSONDecodeError as error:
-                    raise AutoRuleTesterException(
-                        f'JSON decoder error in rule "{rules_file.name}": ' f'"{str(error)}"'
-                    ) from error
-                except YAMLError as error:
-                    raise AutoRuleTesterException(
-                        f"YAML error in rule " f'"{rules_file.name}": ' f'"{error}"'
-                    ) from error
-        return multi_rule
+                sys.exit(1)
+        except ValueError as error:
+            self._problems["errors"].append(str(error))
+            return
+
+        rules_pn[processor_name]["rules"].append(
+            {
+                rule_dirs_type: multi_rule,
+                "tests": rule_tests,
+                "file": file_path,
+            }
+        )

     @staticmethod
     def _is_valid_rule_name(file_name: str) -> bool:
@@ -633,15 +717,10 @@ def _get_rule_dirs_by_processor_name(self) -> defaultdict:

         for processor in self._config_yml["pipeline"]:
             processor_name, processor_cfg = next(iter(processor.items()))
-            rules_to_add = []

             print("\nProcessor Config:")
             pprint(processor_cfg)

-            if processor_cfg.get("rules"):
-                rules_to_add.append(("rules", processor_cfg["rules"]))
-            elif processor_cfg.get("generic_rules") and processor_cfg.get("specific_rules"):
-                rules_to_add.append(("generic_rules", processor_cfg["generic_rules"]))
-                rules_to_add.append(("specific_rules", processor_cfg["specific_rules"]))
+            rules_to_add = self._get_rules_to_add(processor_cfg)

             if not rules_dirs[processor_name]:
                 rules_dirs[processor_name] = defaultdict(dict)
@@ -649,9 +728,33 @@ def _get_rule_dirs_by_processor_name(self) -> defaultdict:
             rules_dirs[processor_name]["type"] = processor_cfg["type"]

             if not rules_dirs[processor_name]["rule_dirs"]:
-                rules_dirs[processor_name]["rule_dirs"] = defaultdict(list)
+                rules_dirs[processor_name]["rule_dirs"] = defaultdict(str)

             for rule_to_add in rules_to_add:
                 rules_dirs[processor_name]["rule_dirs"][rule_to_add[0]] += rule_to_add[1]

         return rules_dirs
+
+    @staticmethod
+    def _get_rules_to_add(processor_cfg) -> list:
+        """Accumulate the rule directories depending on the processor config.
+
+        Parameters
+        ----------
+        processor_cfg : dict
+            Processor configuration.
+
+        Returns
+        -------
+        list
+            Tuples of rule type and rule directory.
+        """
+        rules_to_add = []
+
+        if processor_cfg.get("rules"):
+            rules_to_add.append(("rules", processor_cfg["rules"]))
+        elif processor_cfg.get("generic_rules") and processor_cfg.get("specific_rules"):
+            rules_to_add.append(("generic_rules", processor_cfg["generic_rules"][0]))
+            rules_to_add.append(("specific_rules", processor_cfg["specific_rules"][0]))
+
+        return rules_to_add
diff --git a/pyproject.toml b/pyproject.toml
index a07259653..9e7468633 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,6 +66,7 @@ dependencies = [
     "hyperscan>=0.7.0",
     "jsonref",
     "luqum",
+    "more-itertools==8.10.0",
     "mysql-connector-python<9",
     "numpy>=1.26.0",
     "opensearch-py",
diff --git a/requirements.in b/requirements.in
new file mode 100644
index 000000000..e69de29bb
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/testdata/auto_tests/dummy/rule.yml b/tests/testdata/auto_tests/dummy/rule.yml
new file mode 100644
index 000000000..1141ddb56
--- /dev/null
+++ b/tests/testdata/auto_tests/dummy/rule.yml
@@ -0,0 +1,13 @@
+filter: 'winlog.event_data.param2: "pause"'
+labeler:
+  label:
+    action:
+      - terminate
+description: "..."
+---
+filter: 'winlog.event_data.param2: "dada"'
+labeler:
+  label:
+    action:
+      - terminate
+description: "..."
diff --git a/tests/testdata/auto_tests/dummy/rule_test.json b/tests/testdata/auto_tests/dummy/rule_test.json
new file mode 100644
index 000000000..a1199e87c
--- /dev/null
+++ b/tests/testdata/auto_tests/dummy/rule_test.json
@@ -0,0 +1,67 @@
+[
+    {
+        "target_rule_idx": 0,
+        "raw": {
+            "winlog": {
+                "event_data": {
+                    "param2": "ooo"
+                }
+            }
+        },
+        "processed": {
+            "label": {
+                "action": [
+                    "terminate"
+                ]
+            },
+            "winlog": {
+                "event_data": {
+                    "param2": "pause"
+                }
+            }
+        }
+    },
+    {
+        "raw": {
+            "winlog": {
+                "event_data": {
+                    "param2": "pause"
+                }
+            }
+        },
+        "processed": {
+            "label": {
+                "action": [
+                    "terminate"
+                ]
+            },
+            "winlog": {
+                "event_data": {
+                    "param2": "pause"
+                }
+            }
+        }
+    },
+    {
+        "target_rule_idx": 1,
+        "raw": {
+            "winlog": {
+                "event_data": {
+                    "param2": "dada"
+                }
+            }
+        },
+        "processed": {
+            "label": {
+                "action": [
+                    "terminate"
+                ]
+            },
+            "winlog": {
+                "event_data": {
+                    "param2": "dada"
+                }
+            }
+        }
+    }
+]
\ No newline at end of file
diff --git a/tests/testdata/auto_tests/labeler/rules/generic/auto_test_labeling_match_test.json b/tests/testdata/auto_tests/labeler/rules/generic/auto_test_labeling_match_test.json
index e97c992c7..f864fc591 100644
--- a/tests/testdata/auto_tests/labeler/rules/generic/auto_test_labeling_match_test.json
+++ b/tests/testdata/auto_tests/labeler/rules/generic/auto_test_labeling_match_test.json
@@ -5,7 +5,11 @@
     },
     "processed": {
       "some_field": "stop",
-      "label": {"action": ["terminate"]}
+      "label": {
+        "action": [
+          "terminate"
+        ]
+      }
     }
   },
   {
@@ -14,7 +18,11 @@
     },
     "processed": {
       "some_field": "end",
-      "label": {"action": ["terminate"]}
+      "label": {
+        "action": [
+          "terminate"
+        ]
+      }
     }
   }
 ]
\ No newline at end of file
diff --git a/tests/unit/util/test_auto_rule_tester.py b/tests/unit/util/test_auto_rule_tester.py
index 580e42dbd..d66b1f3ac 100644
--- a/tests/unit/util/test_auto_rule_tester.py
+++ b/tests/unit/util/test_auto_rule_tester.py
@@ -1,12 +1,9 @@
-# pylint: disable=missing-docstring
-# pylint: disable=wrong-import-position
 # pylint: disable=protected-access
-# pylint: disable=broad-except
-# pylint: disable=line-too-long
+# pylint: disable=broad-exception-caught
+# pylint: disable=missing-function-docstring
 import logging
 import re
 from unittest import mock
-
 import pytest

 from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester
@@ -21,6 +18,84 @@ def fixture_auto_rule_tester():


 class TestAutoRuleTester:
+
+    def test_get_rule_dict_valid_file(self, auto_rule_tester):
+        processor_name = "dummy"
+        rules_pn = {"dummy": {"type": "dummy", "rules": []}}
+        file = "rule.yml"
+        root = "tests/testdata/auto_tests/dummy"
+        rule_dirs_type = "doesnt_matter"
+
+        auto_rule_tester._get_rule_dict(file, root, processor_name, rules_pn, rule_dirs_type)
+
+        # expected parsed content of rule.yml and rule_test.json as literals
+        expected_rule_dict = [
+            {
+                "doesnt_matter": [
+                    {
+                        "filter": 'winlog.event_data.param2: "pause"',
+                        "labeler": {"label": {"action": ["terminate"]}},
+                        "description": "...",
+                    },
+                    {
+                        "filter": 'winlog.event_data.param2: "dada"',
+                        "labeler": {"label": {"action": ["terminate"]}},
+                        "description": "...",
+                    },
+                ],
+                "tests": [
+                    {
+                        "target_rule_idx": 0,
+                        "raw": {"winlog": {"event_data": {"param2": "ooo"}}},
+                        "processed": {
+                            "label": {"action": ["terminate"]},
+                            "winlog": {"event_data": {"param2": "pause"}},
+                        },
+                    },
+                    {
+                        "raw": {"winlog": {"event_data": {"param2": "pause"}}},
+                        "processed": {
+                            "label": {"action": ["terminate"]},
+                            "winlog": {"event_data": {"param2": "pause"}},
+                        },
+                    },
+                    {
+                        "target_rule_idx": 1,
+                        "raw": {"winlog": {"event_data": {"param2": 
"dada"}}}, + "processed": { + "label": {"action": ["terminate"]}, + "winlog": {"event_data": {"param2": "dada"}}, + }, + }, + ], + "file": "tests/testdata/auto_tests/dummy/rule.yml", + } + ] + + assert rules_pn["dummy"]["rules"] == expected_rule_dict + + def test_get_rule_dict_target_rule_idx_not_found(self, auto_rule_tester): + processor_name = "dummy" + rules_pn = {"dummy": {"type": "dummy", "rules": []}} + file = "rule.yml" + root = "tests/testdata/auto_tests/dummy" + rule_dirs_type = "doesnt_matter" + + auto_rule_tester._get_rule_dict(file, root, processor_name, rules_pn, rule_dirs_type) + + def remove_dict_with_target_rule_idx(list_of_dicts): + for idx, d in enumerate(list_of_dicts): + if "target_rule_idx" in d: + del list_of_dicts[idx] + break + + remove_dict_with_target_rule_idx(rules_pn["dummy"]["rules"]) + + with pytest.raises(KeyError): + for test in rules_pn["dummy"]["rules"]: + for t in test["tests"]: + t["target_rule_idx"] + def test_coverage_no_rule_files_raises_exception(self, auto_rule_tester): rules_pn = { "processor_name": { @@ -49,8 +124,8 @@ def test_coverage_no_rule_files_have_tests(self, auto_rule_tester): } } - coverage = auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) - assert coverage == 0 + auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) + assert auto_rule_tester._result["Rule Test Coverage"] == 0 def test_coverage_all_rule_files_have_tests(self, auto_rule_tester): rules_pn = { @@ -69,8 +144,8 @@ def test_coverage_all_rule_files_have_tests(self, auto_rule_tester): } } - coverage = auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) - assert coverage == 100 + auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) + assert auto_rule_tester._result["Rule Test Coverage"] == 100 def test_coverage_half_rule_files_have_tests(self, auto_rule_tester): rules_pn = { @@ -89,8 +164,8 @@ def test_coverage_half_rule_files_have_tests(self, auto_rule_tester): } } - coverage = auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) - assert coverage == 50 + auto_rule_tester._check_which_rule_files_miss_tests(rules_pn) + assert auto_rule_tester._result["Rule Test Coverage"] == 50 def test_does_not_run_if_no_rules_exist(self, auto_rule_tester, capsys): rules_pn = { @@ -153,12 +228,8 @@ def test_pseudonymizer_specific_setup_called_on_load_rules( "max_cached_pseudonyms": 1000000, } mock_replace_regex_keywords_by_regex_expression.assert_not_called() - processor = auto_rule_tester._get_processor_instance( - "pseudonymizer", pseudonymizer_cfg, LOGGER - ) - auto_rule_tester._reset_trees( - processor - ) # Called every time by auto tester before adding rules + processor = auto_rule_tester._get_processor_instance("pseudonymizer", pseudonymizer_cfg) + auto_rule_tester._reset(processor) # Called every time by auto tester before adding rules auto_rule_tester._load_rules(processor, "specific_rules") assert mock_replace_regex_keywords_by_regex_expression.call_count == 1 @@ -174,10 +245,8 @@ def test_list_comparison_specific_setup_called_on_load_rules( "list_search_base_path": "tests/testdata/unit/list_comparison/rules", } mock_setup.assert_not_called() - processor = auto_rule_tester._get_processor_instance( - "list_comparison", list_comparison_cfg, LOGGER - ) - auto_rule_tester._reset_trees( + processor = auto_rule_tester._get_processor_instance("list_comparison", list_comparison_cfg) + auto_rule_tester._reset( processor ) # Called every time by auto tester before adding rules instead auto_rule_tester._load_rules(processor, "specific_rules") @@ 
-187,7 +256,7 @@ def test_full_auto_rule_test_run(self, auto_rule_tester, capsys): with pytest.raises(SystemExit): auto_rule_tester.run() expected_rules_with_tests = [ - "RULES WITH TESTS:", + "with tests", "tests/testdata/auto_tests/labeler/rules/generic/auto_test_labeling_match.json", "tests/testdata/auto_tests/labeler/rules/specific/auto_test_labeling_mismatch.json", "tests/testdata/auto_tests/dissector/rules/generic/auto_test_match.json", @@ -202,24 +271,19 @@ def test_full_auto_rule_test_run(self, auto_rule_tester, capsys): "tests/testdata/auto_tests/template_replacer/rules/specific/template_replacer.json", ] expected_rules_without_tests = [ - "RULES WITHOUT TESTS:", + "without tests", "tests/testdata/auto_tests/labeler/rules/specific/auto_test_labeling_no_test_.json", "tests/testdata/auto_tests/dissector/rules/specific/auto_test_no_test_.json", "tests/testdata/auto_tests/pre_detector/rules/specific/auto_test_pre_detector_no_test_.json", "tests/testdata/auto_tests/pseudonymizer/rules/specific/auto_test_pseudonymizer_no_test_.json", ] - expected_rules_with_custom_tests = [ - "RULES WITH CUSTOM TESTS:", - "tests/testdata/auto_tests/clusterer/rules/specific/rule_with_custom_tests.yml", - ] expected_overall_results = [ - "Results:", - "Failed tests: 7", - "Successful tests: 33", - "Total tests: 40", - "Rule Test Coverage: 80.00%", - "Warnings: 2", + "+ Successful Tests: 32", + "- Failed Tests: 6", + "~ Warning: 2", + "Rule Test Coverage: 72.72727272727273", + "Total Tests: 38", ] captured = capsys.readouterr() @@ -234,10 +298,7 @@ def test_full_auto_rule_test_run(self, auto_rule_tester, capsys): assert match.group(1) == expected, f'Expected: "{expected_result}"' expected_sample_lines = ( - expected_rules_with_tests - + expected_rules_without_tests - + expected_rules_with_custom_tests - + expected_overall_results + expected_rules_with_tests + expected_rules_without_tests + expected_overall_results ) for line in expected_sample_lines: assert line in captured.out
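
A minimal sketch of how the reworked tester is driven programmatically (the configuration
path is illustrative; `AutoRuleTester` and `run()` are taken from the diff above):

    from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester

    # Any logprep pipeline configuration works here; the concrete path is an assumption.
    tester = AutoRuleTester("examples/exampledata/config/pipeline.yml")
    # run() prints the coloured per-rule diffs plus the aggregated counters
    # ("+ Successful Tests", "- Failed Tests", "~ Warning", "Rule Test Coverage",
    # "Total Tests") and exits with status 1 if any test failed.
    tester.run()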