From 879c7588a5e006f7db381e578ade9014fc2459d7 Mon Sep 17 00:00:00 2001 From: Ma Lu Date: Thu, 21 Mar 2024 10:53:01 +0100 Subject: [PATCH] Refactored: custom tests, presentation overview and dry vios --- logprep/abc/processor.py | 3 - logprep/processor/clusterer/processor.py | 1 - .../util/auto_rule_tester/auto_rule_tester.py | 514 +++++++----------- 3 files changed, 186 insertions(+), 332 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index b3e1a8a79..ec38aad67 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -69,14 +69,12 @@ class Config(Component.Config): __slots__ = [ "rule_class", - "has_custom_tests", "_event", "_specific_tree", "_generic_tree", ] rule_class: "Rule" - has_custom_tests: bool _event: dict _specific_tree: RuleTree _generic_tree: RuleTree @@ -98,7 +96,6 @@ def __init__(self, name: str, configuration: "Processor.Config", logger: Logger) generic_rules_targets=self._config.generic_rules, specific_rules_targets=self._config.specific_rules, ) - self.has_custom_tests = False @property def _specific_rules(self): diff --git a/logprep/processor/clusterer/processor.py b/logprep/processor/clusterer/processor.py index 53841e8a8..a95ce8c04 100644 --- a/logprep/processor/clusterer/processor.py +++ b/logprep/processor/clusterer/processor.py @@ -77,7 +77,6 @@ def __init__(self, name: str, configuration: Processor.Config, logger: Logger): super().__init__(name=name, configuration=configuration, logger=logger) self.matching_rules = [] self.sps = SignaturePhaseStreaming() - self.has_custom_tests = True def process(self, event: dict): self.matching_rules = [] diff --git a/logprep/util/auto_rule_tester/auto_rule_tester.py b/logprep/util/auto_rule_tester/auto_rule_tester.py index 563f579df..ddd1098c6 100644 --- a/logprep/util/auto_rule_tester/auto_rule_tester.py +++ b/logprep/util/auto_rule_tester/auto_rule_tester.py @@ -64,6 +64,7 @@ from os import path, walk from pprint import pprint from typing import TYPE_CHECKING, TextIO, Tuple +from collections.abc import Iterable from colorama import Fore from ruamel.yaml import YAML, YAMLError @@ -89,7 +90,7 @@ def __init__(self, message: str): super().__init__(f"AutoRuleTester ({message}): ") -class PreDetectionExtraHandler: +class PorcessorExtensions: """Used to handle special demands for PreDetector auto-tests.""" @staticmethod @@ -125,8 +126,8 @@ def _get_errors(processor: "Processor", extra_output: tuple): return pd_errors, pd_warnings def update_errors( - self, processor: PreDetector, extra_output: tuple, errors: list, warnings: list - ): + self, processor: PreDetector, extra_output: tuple, problems: dict + ): """Create aggregating logger. Parameters @@ -142,15 +143,66 @@ def update_errors( """ mitre_errors, id_warnings = self._get_errors(processor, extra_output) - errors += mitre_errors - warnings += id_warnings + problems["errors"].extend(mitre_errors) + problems["warnings"].extend(id_warnings) + + def print_rules(self, rules, t_idx=None): + print() + for key, rule in rules.items(): + self._print_diff_test(key, rule, t_idx) + #else: + # print_fcolor(Fore.LIGHTRED_EX, "None") + + @staticmethod + def _print_diff_test( key, rule, t_idx=None): + if not isinstance(rule, Iterable): + diff = f"{key}: {rule}" + PorcessorExtensions.color_based_print(diff) + else: + if t_idx is not None: + diff = f"{key}: {rule[t_idx]}" + PorcessorExtensions.color_based_print(diff) + else: + for item in rule: + diff = f"{key}: {item}" + PorcessorExtensions.color_based_print(diff) + + @staticmethod + def color_based_print(item): + item = item.replace("]", "").replace("[", "") + if item.startswith("- ") or item.startswith("error") or item.startswith("without tests"): + print_fcolor(Fore.RED, item) #+ "\n") + elif item.startswith("+ ") or item.startswith("with tests"): + print_fcolor(Fore.GREEN, item) + elif item.startswith("? "): + print_fcolor(Fore.WHITE, "\n" + item) + elif item.lstrip().startswith("~ ") or item.startswith("warning"): + print_fcolor(Fore.YELLOW, item) + else: + print_fcolor(Fore.CYAN, item) + + def _load_json_or_yaml(self, file_path): + try: + with open(file_path, "r", encoding="utf-8") as file: + if file_path.endswith(".yml"): + return list(yaml.load_all(file)) + else: + return json.load(file) + + except (json.JSONDecodeError, YAMLError) as error: + raise ValueError(f"Error decoding {file_path}: {str(error)}") class AutoRuleTester: """Used to perform auto-tests for rules.""" - def __init__(self, config): - with open(config, "r", encoding="utf8") as yaml_file: + _original_config_paths: tuple[str] + """ Path to the original configuration that should be tested """ + + def __init__(self, config_paths: tuple[str]) + self._original_config_paths = config_paths + + with open(config_paths, "r", encoding="utf8") as yaml_file: self._config_yml = yaml.load(yaml_file) self._empty_rules_dirs = [tempfile.mkdtemp()] @@ -163,19 +215,14 @@ def __init__(self, config): self._success = True - self._successful_rule_tests_cnt = 0 - self._failed_rule_tests_cnt = 0 - self._warning_cnt = 0 - - self._pd_extra = PreDetectionExtraHandler() - - self._filename_printed = False + self._result = {"+ successful_rule_tests_cnt": 0, "- failed_rule_tests_cnt": 0, "~ warning_cnt": 0, "rule_test_coverage": 0, "total_tests": 0} + self._problems = {"warnings": [], "errors": []} + self._pd_extra = PorcessorExtensions() self._gpr = GrokPatternReplacer(self._config_yml) - self._custom_tests_output = "" - self._custom_tests = [] - self._missing_custom_tests = [] + self._filename_printed = False + self._rule_cnt = 0 self._logger = getLogger() self._logger.disabled = True @@ -187,78 +234,49 @@ def run(self): self._run_if_any_rules_exist(rules_pn) def _run_if_any_rules_exist(self, rules_pn: dict): - if not self._has_rules(rules_pn): - print_fcolor(Fore.YELLOW, "\nThere are no rules within any of the rules directories!") - else: + if any(processor_test_cfg["rules"] for processor_test_cfg in rules_pn.values()): self._run_tests_for_rules(rules_pn) + return + else: + print_fcolor(Fore.YELLOW, "~\nThere are no rules within any of the rules directories!") - def _run_tests_for_rules(self, rules_pn: dict): - rule_test_coverage = self._check_which_rule_files_miss_tests(rules_pn) - self._set_rules_dirs_to_empty() - - processors_ct, processors_no_ct = self._get_processors_split_by_custom_tests_existence() - - for processor, processor_name in processors_ct.items(): + def check_run_rule_tests(self, processor_cont, rules_pn, test_type): + for processor, processor_name in processor_cont.items(): for rule_test in rules_pn[processor_name]["rules"]: - if processor and rule_test["tests"] or processor.has_custom_tests: - self._run_custom_rule_tests(processor, rule_test) + if processor and rule_test["tests"]: + self._run_rule_tests(processor, rule_test, test_type) - if self._custom_tests: - print_fcolor(Fore.GREEN, "\nRULES WITH CUSTOM TESTS:") - for file_name in self._custom_tests: - print_fcolor(Fore.GREEN, file_name) + def _run_tests_for_rules(self, rules_pn: dict): + self._check_which_rule_files_miss_tests(rules_pn) + #self._set_rules_dirs_to_empty() - if self._missing_custom_tests: - print_fcolor(Fore.RED, "\nRULES WITHOUT CUSTOM TESTS:") - for file_name in self._missing_custom_tests: - print_fcolor(Fore.RED, file_name) + processors_no_ct = self._get_processors() - print(self._custom_tests_output) + self.check_run_rule_tests(processors_no_ct, rules_pn, "file") - for processor, processor_name in processors_no_ct.items(): - for rule_test in rules_pn[processor_name]["rules"]: - if processor and rule_test["tests"]: - self._run_file_rule_tests(processor, rule_test) - - print_fcolor(Fore.WHITE, "\nResults:") - print_fcolor(Fore.RED, f"Failed tests: {self._failed_rule_tests_cnt}") - print_fcolor(Fore.GREEN, f"Successful tests: {self._successful_rule_tests_cnt}") - print_fcolor( - Fore.CYAN, - f"Total tests: " f"{self._successful_rule_tests_cnt + self._failed_rule_tests_cnt}", - ) - print_fcolor(Fore.BLUE, f"Rule Test Coverage: {rule_test_coverage:.2f}%") - print_fcolor(Fore.YELLOW, f"Warnings: {self._warning_cnt}") + self._result["~ warning_cnt"] += len(self._problems.get("warnings") ) + self._pd_extra.print_rules(self._result) if not self._success: sys.exit(1) - @staticmethod - def _has_rules(rules_pn: dict) -> bool: - for processor_test_cfg in rules_pn.values(): - if processor_test_cfg["rules"]: - return True - return False - - def _get_processors_split_by_custom_tests_existence(self) -> Tuple[OrderedDict, OrderedDict]: - processors_with_custom_test = OrderedDict() - processors_without_custom_test = OrderedDict() - for processor_in_pipeline in self._config_yml["pipeline"]: - name, processor_cfg = next(iter(processor_in_pipeline.items())) - processor = self._get_processor_instance(name, processor_cfg, self._logger) - if processor.has_custom_tests: - processors_with_custom_test[processor] = name - else: - processors_without_custom_test[processor] = name - return processors_with_custom_test, processors_without_custom_test + def _run_rule_tests(self, processor: "Processor", rule_test: dict, test_type: str): + temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json") + rules = self._get_rules(processor, rule_test) - def _get_custom_test_mapping(self) -> dict: - processor_uses_own_tests = {} + for rule_type, rules in rules.items(): + for idx, rule_dict in enumerate(rules): + self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path) + self._eval_file_rule_test(rule_test, processor, idx) + remove_file_if_exists(temp_rule_path) + + def _get_processors(self) -> Tuple[OrderedDict, OrderedDict]: + processors_without_custom_test = OrderedDict() for processor_in_pipeline in self._config_yml["pipeline"]: name, processor_cfg = next(iter(processor_in_pipeline.items())) processor = self._get_processor_instance(name, processor_cfg, self._logger) - processor_uses_own_tests[processor_cfg["type"]] = processor.has_custom_tests - return processor_uses_own_tests + processors_without_custom_test[processor] = name + return processors_without_custom_test @staticmethod def _get_rules(processor: "Processor", rule_test: dict) -> dict: @@ -288,37 +306,47 @@ def _prepare_test_eval( self, processor: "Processor", rule_dict: dict, rule_type: str, temp_rule_path: str ): self._create_rule_file(rule_dict, temp_rule_path) - self._reset_trees(processor) - self._clear_rules(processor) + self._reset_(processor) self._load_rules(processor, rule_type) - def _run_custom_rule_tests(self, processor: "Processor", rule_test: dict): - temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json") - rules = self._get_rules(processor, rule_test) + def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: int): + self._filename_printed = False + for t_idx, test in enumerate(rule_test["tests"]): + if test.get("target_rule_idx") is not None and test.get("target_rule_idx") != r_idx: + continue + try: + extra_output = processor.process(test["raw"]) + except BaseException as error: + self._print_error_on_exception(error, rule_test, self._rule_cnt) + self._success = False + self._result["- failed_rule_tests_cnt"] += 1 + return - for rule_type, rules in rules.items(): - for rule_dict in rules: - self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path) - self._eval_custom_rule_test(rule_test, processor) - remove_file_if_exists(temp_rule_path) + diff = self._get_diff_raw_test(test) + print_diff = self._check_if_different(diff) - def _run_file_rule_tests(self, processor: "Processor", rule_test: dict): - temp_rule_path = path.join(self._empty_rules_dirs[0], f"{hashlib.sha256()}.json") - rules = self._get_rules(processor, rule_test) + if isinstance(processor, PreDetector): + self._pd_extra.update_errors(processor, extra_output, self._problems) - for rule_type, rules in rules.items(): - for idx, rule_dict in enumerate(rules): - self._prepare_test_eval(processor, rule_dict, rule_type, temp_rule_path) - self._eval_file_rule_test(rule_test, processor, idx) - remove_file_if_exists(temp_rule_path) + if print_diff or self._problems.get("warnings") or self._problems.get("errors"): + print_fcolor(Fore.MAGENTA, f"RULE FILE {rule_test['file']} & RULE {t_idx}:") + + if print_diff or self._problems.get("errors"): + self._pd_extra.print_rules({"DIFF": diff}) + self._success = False + self._result["- failed_rule_tests_cnt"] += 1 + else: + self._result["+ successful_rule_tests_cnt"] += 1 + + self._pd_extra.print_rules(self._problems, self._rule_cnt) + + self._rule_cnt += 1 + self._result["total_tests"] = self._result["+ successful_rule_tests_cnt"] + self._result["- failed_rule_tests_cnt"] @staticmethod - def _clear_rules(processor: "Processor"): + def _reset_(processor: "Processor"): if hasattr(processor, "_rules"): processor.rules.clear() - - @staticmethod - def _reset_trees(processor: "Processor"): if hasattr(processor, "_tree"): processor._tree = RuleTree() if hasattr(processor, "_specific_tree"): @@ -332,8 +360,7 @@ def _create_rule_file(rule_dict: dict, rule_path: str): json.dump([rule_dict], temp_file) def _print_error_on_exception(self, error: BaseException, rule_test: dict, t_idx: int): - self._print_filename(rule_test) - print_fcolor(Fore.MAGENTA, f"RULE {t_idx}:") + print_fcolor(Fore.MAGENTA, f"RULE FILE {rule_test['file']} & RULE {t_idx}:") print_fcolor(Fore.RED, f"Exception: {error}") self._print_stack_trace(error) @@ -344,142 +371,34 @@ def _print_stack_trace(self, error: BaseException): for line in tbk: print(line) - def _print_filename(self, rule_test: dict): - if not self._filename_printed: - print_fcolor(Fore.LIGHTMAGENTA_EX, f'\nRULE FILE {rule_test["file"]}') - self._filename_printed = True - - def _eval_custom_rule_test(self, rule_test: dict, processor: "Processor"): - self._filename_printed = False - with StringIO() as buf, redirect_stdout(buf): - self._run_custom_tests(processor, rule_test) - self._custom_tests_output += buf.getvalue() - - def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: int): - self._filename_printed = False - - for t_idx, test in enumerate(rule_test["tests"]): - if test.get("target_rule_idx") is not None and test.get("target_rule_idx") != r_idx: - continue - - try: - extra_output = processor.process(test["raw"]) - except BaseException as error: - self._print_error_on_exception(error, rule_test, t_idx) - self._success = False - self._failed_rule_tests_cnt += 1 - return - - diff = self._get_diff_raw_test(test) - print_diff = self._check_if_different(diff) - - errors = [] - warnings = [] - - if isinstance(processor, PreDetector): - self._pd_extra.update_errors(processor, extra_output, errors, warnings) - - if print_diff or warnings or errors: - self._print_filename(rule_test) - print_fcolor(Fore.MAGENTA, f"RULE {t_idx}:") - - if print_diff: - self._print_filename(rule_test) - self._print_diff_test(diff) - - if print_diff or errors: - self._success = False - self._failed_rule_tests_cnt += 1 - else: - self._successful_rule_tests_cnt += 1 - - self._warning_cnt += len(warnings) - - self._print_errors_and_warnings(errors, warnings) - - def _run_custom_tests(self, processor, rule_test): - results_for_all_rules = processor.test_rules() - results = results_for_all_rules.get(processor.rules[0].__repr__(), []) - if not results: - self._missing_custom_tests.append(rule_test["file"]) - else: - self._custom_tests.append(rule_test["file"]) - for idx, result in enumerate(results): - diff = list(ndiff([result[0]], [result[1]])) - if self._check_if_different(diff): - if not self._filename_printed: - self._print_filename(rule_test) - print(f"{processor.__class__.__name__.upper()} SPECIFIC TEST #{idx}:") - self._print_diff_test(diff) - self._failed_rule_tests_cnt += 1 - self._success = False - else: - self._successful_rule_tests_cnt += 1 - - @staticmethod - def _print_errors_and_warnings(errors, warnings): - for error in errors: - print_fcolor(Fore.RED, error) - - for warning in warnings: - print_fcolor(Fore.YELLOW, warning) - @staticmethod def _check_if_different(diff): return any((item for item in diff if item.startswith(("+", "-", "?")))) + @staticmethod + def _get_processor_instance(name, processor_cfg, logger_): + cfg = {name: processor_cfg} + processor = Factory.create(cfg, logger_) + return processor + def _check_which_rule_files_miss_tests(self, rules_pn): - custom_test_mapping = self._get_custom_test_mapping() - rules_with_tests = [] - rules_without_tests = [] + + rule_tests = {"with tests": [], "without tests": []} for _, processor_test_cfg in rules_pn.items(): processor_type = processor_test_cfg["type"] rules = processor_test_cfg["rules"] - - has_custom_tests = custom_test_mapping.get(processor_type, False) - if has_custom_tests: - continue - + for rule in rules: if rule["tests"]: - rules_with_tests.append(rule["file"]) + rule_tests["with tests"].append(rule["file"]) else: - rules_without_tests.append(rule["file"]) + rule_tests["without tests"].append(rule["file"]) - rule_test_coverage = ( - len(rules_with_tests) / (len(rules_with_tests) + len(rules_without_tests)) * 100 + self._result["rule_test_coverage"] = ( + len(rule_tests["with tests"]) / (len(rule_tests["without tests"]) + len(rule_tests["without tests"])) * 100 ) - print_fcolor(Fore.LIGHTGREEN_EX, "\nRULES WITH TESTS:") - for rule in rules_with_tests: - print_fcolor(Fore.LIGHTGREEN_EX, f" {rule}") - if not rules_with_tests: - print_fcolor(Fore.LIGHTGREEN_EX, "None") - print_fcolor(Fore.LIGHTRED_EX, "\nRULES WITHOUT TESTS:") - for rule in rules_without_tests: - print_fcolor(Fore.LIGHTRED_EX, f" {rule}") - if not rules_without_tests: - print_fcolor(Fore.LIGHTRED_EX, "None") - - return rule_test_coverage - - @staticmethod - def _get_processor_instance(name, processor_cfg, logger_): - cfg = {name: processor_cfg} - processor = Factory.create(cfg, logger_) - return processor - - @staticmethod - def _print_diff_test(diff): - for item in diff: - if item.startswith("- "): - print_fcolor(Fore.RED, item) - elif item.startswith("+ "): - print_fcolor(Fore.GREEN, item) - elif item.startswith("? "): - print_fcolor(Fore.WHITE, item) - else: - print_fcolor(Fore.CYAN, item) + self._pd_extra.print_rules(rule_tests) def _sort_lists_in_nested_dict(self, nested_dict): for key, value in nested_dict.items(): @@ -509,123 +428,63 @@ def _set_rules_dirs_to_empty(self): processor_cfg["generic_rules"] = self._empty_rules_dirs processor_cfg["specific_rules"] = self._empty_rules_dirs - @staticmethod - def _check_test_validity(errors: list, rule_tests: list, test_file: TextIO) -> bool: - has_errors = False - for rule_test in rule_tests: - rule_keys = set(rule_test.keys()) - valid_keys = {"raw", "processed", "target_rule_idx"} - required_keys = {"raw", "processed"} - invalid_keys = rule_keys.difference(valid_keys) - has_error = False - - if invalid_keys.difference({"target_rule_idx"}): - errors.append( - f'Schema error in test "{test_file.name}": "Remove keys: {invalid_keys}"' - ) - has_error = True - - available_required_keys = rule_keys.intersection(required_keys) - if available_required_keys != required_keys: - errors.append( - f'Schema error in test "{test_file.name}": "The following required keys are ' - f'missing: {required_keys.difference(available_required_keys)}"' - ) - has_error = True - - if not has_error: - if not isinstance(rule_test.get("raw"), dict) or not isinstance( - rule_test.get("processed"), dict - ): - errors.append( - f'Schema error in test "{test_file.name}": "Values of raw and processed ' - f'must be dictionaries"' - ) - has_error = True - if {"target_rule_idx"}.intersection(rule_keys): - if not isinstance(rule_test.get("target_rule_idx"), int): - errors.append( - f'Schema error in test "{test_file.name}": "Value of target_rule_idx ' - f'must be an integer"' - ) - has_error = True - has_errors = has_errors or has_error - return has_errors - def _get_rules_per_processor_name(self, rules_dirs: dict) -> defaultdict: - print_fcolor(Fore.YELLOW, "\nRULES DIRECTORIES:") rules_pn = defaultdict(dict) - errors = [] for processor_name, proc_rules_dirs in rules_dirs.items(): - self._get_rules_for_processor(processor_name, proc_rules_dirs, rules_pn, errors) - if errors: - for error in errors: - print_fcolor(Fore.RED, error) + self._get_rules_for_processor(processor_name, proc_rules_dirs, rules_pn) + if self._problems["errors"]: + self._pd_extra.print_rules(self._problems["errors"]) sys.exit(1) return rules_pn - def _get_rules_for_processor(self, processor_name, proc_rules_dirs, rules_pn, errors): + def _get_rules_for_processor(self, processor_name, proc_rules_dirs, rules_pn): if not rules_pn[processor_name]: rules_pn[processor_name] = defaultdict(dict) processor_type = proc_rules_dirs["type"] rules_pn[processor_name]["type"] = processor_type rules_pn[processor_name]["rules"] = [] - print_fcolor(Fore.YELLOW, f" {processor_name} ({processor_type}):") - for rule_dirs_type, rules_dirs_by_type in proc_rules_dirs["rule_dirs"].items(): - print_fcolor(Fore.YELLOW, f" {rule_dirs_type}:") - for rules_dir in rules_dirs_by_type: - print_fcolor(Fore.YELLOW, f" {rules_dir}:") - for root, _, files in walk(rules_dir): - rule_files = [file for file in files if self._is_valid_rule_name(file)] - for file in rule_files: - multi_rule = self._get_multi_rule_dict(file, root) - test_path = path.join( - root, "".join([file.rsplit(".", maxsplit=1)[0], "_test.json"]) - ) - if path.isfile(test_path): - with open(test_path, "r", encoding="utf8") as test_file: - try: - rule_tests = json.load(test_file) - except json.decoder.JSONDecodeError as error: - errors.append( - f"JSON decoder error in test " - f'"{test_file.name}": "{str(error)}" ' - ) - continue - has_errors = self._check_test_validity( - errors, rule_tests, test_file - ) - if has_errors: - continue - else: - rule_tests = [] - rules_pn[processor_name]["rules"].append( - { - rule_dirs_type: multi_rule, - "tests": rule_tests, - "file": path.join(root, file), - } - ) + directories = {"Rules Directory" : [f"{processor_name} ({processor_type}):"], "Path": []} + + for type_count, rules_dir in enumerate(proc_rules_dirs["rule_dirs"].values()): + rule_dirs_type = list(proc_rules_dirs['rule_dirs'].keys())[type_count] + directories["Path"].append(f" - {rule_dirs_type}") + for root, _, files in walk(str(rules_dir)): + rule_files = [file for file in files if self._is_valid_rule_name(file)] + for file in rule_files: + + test_path = path.join( + root, "".join([file.rsplit(".", maxsplit=1)[0], "_test.json"]) + ) - @staticmethod - def _get_multi_rule_dict(file, root): - with open(path.join(root, file), "r", encoding="utf8") as rules_file: + self._get_rule_dict(file, root, test_path, processor_name, rules_pn, rule_dirs_type) + + self._pd_extra.print_rules(directories) + + def _get_rule_dict(self, file, root, test_path, processor_name, rules_pn, rule_dirs_type): + rule_tests = [] + + if path.isfile(test_path): try: - multi_rule = ( - list(yaml.load_all(rules_file)) - if file.endswith(".yml") - else json.load(rules_file) - ) - except json.decoder.JSONDecodeError as error: - raise AutoRuleTesterException( - f'JSON decoder error in rule "{rules_file.name}": ' f'"{str(error)}"' - ) from error - except YAMLError as error: - raise AutoRuleTesterException( - f"YAML error in rule " f'"{rules_file.name}": ' f'"{error}"' - ) from error - return multi_rule + rule_tests = self._pd_extra._load_json_or_yaml(test_path) + except ValueError as error: + self._problems["errors"].append(str(error)) + return + + file_path = path.join(root, file) + try: + multi_rule = self._pd_extra._load_json_or_yaml(file_path) + if not all(d.get("target_rule_idx") is not None for d in rule_tests) and len(rule_tests) > 1: + raise Exception(f"Not all dictionaries in {file_path} contain the mandatory key target_rule_idx: Cant build corret test set for rules.") + except ValueError as error: + self._problems["errors"].append(str(error)) + return + + rules_pn[processor_name]["rules"].append({ + rule_dirs_type: multi_rule, + "tests": rule_tests, + "file": file_path, + }) @staticmethod def _is_valid_rule_name(file_name: str) -> bool: @@ -637,7 +496,6 @@ def _get_rule_dirs_by_processor_name(self) -> defaultdict: rules_dirs = defaultdict(dict) for processor in self._config_yml["pipeline"]: processor_name, processor_cfg = next(iter(processor.items())) - rules_to_add = [] print("\nProcessor Config:") pprint(processor_cfg) @@ -645,8 +503,8 @@ def _get_rule_dirs_by_processor_name(self) -> defaultdict: if processor_cfg.get("rules"): rules_to_add.append(("rules", processor_cfg["rules"])) elif processor_cfg.get("generic_rules") and processor_cfg.get("specific_rules"): - rules_to_add.append(("generic_rules", processor_cfg["generic_rules"])) - rules_to_add.append(("specific_rules", processor_cfg["specific_rules"])) + rules_to_add.append(("generic_rules", processor_cfg["generic_rules"][0])) + rules_to_add.append(("specific_rules", processor_cfg["specific_rules"][0])) if not rules_dirs[processor_name]: rules_dirs[processor_name] = defaultdict(dict) @@ -654,8 +512,8 @@ def _get_rule_dirs_by_processor_name(self) -> defaultdict: rules_dirs[processor_name]["type"] = processor_cfg["type"] if not rules_dirs[processor_name]["rule_dirs"]: - rules_dirs[processor_name]["rule_dirs"] = defaultdict(list) - + rules_dirs[processor_name]["rule_dirs"] = defaultdict(str) + for rule_to_add in rules_to_add: rules_dirs[processor_name]["rule_dirs"][rule_to_add[0]] += rule_to_add[1]