add attributes to processor result #630

Merged
59 changes: 31 additions & 28 deletions logprep/abc/processor.py
@@ -38,30 +38,28 @@ class ProcessorResult:
and errors (incl. warnings).
"""

name: str = field(validator=validators.instance_of(str))
data: list = field(validator=validators.instance_of(list), factory=list)
""" The generated extra data """
errors: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of((ProcessingError, ProcessingWarning)),
member_validator=validators.instance_of(ProcessingError),
iterable_validator=validators.instance_of(list),
),
factory=list,
)

def __contains__(self, error_class):
return any(isinstance(item, error_class) for item in self.errors)

def get_warning_string(self):
"""creates a string containing the warnings"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)]
)

def get_error_string(self):
"""creates a string containing the errors"""
return ", ".join(
[error.args[0] for error in self.errors if isinstance(error, ProcessingError)]
)
""" The errors that occurred during processing """
warnings: list = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of(ProcessingWarning),
iterable_validator=validators.instance_of(list),
),
factory=list,
)
""" The warnings that occurred during processing """
processor_name: str = field(validator=validators.instance_of(str))
""" The name of the processor """
event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None)
""" A reference to the event that was processed """


class Processor(Component):
@@ -138,7 +136,7 @@ def __init__(self, name: str, configuration: "Processor.Config"):
specific_rules_targets=self._config.specific_rules,
)
self.has_custom_tests = False
self.result = ProcessorResult(name=self.name)
self.result = None

@property
def _specific_rules(self):
@@ -180,23 +178,28 @@ def metric_labels(self) -> dict:
"name": self.name,
}

def process(self, event: dict):
"""Process a log event by calling the implemented `process` method of the
strategy object set in `_strategy` attribute.
def process(self, event: dict) -> ProcessorResult:
"""Process a log event.

Parameters
----------
event : dict
A dictionary representing a log event.

Returns
-------
ProcessorResult
A ProcessorResult object containing the processed event, errors,
extra data and a list of target outputs.

"""
self.result = ProcessorResult(name=self.name)
self.result = ProcessorResult(processor_name=self.name, event=event)
logger.debug(f"{self.describe()} processing event {event}")
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)
return self.result
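
A short usage sketch of the new contract -- `processor`, `logger` and the event dict are placeholders for the caller's own objects:

    # Sketch: call process() and inspect the returned ProcessorResult.
    result = processor.process({"message": "some log line"})
    if result.warnings:
        logger.warning("%s raised %d warning(s)", result.processor_name, len(result.warnings))
    if result.errors:
        logger.error("%s failed, event was cleared", result.processor_name)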

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
def _process_rule_tree(self, event: dict, tree: RuleTree):
applied_rules = set()

@Metric.measure_time()
@@ -206,14 +209,14 @@ def _process_rule(rule, event):
applied_rules.add(rule)
return event

def _process_rule_tree_multiple_times(tree, event):
def _process_rule_tree_multiple_times(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
while matching_rules:
for rule in matching_rules:
_process_rule(rule, event)
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)

def _process_rule_tree_once(tree, event):
def _process_rule_tree_once(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
for rule in matching_rules:
_process_rule(rule, event)
@@ -231,7 +234,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingCriticalError as error:
self.result.errors.append(error) # is needed to prevent wrapping it in itself
event.clear()
except BaseException as error:
except Exception as error:
self.result.errors.append(ProcessingCriticalError(str(error), rule, event))
event.clear()
if not hasattr(rule, "delete_source_fields"):
@@ -321,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
else:
add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags})))
if isinstance(error, ProcessingWarning):
self.result.errors.append(error)
self.result.warnings.append(error)
else:
self.result.errors.append(ProcessingWarning(str(error), rule, event))
self.result.warnings.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
84 changes: 50 additions & 34 deletions logprep/framework/pipeline.py
@@ -17,11 +17,10 @@
from ctypes import c_bool
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Lock, Value, current_process
from typing import Any, List, Tuple, Optional
from multiprocessing import Value, current_process
from typing import Any, Generator, List, Tuple

import attrs
import msgspec

from logprep.abc.component import Component
from logprep.abc.input import (
@@ -53,12 +52,41 @@ class PipelineResult:

results: List[ProcessorResult] = attrs.field(
validator=[
attrs.validators.instance_of(list),
attrs.validators.instance_of((list, Generator)),
attrs.validators.deep_iterable(
member_validator=attrs.validators.instance_of(ProcessorResult)
),
]
)
"""List of ProcessorResults"""
event: dict = attrs.field(validator=attrs.validators.instance_of(dict))
"""The event that was processed"""
event_received: dict = attrs.field(
validator=attrs.validators.instance_of(dict), converter=copy.deepcopy
)
"""The event that was received"""
pipeline: list[Processor]
"""The pipeline that processed the event"""

@cached_property
def errors(self) -> List[ProcessingError]:
"""Return all processing errors."""
return list(itertools.chain(*[result.errors for result in self]))

@cached_property
def warnings(self) -> List[ProcessingWarning]:
"""Return all processing warnings."""
return list(itertools.chain(*[result.warnings for result in self]))

@cached_property
def data(self) -> List[Tuple[dict, dict]]:
"""Return all extra data."""
return list(itertools.chain(*[result.data for result in self]))

def __attrs_post_init__(self):
self.results = list(
(processor.process(self.event) for processor in self.pipeline if self.event)
)

def __iter__(self):
return iter(self.results)
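
Creating a PipelineResult now triggers the processing itself via __attrs_post_init__, and the cached properties flatten the per-processor results. Roughly -- `processors` and `event` are placeholders for an already configured pipeline and a parsed log event:

    # Sketch: `event` is mutated in place, `event_received` keeps a deep copy via the converter.
    result = PipelineResult(
        results=[],             # filled by __attrs_post_init__
        event=event,
        event_received=event,
        pipeline=processors,
    )
    all_errors = result.errors      # ProcessingError objects from every processor
    all_warnings = result.warnings  # ProcessingWarning objects from every processor
    extra_data = result.data        # extra output data collected across processors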
@@ -113,9 +141,6 @@ class Metrics(Component.Metrics):
_continue_iterating: Value
""" a flag to signal if iterating continues """

_lock: Lock
""" the lock for the pipeline process """

pipeline_index: int
""" the index of this pipeline """

@@ -175,19 +200,13 @@ def _input(self) -> Input:
)
return Factory.create(input_connector_config)

def __init__(
self, config: Configuration, pipeline_index: int = None, lock: Lock = None
) -> None:
def __init__(self, config: Configuration, pipeline_index: int = None) -> None:
self.logger = logging.getLogger("Pipeline")
self.logger.name = f"Pipeline{pipeline_index}"
self._logprep_config = config
self._timeout = config.timeout
self._continue_iterating = Value(c_bool)

self._lock = lock
self.pipeline_index = pipeline_index
self._encoder = msgspec.msgpack.Encoder()
self._decoder = msgspec.msgpack.Decoder()
if self._logprep_config.profile_pipelines:
self.run = partial(PipelineProfiler.profile_function, self.run)

@@ -224,43 +243,36 @@ def run(self) -> None: # pylint: disable=method-hidden
self._continue_iterating.value = True
assert self._input, "Pipeline should not be run without input connector"
assert self._output, "Pipeline should not be run without output connector"
with self._lock:
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
with warnings.catch_warnings():
warnings.simplefilter("default")
self._setup()
self.logger.debug("Start iterating")
while self._continue_iterating.value:
self.process_pipeline()
self._shut_down()

@_handle_pipeline_error
def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]:
def process_pipeline(self) -> PipelineResult:
"""Retrieve next event, process event with full pipeline and store or return results"""
Component.run_pending_tasks()

event = self._get_event()
if not event:
return None, None
event_received = copy.deepcopy(event)
result: PipelineResult = self.process_event(event)
for processor_result in result:
if not processor_result.errors:
continue
if ProcessingWarning in processor_result:
self.logger.warning(processor_result.get_warning_string())
if ProcessingError in processor_result:
self.logger.error(processor_result.get_error_string())
if self._output:
self._store_failed_event(processor_result.errors, event_received, event)
# pipeline is aborted on processing error
return event, result
if result.warnings:
self.logger.warning(",".join((str(warning) for warning in result.warnings)))
if result.errors:
self.logger.error(",".join((str(error) for error in result.errors)))
self._store_failed_event(result.errors, result.event_received, event)
return
if self._output:
result_data = [res.data for res in result if res.data]
if result_data:
self._store_extra_data(itertools.chain(*result_data))
if event:
self._store_event(event)
return event, result
return result
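
From the caller's side the method now either returns the PipelineResult or handles the failure path itself (logging, storing the failed event) and returns early. A rough sketch, with `pipeline` and `handle` as hypothetical caller objects:

    # Sketch: one manual iteration of a running pipeline; errors are already logged
    # and stored inside process_pipeline, so only the success path yields a result.
    result = pipeline.process_pipeline()
    if isinstance(result, PipelineResult):
        handle(result.event, result.data)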

def _store_event(self, event: dict) -> None:
for output_name, output in self._output.items():
@@ -288,9 +300,13 @@ def _get_event(self) -> dict:
@Metric.measure_time()
def process_event(self, event: dict):
"""process all processors for one event"""
return PipelineResult(
results=[processor.process(event) for processor in self._pipeline if event]
result = PipelineResult(
results=[],
event_received=event,
event=event,
pipeline=self._pipeline,
)
return result

def _store_extra_data(self, result_data: List | itertools.chain) -> None:
self.logger.debug("Storing extra data")
3 changes: 1 addition & 2 deletions logprep/framework/pipeline_manager.py
@@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration):
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration

self._lock = multiprocessing.Lock()
prometheus_config = self._configuration.metrics
if prometheus_config.enabled:
self.prometheus_exporter = PrometheusExporter(prometheus_config)
@@ -164,7 +163,7 @@ def restart(self):
self.prometheus_exporter.run()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock)
pipeline = Pipeline(pipeline_index=index, config=self._configuration)
logger.info("Created new pipeline")
process = multiprocessing.Process(target=pipeline.run, daemon=True)
process.stop = pipeline.stop
17 changes: 13 additions & 4 deletions logprep/processor/generic_resolver/processor.py
@@ -28,18 +28,21 @@
import re
from typing import Union

from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
)
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.rule import GenericResolverRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value


class GenericResolverError(BaseException):
class GenericResolverError(ProcessingCriticalError):
"""Base class for GenericResolver related exceptions."""

def __init__(self, name: str, message: str):
super().__init__(f"GenericResolver ({name}): {message}")
def __init__(self, name: str, message: str, rule: GenericResolverRule, event: dict):
super().__init__(f"{name}: {message}", rule=rule, event=event)


class GenericResolver(FieldManager):
@@ -78,6 +81,8 @@ def _apply_rules(self, event, rule):
raise GenericResolverError(
self.name,
"Mapping group is missing in mapping file pattern!",
rule=rule,
event=event,
)
dest_val = replacements.get(mapping)
if dest_val:
@@ -145,9 +150,13 @@ def ensure_rules_from_file(self, rule):
f"Additions file "
f'\'{rule.resolve_from_file["path"]}\''
f" must be a dictionary with string values!",
rule=rule,
event=None,
)
except FileNotFoundError as error:
raise GenericResolverError(
self.name,
f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!",
rule=rule,
event=None,
) from error
6 changes: 2 additions & 4 deletions logprep/processor/hyperscan_resolver/processor.py
@@ -39,6 +39,7 @@

from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.processor import GenericResolverError
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.validators import directory_validator

@@ -56,12 +57,9 @@
# pylint: enable=ungrouped-imports


class HyperscanResolverError(BaseException):
class HyperscanResolverError(GenericResolverError):
"""Base class for HyperscanResolver related exceptions."""

def __init__(self, name: str, message: str):
super().__init__(f"HyperscanResolver ({name}): {message}")


class HyperscanResolver(FieldManager):
"""Resolve values in documents by referencing a mapping list."""