Commit 3b8da10: WIP

ekneg54 committed Oct 12, 2023
1 parent 7fbb823 commit 3b8da10
Showing 8 changed files with 28 additions and 44 deletions.

logprep/abc/component.py (9 additions, 1 deletion)

@@ -38,12 +38,20 @@ class Metrics:
         )
         """Number of events that were processed"""

+        number_of_failed_events: Metric = field(
+            factory=lambda: Metric(
+                type=MetricType.COUNTER,
+                description="Number of events that were send to error output",
+                name="number_of_failed_events",
+            )
+        )
+        """Number of events that were send to error output"""

         def __attrs_post_init__(self):
             for attribute in asdict(self):
                 attribute = getattr(self, attribute)
                 if isinstance(attribute, Metric):
                     attribute.labels = self._labels
                     # attribute.labels |= {"component": type(self).__module__.split(".")[-1]}
                     attribute.tracker = attribute.type(
                         name=f"{self._prefix}{attribute.name}",
                         documentation=attribute.description,
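
For context, here is a minimal, runnable sketch of the pattern this hunk extends: an attrs class whose Metric-typed fields are wired to prometheus_client trackers in __attrs_post_init__. The Metric and Metrics classes below are simplified stand-ins for logprep's own (MetricType is replaced by passing the Counter class directly), so names and signatures are assumptions, not the real API.

    from attrs import asdict, define, field
    from prometheus_client import Counter

    @define(kw_only=True)
    class Metric:
        name: str
        description: str
        type: type = Counter                # tracker class to instantiate
        labels: dict = field(factory=dict)
        tracker: object = None

    @define(kw_only=True)
    class Metrics:
        _labels: dict = field(factory=dict)
        _prefix: str = "logprep_"

        number_of_failed_events: Metric = field(
            factory=lambda: Metric(
                type=Counter,
                description="Number of events that were sent to error output",
                name="number_of_failed_events",
            )
        )

        def __attrs_post_init__(self):
            # walk all attrs fields; anything that is a Metric gets a live tracker
            for attribute in asdict(self):
                attribute = getattr(self, attribute)
                if isinstance(attribute, Metric):
                    attribute.labels = self._labels
                    attribute.tracker = attribute.type(
                        name=f"{self._prefix}{attribute.name}",
                        documentation=attribute.description,
                        labelnames=tuple(attribute.labels),
                    )

    metrics = Metrics(labels={"component": "example"})
    metrics.number_of_failed_events.tracker.labels("example").inc()
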

logprep/abc/output.py (0 additions, 17 deletions)

@@ -10,7 +10,6 @@

 from logprep.abc.connector import Connector
 from logprep.abc.input import Input
-from logprep.metrics.metrics import Metric, MetricType


 class OutputError(BaseException):

@@ -58,22 +57,6 @@ class Config(Connector.Config):
         But this output can be called as output for extra_data.
         """

-    @define(kw_only=True)
-    class Metrics(Connector.Metrics):
-        """Base Metric class to track and expose statistics about logprep"""
-
-        _labels: dict
-        _prefix: str = "logprep_"
-
-        number_of_failed_events: Metric = field(
-            factory=lambda: Metric(
-                type=MetricType.COUNTER,
-                description="Number of events that were send to error output",
-                name="number_of_failed_events",
-            )
-        )
-        """Number of events that were send to error output"""
-
     __slots__ = {"input_connector"}

     input_connector: Optional[Input]
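
This deletion is the flip side of the first file: number_of_failed_events now lives on Component.Metrics, so Output inherits it through Connector and its own Metrics subclass carried no extra information. Schematically (a sketch, all bodies elided):

    class Component:
        class Metrics:          # now declares number_of_failed_events for everyone
            ...

    class Connector(Component):
        ...

    class Output(Connector):
        ...                     # Output.Metrics resolves to Component.Metrics
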

logprep/abc/processor.py (5 additions, 8 deletions)

@@ -187,15 +187,17 @@ def _process_rule(event, rule):
         else:
             reduce(_process_rule, (event, *tree.get_matching_rules(event)))

-    def _apply_rules_wrapper(self, event, rule):
+    def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
         self.metrics.number_of_processed_events += 1
         try:
             self._apply_rules(event, rule)
         except ProcessingWarning as error:
             self._handle_warning_error(event, rule, error)
         except ProcessingCriticalError as error:
+            rule.metrics.number_of_failed_events += 1
             raise error
         except BaseException as error:
+            rule.metrics.number_of_failed_events += 1
             raise ProcessingCriticalError(self, str(error), event) from error
         if not hasattr(rule, "delete_source_fields"):
             return

@@ -252,18 +254,13 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: List[str]):
         specific_rules_targets = self.resolve_directories(specific_rules_targets)
         generic_rules_targets = self.resolve_directories(generic_rules_targets)
         for specific_rules_target in specific_rules_targets:
-            rules = self.rule_class.create_rules_from_target(specific_rules_target)
+            rules = self.rule_class.create_rules_from_target(specific_rules_target, processor=self)
             for rule in rules:
                 self._specific_tree.add_rule(rule, self._logger)
         for generic_rules_target in generic_rules_targets:
-            rules = self.rule_class.create_rules_from_target(generic_rules_target)
+            rules = self.rule_class.create_rules_from_target(generic_rules_target, processor=self)
             for rule in rules:
                 self._generic_tree.add_rule(rule, self._logger)
-        if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
-            number_specific_rules = self._specific_tree.metrics.number_of_rules
-            self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules")
-            number_generic_rules = self._generic_tree.metrics.number_of_rules
-            self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules")

     @staticmethod
     def _field_exists(event: dict, dotted_field: str) -> bool:
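
A note on the final except clause above: BaseException also covers KeyboardInterrupt and SystemExit, which a plain except Exception would let pass untouched. Since the handler immediately wraps and re-raises, nothing is swallowed, but the rule-level failure counter also ticks for interpreter-level interrupts. A standalone illustration of the difference:

    def run_counted(action, metrics: dict):
        try:
            action()
        except BaseException:
            metrics["failed"] += 1  # fires even for KeyboardInterrupt and SystemExit
            raise                   # bare raise keeps the original traceback

    # with `except Exception` instead, KeyboardInterrupt and SystemExit would
    # propagate without ever touching the counter
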

logprep/framework/rule_tree/rule_tree.py (1 addition, 0 deletions)

@@ -34,6 +34,7 @@ class RuleTreeMetrics(Component.Metrics):
"""Tracks statistics about the current rule tree"""

number_of_processed_events = field(default=None)
number_of_failed_events = field(default=None)

__slots__ = (
"rule_parser",
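
The field(default=None) override is the opt-out mechanism here: as the component.py hunk shows, __attrs_post_init__ only wires attributes that pass isinstance(attribute, Metric), so a None value means no Prometheus counter is registered for rule trees even though the base class now declares one.
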

logprep/processor/base/exceptions.py (1 addition, 1 deletion)

@@ -60,7 +60,7 @@ class ProcessingCriticalError(ProcessingError):
"""A critical error occurred - stop processing of this event"""

def __init__(self, processor: "Processor", message: str, event: dict):
processor.metrics.number_of_errors += 1
processor.metrics.number_of_failed_events += 1
super().__init__(
processor, f"{message} -> event was send to error output and further processing stopped"
)
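
One consequence of counting inside the exception's __init__: the processor-level counter increments as soon as a ProcessingCriticalError is constructed, whether or not it is subsequently raised, and independently of the rule-level counter bumped in _apply_rules_wrapper. For example (hypothetical variables):

    error = ProcessingCriticalError(processor, "parse failure", event)
    # processor.metrics.number_of_failed_events has already been incremented,
    # even if `raise error` never happens
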

logprep/processor/base/rule.py (10 additions, 6 deletions)

@@ -139,6 +139,7 @@
 from ruamel.yaml import YAML

 from logprep.abc.component import Component
+from logprep.abc.processor import Processor
 from logprep.filter.expression.filter_expression import FilterExpression
 from logprep.filter.lucene_filter import LuceneFilter
 from logprep.metrics.metrics import calculate_new_average

@@ -224,16 +225,19 @@ def metric_labels(self) -> dict:
"""Return the metric labels for this component."""
return {
"component": "rule",
"type": self.rule_type,
"processor": self._processor_name,
"id": str(self._config.rule_id),
"description": self._config.description,
}

def __init__(self, filter_rule: FilterExpression, config: Config):
def __init__(self, filter_rule: FilterExpression, config: Config, processor_name: str):
if not isinstance(config, self.Config):
raise InvalidRuleDefinitionError("config is not a Config class")
if not config.tag_on_failure:
config.tag_on_failure = [f"_{self.rule_type}_failure"]
self.__class__.__hash__ = Rule.__hash__
self._processor_name = processor_name
self.filter_str = str(filter_rule)
self._filter = filter_rule
self._special_fields = None

@@ -272,17 +276,17 @@ def lucene_filter(self):
     # pylint: enable=C0111

     @classmethod
-    def create_rules_from_target(cls, rule_target: str) -> list:
+    def create_rules_from_target(cls, rule_target: str, processor: Processor) -> list:
         """Create a rule from a file."""
         if isinstance(rule_target, dict):
-            return [cls._create_from_dict(rule_target)]
+            return [cls._create_from_dict(rule_target, processor.name)]
         content = GetterFactory.from_string(rule_target).get()
         try:
             rule_data = json.loads(content)
         except ValueError:
             rule_data = yaml.load_all(content)
         try:
-            rules = [cls._create_from_dict(rule) for rule in rule_data]
+            rules = [cls._create_from_dict(rule, processor.name) for rule in rule_data]
         except InvalidRuleDefinitionError as error:
             raise InvalidRuleDefinitionError(f"{rule_target}: {error}") from error
         if len(rules) == 0:

@@ -298,7 +302,7 @@ def normalize_rule_dict(cls, rule: dict) -> None:
"""

@classmethod
def _create_from_dict(cls, rule: dict) -> "Rule":
def _create_from_dict(cls, rule: dict, processor_name: str) -> "Rule":
cls.normalize_rule_dict(rule)
filter_expression = Rule._create_filter_expression(rule)
cls.rule_type = camel_to_snake(cls.__name__.replace("Rule", ""))

@@ -315,7 +319,7 @@ def _create_from_dict(cls, rule: dict) -> "Rule":
             if special_field_value is not None:
                 config.update({special_field: special_field_value})
         config = cls.Config(**config)
-        return cls(filter_expression, config)
+        return cls(filter_expression, config, processor_name)

     @staticmethod
     def _check_rule_validity(
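
Taken together, the rule.py changes thread the owning processor's name from rule loading down into each rule's metric labels. A condensed, self-contained sketch of the flow (only _processor_name and metric_labels come from the diff; Config and the example values are assumptions):

    from dataclasses import dataclass

    @dataclass
    class Config:
        rule_id: str = "example-1"
        description: str = "example rule"

    class Rule:
        rule_type = "example"  # in logprep this is derived from the class name

        def __init__(self, filter_rule, config: Config, processor_name: str):
            self._filter = filter_rule
            self._config = config
            self._processor_name = processor_name  # kept for metric labels

        @property
        def metric_labels(self) -> dict:
            return {
                "component": "rule",
                "type": self.rule_type,
                "processor": self._processor_name,
                "id": str(self._config.rule_id),
                "description": self._config.description,
            }

    rule = Rule(filter_rule=None, config=Config(), processor_name="my_processor")
    assert rule.metric_labels["processor"] == "my_processor"
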

logprep/processor/normalizer/rule.py (1 addition, 2 deletions)

@@ -914,7 +914,6 @@ def __init__(
         self._special_fields = None
         self.file_name = None
         self._tests = []
-        self.metrics = self.RuleMetrics(labels={"type": "rule"})
         self._substitutions = {}
         self._grok = {}
         self._timestamps = {}

@@ -999,7 +998,7 @@ def timestamps(self) -> dict:
     # pylint: enable=C0111

     @staticmethod
-    def _create_from_dict(rule: dict) -> "NormalizerRule":
+    def _create_from_dict(rule: dict, processor_name: str) -> "NormalizerRule":
         NormalizerRule._check_rule_validity(rule, "normalize")
         NormalizerRule._check_if_normalization_valid(rule)

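
Because the base class's _create_from_dict gained a parameter, every override has to mirror the new signature or rule loading breaks; NormalizerRule is the subclass updated here. Its __init__ also stops constructing a private RuleMetrics instance, consistent with metrics moving into the shared component hierarchy.
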

quickstart/exampledata/config/pipeline.yml (1 addition, 9 deletions)

@@ -1,5 +1,5 @@
 version: 1
-process_count: 1
+process_count: 2
 timeout: 0.1
 logger:
   level: INFO

@@ -24,14 +24,6 @@ pipeline:
       generic_rules:
         - quickstart/exampledata/rules/labeler/generic

-  - normalizer:
-      type: normalizer
-      specific_rules:
-        - quickstart/exampledata/rules/normalizer/specific/
-      generic_rules:
-        - quickstart/exampledata/rules/normalizer/generic/
-      regex_mapping: quickstart/exampledata/rules/normalizer/normalizer_regex_mapping.yml
-
   - dropper:
       type: dropper
       specific_rules:
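
For the quickstart setup this should mean logprep runs two parallel pipeline processes instead of one, and the example pipeline loses its normalizer stage, so the labeler's output now flows directly into the dropper.
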
