Skip to content

Commit

Permalink
add inline rule config feature to pipeline config (#453)
Browse files Browse the repository at this point in the history
Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Oct 9, 2023
1 parent 7bbbdf3 commit e5cbb34
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### Features

* add a preprocessor to enrich by systems env variables
* add option to define rules inline in pipeline config under processor configs `generic_rules` or `specific_rules`

### Improvements

Expand Down
37 changes: 27 additions & 10 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ class Config(Component.Config):
specific_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
In addition to paths to file directories it is possible to retrieve rules from a URI.
For valid URI formats see :ref:`getters`.
As last option it is possible to define entire rules with all their configuration parameters as list elements.
"""
generic_rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of(str)),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
)
"""List of rule locations to load rules from.
In addition to paths to file directories it is possible to retrieve rules from a URI.
For valid URI formats see :ref:`getters`.
As last option it is possible to define entire rules with all their configuration parameters as list elements.
"""
tree_config: Optional[str] = field(
default=None, validator=[validators.optional(validators.instance_of(str))]
Expand Down Expand Up @@ -229,20 +231,35 @@ def test_rules(self) -> dict:
"""

@staticmethod
def resolve_directories(rule_sources: list) -> list:
    """Resolve rule sources to a flat list of rule files and inline rule definitions.

    Parameters
    ----------
    rule_sources : list
        A list of file paths, directory paths, URIs or inline rule
        definitions (dicts).

    Returns
    -------
    list
        A list of file paths, URIs and inline rule definitions; local
        directories are expanded to the JSON files they contain.
    """
    resolved_sources = []
    for rule_source in rule_sources:
        # inline rule definitions (dicts) are passed through unchanged
        if isinstance(rule_source, dict):
            resolved_sources.append(rule_source)
            continue
        getter_instance = getter.GetterFactory.from_string(rule_source)
        if getter_instance.protocol == "file" and Path(getter_instance.target).is_dir():
            # expand a local directory into the JSON files it contains
            resolved_sources.extend(list_json_files_in_directory(getter_instance.target))
        else:
            # a single file, or a non-file protocol (e.g. http) handled by the getter
            resolved_sources.append(rule_source)
    return resolved_sources

def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: List[str]):
"""method to add rules from directories or urls"""
Expand Down
10 changes: 6 additions & 4 deletions logprep/processor/base/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,23 @@ def lucene_filter(self):
# pylint: enable=C0111

@classmethod
def create_rules_from_target(cls, rule_target) -> list:
    """Create rules from a file, URI or inline rule definition.

    Parameters
    ----------
    rule_target : str or dict
        Path or URI of a rule file, or a single rule definition given
        inline as a dict.

    Returns
    -------
    list
        The created rule instances.

    Raises
    ------
    InvalidRuleDefinitionError
        If a rule definition is invalid or the file contains no rules.
    """
    if isinstance(rule_target, dict):
        # inline definition from the pipeline config: nothing to fetch or parse
        return [cls._create_from_dict(rule_target)]
    content = GetterFactory.from_string(rule_target).get()
    try:
        rule_data = json.loads(content)
    except ValueError:
        # not valid JSON: fall back to (multi-document) YAML
        rule_data = yaml.load_all(content)
    try:
        rules = [cls._create_from_dict(rule) for rule in rule_data]
    except InvalidRuleDefinitionError as error:
        # prefix the source so the failing file/URI is identifiable
        raise InvalidRuleDefinitionError(f"{rule_target}: {error}") from error
    if not rules:
        raise InvalidRuleDefinitionError("no rules in file")
    for rule in rules:
        # remember the originating file name (without extension) on each rule
        rule.file_name = splitext(basename(rule_target))[0]
    return rules

@classmethod
Expand Down
42 changes: 24 additions & 18 deletions logprep/util/schema_and_rule_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,28 @@

"""Runner for testing schemas and rules"""

from typing import Optional, List
from collections.abc import Iterable

