diff --git a/CHANGELOG.md b/CHANGELOG.md
index df9410be8..a5d55dc2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 ### Features

 * add a preprocessor to enrich by systems env variables
+* add option to define rules inline in the pipeline config under the processor configs `generic_rules` or `specific_rules`

 ### Improvements
diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index b0d901759..2503afde7 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -41,22 +41,24 @@ class Config(Component.Config):
         specific_rules: List[str] = field(
             validator=[
                 validators.instance_of(list),
-                validators.deep_iterable(member_validator=validators.instance_of(str)),
+                validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
             ]
         )
         """List of rule locations to load rules from.
         In addition to paths to file directories it is possible to retrieve rules from a URI.
         For valid URI formats see :ref:`getters`.
+        As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
         """

         generic_rules: List[str] = field(
             validator=[
                 validators.instance_of(list),
-                validators.deep_iterable(member_validator=validators.instance_of(str)),
+                validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
             ]
         )
         """List of rule locations to load rules from.
         In addition to paths to file directories it is possible to retrieve rules from a URI.
         For valid URI formats see :ref:`getters`.
+        As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
         """

         tree_config: Optional[str] = field(
             default=None, validator=[validators.optional(validators.instance_of(str))]
@@ -229,20 +231,35 @@ def test_rules(self) -> dict:
         """

     @staticmethod
-    def resolve_directories(rule_paths: list) -> list:
-        resolved_paths = []
-        for rule_path in rule_paths:
-            getter_instance = getter.GetterFactory.from_string(rule_path)
+    def resolve_directories(rule_sources: list) -> list:
+        """Resolve directories to a list of files or rule definitions.
+
+        Parameters
+        ----------
+        rule_sources : list
+            a list of files, directories or rule definitions
+
+        Returns
+        -------
+        list
+            a list of files and rule definitions
+        """
+        resolved_sources = []
+        for rule_source in rule_sources:
+            if isinstance(rule_source, dict):
+                resolved_sources.append(rule_source)
+                continue
+            getter_instance = getter.GetterFactory.from_string(rule_source)
             if getter_instance.protocol == "file":
                 if Path(getter_instance.target).is_dir():
                     paths = list_json_files_in_directory(getter_instance.target)
                     for file_path in paths:
-                        resolved_paths.append(file_path)
+                        resolved_sources.append(file_path)
                 else:
-                    resolved_paths.append(rule_path)
+                    resolved_sources.append(rule_source)
             else:
-                resolved_paths.append(rule_path)
-        return resolved_paths
+                resolved_sources.append(rule_source)
+        return resolved_sources

     def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: List[str]):
         """method to add rules from directories or urls"""
diff --git a/logprep/processor/base/rule.py b/logprep/processor/base/rule.py
index 1a07dca0d..be7091661 100644
--- a/logprep/processor/base/rule.py
+++ b/logprep/processor/base/rule.py
@@ -255,9 +255,11 @@ def lucene_filter(self):
     # pylint: enable=C0111

     @classmethod
-    def create_rules_from_target(cls, path: str) -> list:
+    def create_rules_from_target(cls, rule_target: str) -> list:
         """Create a rule from a file."""
-        content = GetterFactory.from_string(path).get()
+        if isinstance(rule_target, dict):
+            return [cls._create_from_dict(rule_target)]
+        content = GetterFactory.from_string(rule_target).get()
         try:
             rule_data = json.loads(content)
         except ValueError:
@@ -265,11 +267,11 @@
         try:
             rules = [cls._create_from_dict(rule) for rule in rule_data]
         except InvalidRuleDefinitionError as error:
-            raise InvalidRuleDefinitionError(f"{path}: {error}") from error
+            raise InvalidRuleDefinitionError(f"{rule_target}: {error}") from error
         if len(rules) == 0:
             raise InvalidRuleDefinitionError("no rules in file")
         for rule in rules:
-            rule.file_name = splitext(basename(path))[0]
+            rule.file_name = splitext(basename(rule_target))[0]
         return rules

     @classmethod
diff --git a/logprep/util/schema_and_rule_checker.py b/logprep/util/schema_and_rule_checker.py
index 05a3ca2d4..50d1015b0 100644
--- a/logprep/util/schema_and_rule_checker.py
+++ b/logprep/util/schema_and_rule_checker.py
@@ -2,27 +2,28 @@
 """Runner for testing schemas and rules"""

-from typing import Optional, List
-from collections.abc import Iterable
-
 from argparse import ArgumentParser
+from collections.abc import Iterable
+from json.decoder import JSONDecodeError
 from logging import Logger
 from os import walk
 from os.path import join
-from json.decoder import JSONDecodeError
+from typing import List, Optional

 from colorama import Fore

-from logprep.util.configuration import Configuration
-
+from logprep.abc.processor import Processor
+from logprep.filter.lucene_filter import LuceneFilterError
 from logprep.processor.base.exceptions import (
     InvalidRuleDefinitionError,
     MismatchedRuleDefinitionError,
 )
 from logprep.processor.base.rule import Rule
-from logprep.abc.processor import Processor
-from logprep.processor.labeler.labeling_schema import LabelingSchema, InvalidLabelingSchemaFileError
-from logprep.filter.lucene_filter import LuceneFilterError
+from logprep.processor.labeler.labeling_schema import (
+    InvalidLabelingSchemaFileError,
+    LabelingSchema,
+)
+from logprep.util.configuration import Configuration


 class SchemaAndRuleChecker:
@@ -138,14 +139,17 @@ def _validate_rules_in_path(
         path_schema: str = None,
     ):
         number_of_checked_rules = 0
-        for root, _, files in walk(path_rules):
-            for file in files:
-                number_of_checked_rules += 1
-                rule_path = join(root, file)
-
-                multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
-                self._validate_schema(multi_rule, path_schema, rule_path)
-                self._print_schema_check_results(path_schema)
+        if isinstance(path_rules, dict):
+            self.check_rule_creation_errors(rule_class, path_rules)
+        else:
+            for root, _, files in walk(path_rules):
+                for file in files:
+                    number_of_checked_rules += 1
+                    rule_path = join(root, file)
+
+                    multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
+                    self._validate_schema(multi_rule, path_schema, rule_path)
+                    self._print_schema_check_results(path_schema)
         if not self.errors:
             self._print_valid(
                 f"Valid {processor_type} rules in {path_rules} "
@@ -198,7 +202,9 @@ def check_rule_creation_errors(self, rule_class: Rule, rule_path: str) -> Option
         """
         rule = None
         try:
-            if rule_path.endswith(".json") or rule_path.endswith(".yml"):
+            if isinstance(rule_path, dict):
+                rule = rule_class.create_rules_from_target(rule_path)
+            elif rule_path.endswith(".json") or rule_path.endswith(".yml"):
                 if not rule_path.endswith("_test.json"):
                     rule = rule_class.create_rules_from_target(rule_path)
         except InvalidRuleDefinitionError as error:
diff --git a/quickstart/exampledata/config/pipeline.yml b/quickstart/exampledata/config/pipeline.yml
index fcf47074b..5d5a02b59 100644
--- a/quickstart/exampledata/config/pipeline.yml
+++ b/quickstart/exampledata/config/pipeline.yml
@@ -40,6 +40,11 @@ pipeline:
         - quickstart/exampledata/rules/dropper/specific
       generic_rules:
         - quickstart/exampledata/rules/dropper/specific
+        - filter: "test_dropper"
+          dropper:
+            drop:
+              - drop_me
+          description: "..."

   - pre_detector:
       type: pre_detector
@@ -78,6 +83,15 @@
         max_cached_pseudonyms: 1000000
         max_caching_days: 1

+  - calculator:
+      type: calculator
+      specific_rules:
+        - filter: "test_label: execute"
+          calculator:
+            target_field: "calculation"
+            calc: "1 + 1"
+      generic_rules: []
+
 input:
   kafka:
     type: confluentkafka_input
diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py
index 5f5c3eee9..48abacdcf 100644
--- a/tests/unit/framework/test_pipeline.py
+++ b/tests/unit/framework/test_pipeline.py
@@ -16,10 +16,10 @@
 from logprep.abc.input import (
     CriticalInputError,
+    CriticalInputParsingError,
     FatalInputError,
     SourceDisconnectedError,
     WarningInputError,
-    CriticalInputParsingError,
 )
 from logprep.abc.output import (
     CriticalOutputError,
@@ -37,10 +37,7 @@
     SharedCounter,
 )
 from logprep.metrics.metric import MetricTargets
-from logprep.processor.base.exceptions import (
-    ProcessingCriticalError,
-    ProcessingWarning,
-)
+from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
 from logprep.processor.deleter.rule import DeleterRule
 from logprep.util.getter import GetterFactory
 from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
@@ -65,11 +62,6 @@ class ConfigurationForTests:
     counter = SharedCounter()


-class ProcessorWarningMockError(ProcessingWarning):
-    def __init__(self):
-        super().__init__("ProcessorWarningMockError")
-
-
 @mock.patch("logprep.factory.Factory.create")
 class TestPipeline(ConfigurationForTests):
     def setup_method(self):
diff --git a/tests/unit/test_factory.py b/tests/unit/test_factory.py
index d59d37922..ca991a7a6 100644
--- a/tests/unit/test_factory.py
+++ b/tests/unit/test_factory.py
@@ -7,16 +7,17 @@
 from string import ascii_letters
 from unittest import mock

-from pytest import raises, mark
+from pytest import mark, raises

 from logprep.abc.input import Input
 from logprep.factory import Factory
 from logprep.factory_error import (
+    InvalidConfigSpecificationError,
     InvalidConfigurationError,
-    UnknownComponentTypeError,
     NoTypeSpecifiedError,
-    InvalidConfigSpecificationError,
+    UnknownComponentTypeError,
 )
+from logprep.filter.expression.filter_expression import Exists
 from logprep.processor.clusterer.processor import Clusterer
 from logprep.processor.labeler.processor import Labeler
 from logprep.processor.normalizer.processor import Normalizer
@@ -152,6 +153,87 @@ def test_create_labeler_creates_labeler_processor():
     assert isinstance(processor, Labeler)


+def test_creates_calculator_with_inline_rules():
+    processor = Factory.create(
+        {
+            "calculator": {
+                "type": "calculator",
+                "generic_rules": [
+                    {
+                        "filter": "message",
+                        "calculator": {"target_field": "target", "calc": "1 + 1"},
+                    },
+                ],
+                "specific_rules": [
+                    {
+                        "filter": "message",
+                        "calculator": {"target_field": "target", "calc": "1 + 3"},
+                    },
+                ],
+            }
+        },
+        logger,
+    )
+    assert len(processor._generic_rules) == 1
+    assert len(processor._specific_rules) == 1
+
+
+def test_creates_calculator_with_inline_rules_and_files():
+    processor = Factory.create(
+        {
+            "calculator": {
+                "type": "calculator",
+                "generic_rules": [
+                    {
+                        "filter": "message1",
{"target_field": "target", "calc": "1 + 1"}, + }, + "tests/testdata/unit/calculator/generic_rules/calculator.json", + ], + "specific_rules": [ + { + "filter": "message", + "calculator": {"target_field": "target", "calc": "1 + 3"}, + }, + "tests/testdata/unit/calculator/specific_rules/calculator.json", + ], + } + }, + logger, + ) + assert len(processor._generic_rules) == 2 + assert len(processor._specific_rules) == 2 + assert processor._generic_rules[0].filter_str == "message1: *" + assert processor._generic_rules[1].filter_str == "(field1: * AND field2: *)" + + +def test_creates_calculator_with_inline_rules_and_file_and_directory(): + processor = Factory.create( + { + "calculator": { + "type": "calculator", + "generic_rules": [ + { + "filter": "message", + "calculator": {"target_field": "target", "calc": "1 + 1"}, + }, + "tests/testdata/unit/calculator/generic_rules/", + ], + "specific_rules": [ + { + "filter": "message", + "calculator": {"target_field": "target", "calc": "1 + 3"}, + }, + "tests/testdata/unit/calculator/specific_rules/calculator.json", + ], + } + }, + logger, + ) + assert len(processor._generic_rules) == 2 + assert len(processor._specific_rules) == 2 + + def test_dummy_input_creates_dummy_input_connector(): processor = Factory.create( {"labelername": {"type": "dummy_input", "documents": [{}, {}]}},