from argparse import ArgumentParser
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from logging import Logger
from os import walk
from os.path import join
from json.decoder import JSONDecodeError
from typing import List, Optional

from colorama import Fore

from logprep.util.configuration import Configuration

from logprep.abc.processor import Processor
from logprep.filter.lucene_filter import LuceneFilterError
from logprep.processor.base.exceptions import (
InvalidRuleDefinitionError,
MismatchedRuleDefinitionError,
)
from logprep.processor.base.rule import Rule
from logprep.abc.processor import Processor
from logprep.processor.labeler.labeling_schema import LabelingSchema, InvalidLabelingSchemaFileError
from logprep.filter.lucene_filter import LuceneFilterError
from logprep.processor.labeler.labeling_schema import (
InvalidLabelingSchemaFileError,
LabelingSchema,
)
from logprep.util.configuration import Configuration


class SchemaAndRuleChecker:
Expand Down Expand Up @@ -138,14 +139,17 @@ def _validate_rules_in_path(
path_schema: str = None,
):
number_of_checked_rules = 0
for root, _, files in walk(path_rules):
for file in files:
number_of_checked_rules += 1
rule_path = join(root, file)

multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
self._validate_schema(multi_rule, path_schema, rule_path)
self._print_schema_check_results(path_schema)
if isinstance(path_rules, dict):
self.check_rule_creation_errors(rule_class, path_rules)
else:
for root, _, files in walk(path_rules):
for file in files:
number_of_checked_rules += 1
rule_path = join(root, file)

multi_rule = self.check_rule_creation_errors(rule_class, rule_path)
self._validate_schema(multi_rule, path_schema, rule_path)
self._print_schema_check_results(path_schema)
if not self.errors:
self._print_valid(
f"Valid {processor_type} rules in {path_rules} "
Expand Down Expand Up @@ -198,7 +202,9 @@ def check_rule_creation_errors(self, rule_class: Rule, rule_path: str) -> Option
"""
rule = None
try:
if rule_path.endswith(".json") or rule_path.endswith(".yml"):
if isinstance(rule_path, dict):
rule = rule_class.create_rules_from_target(rule_path)
elif rule_path.endswith(".json") or rule_path.endswith(".yml"):
if not rule_path.endswith("_test.json"):
rule = rule_class.create_rules_from_target(rule_path)
except InvalidRuleDefinitionError as error:
Expand Down
14 changes: 14 additions & 0 deletions quickstart/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pipeline:
- quickstart/exampledata/rules/dropper/specific
generic_rules:
- quickstart/exampledata/rules/dropper/specific
- filter: "test_dropper"
dropper:
drop:
- drop_me
description: "..."

- pre_detector:
type: pre_detector
Expand Down Expand Up @@ -78,6 +83,15 @@ pipeline:
max_cached_pseudonyms: 1000000
max_caching_days: 1

- calculator:
type: calculator
specific_rules:
- filter: "test_label: execute"
calculator:
target_field: "calculation"
calc: "1 + 1"
generic_rules: []

input:
kafka:
type: confluentkafka_input
Expand Down
12 changes: 2 additions & 10 deletions tests/unit/framework/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

from logprep.abc.input import (
CriticalInputError,
CriticalInputParsingError,
FatalInputError,
SourceDisconnectedError,
WarningInputError,
CriticalInputParsingError,
)
from logprep.abc.output import (
CriticalOutputError,
Expand All @@ -37,10 +37,7 @@
SharedCounter,
)
from logprep.metrics.metric import MetricTargets
from logprep.processor.base.exceptions import (
ProcessingCriticalError,
ProcessingWarning,
)
from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning
from logprep.processor.deleter.rule import DeleterRule
from logprep.util.getter import GetterFactory
from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler
Expand All @@ -65,11 +62,6 @@ class ConfigurationForTests:
counter = SharedCounter()


class ProcessorWarningMockError(ProcessingWarning):
    """Concrete ProcessingWarning with a fixed message, used as a test double."""

    _MESSAGE = "ProcessorWarningMockError"

    def __init__(self):
        super().__init__(self._MESSAGE)


@mock.patch("logprep.factory.Factory.create")
class TestPipeline(ConfigurationForTests):
def setup_method(self):
Expand Down
88 changes: 85 additions & 3 deletions tests/unit/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
from string import ascii_letters
from unittest import mock

from pytest import raises, mark
from pytest import mark, raises

from logprep.abc.input import Input
from logprep.factory import Factory
from logprep.factory_error import (
InvalidConfigSpecificationError,
InvalidConfigurationError,
UnknownComponentTypeError,
NoTypeSpecifiedError,
InvalidConfigSpecificationError,
UnknownComponentTypeError,
)
from logprep.filter.expression.filter_expression import Exists
from logprep.processor.clusterer.processor import Clusterer
from logprep.processor.labeler.processor import Labeler
from logprep.processor.normalizer.processor import Normalizer
Expand Down Expand Up @@ -152,6 +153,87 @@ def test_create_labeler_creates_labeler_processor():
assert isinstance(processor, Labeler)


def test_creates_calculator_with_inline_rules():
    """Factory builds a calculator whose generic and specific rules are defined inline."""
    config = {
        "calculator": {
            "type": "calculator",
            "generic_rules": [
                {"filter": "message", "calculator": {"target_field": "target", "calc": "1 + 1"}},
            ],
            "specific_rules": [
                {"filter": "message", "calculator": {"target_field": "target", "calc": "1 + 3"}},
            ],
        }
    }
    processor = Factory.create(config, logger)
    assert len(processor._generic_rules) == 1
    assert len(processor._specific_rules) == 1


def test_creates_calculator_with_inline_rules_and_files():
    """Factory mixes inline rule definitions with rule files in both rule lists."""
    inline_generic = {
        "filter": "message1",
        "calculator": {"target_field": "target", "calc": "1 + 1"},
    }
    inline_specific = {
        "filter": "message",
        "calculator": {"target_field": "target", "calc": "1 + 3"},
    }
    processor = Factory.create(
        {
            "calculator": {
                "type": "calculator",
                "generic_rules": [
                    inline_generic,
                    "tests/testdata/unit/calculator/generic_rules/calculator.json",
                ],
                "specific_rules": [
                    inline_specific,
                    "tests/testdata/unit/calculator/specific_rules/calculator.json",
                ],
            }
        },
        logger,
    )
    assert len(processor._generic_rules) == 2
    assert len(processor._specific_rules) == 2
    assert processor._generic_rules[0].filter_str == "message1: *"
    assert processor._generic_rules[1].filter_str == "(field1: * AND field2: *)"


def test_creates_calculator_with_inline_rules_and_file_and_directory():
    """Factory mixes an inline rule with a rule directory (generic) and a rule file (specific)."""
    inline_generic = {
        "filter": "message",
        "calculator": {"target_field": "target", "calc": "1 + 1"},
    }
    inline_specific = {
        "filter": "message",
        "calculator": {"target_field": "target", "calc": "1 + 3"},
    }
    processor = Factory.create(
        {
            "calculator": {
                "type": "calculator",
                "generic_rules": [
                    inline_generic,
                    "tests/testdata/unit/calculator/generic_rules/",
                ],
                "specific_rules": [
                    inline_specific,
                    "tests/testdata/unit/calculator/specific_rules/calculator.json",
                ],
            }
        },
        logger,
    )
    assert len(processor._generic_rules) == 2
    assert len(processor._specific_rules) == 2


def test_dummy_input_creates_dummy_input_connector():
processor = Factory.create(
{"labelername": {"type": "dummy_input", "documents": [{}, {}]}},
Expand Down

0 comments on commit e5cbb34

Please sign in to comment.