From eb9cc0060b3cdb6760053ef4236b5ce1750ac2ff Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Tue, 9 Jul 2024 16:41:10 +0200 Subject: [PATCH 01/10] add outputs, event to processor_result and rename name to processor_name --- logprep/abc/processor.py | 13 +++++++--- tests/unit/framework/test_pipeline.py | 26 +++++++++++-------- tests/unit/processor/base.py | 2 +- .../test_selective_extractor.py | 2 +- .../unit/util/test_auto_rule_corpus_tester.py | 6 ++++- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 56bb533dd..41f82e3a8 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -38,8 +38,8 @@ class ProcessorResult: and errors (incl. warnings). """ - name: str = field(validator=validators.instance_of(str)) data: list = field(validator=validators.instance_of(list), factory=list) + """ The generated extra data """ errors: list = field( validator=validators.deep_iterable( member_validator=validators.instance_of((ProcessingError, ProcessingWarning)), @@ -47,6 +47,13 @@ class ProcessorResult: ), factory=list, ) + """ The errors and warnings that occurred during processing """ + outputs: list = field(validator=validators.instance_of(list), factory=list) + """ The outputs of the processors extra data """ + processor_name: str = field(validator=validators.instance_of(str)) + """ The name of the processor """ + event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) + """ A reference to the event that was processed """ def __contains__(self, error_class): return any(isinstance(item, error_class) for item in self.errors) @@ -138,7 +145,7 @@ def __init__(self, name: str, configuration: "Processor.Config"): specific_rules_targets=self._config.specific_rules, ) self.has_custom_tests = False - self.result = ProcessorResult(name=self.name) + self.result = ProcessorResult(processor_name=self.name) @property def _specific_rules(self): @@ -190,7 +197,7 @@ def process(self, event: dict): A dictionary representing a log event. """ - self.result = ProcessorResult(name=self.name) + self.result = ProcessorResult(processor_name=self.name, event=event) logger.debug(f"{self.describe()} processing event {event}") self._process_rule_tree(event, self._specific_tree) self._process_rule_tree(event, self._generic_tree) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index eff1d2a95..f8c5a8975 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -63,7 +63,7 @@ def get_mock_create(): mock_create = mock.MagicMock() mock_component = mock.MagicMock() mock_component.process = mock.MagicMock() - mock_component.process.return_value = ProcessorResult(name="mock_processor") + mock_component.process.return_value = ProcessorResult(processor_name="mock_processor") mock_create.return_value = mock_component return mock_create @@ -133,7 +133,7 @@ def test_empty_documents_are_not_forwarded_to_other_processors(self, _): processor_with_mock_result = mock.MagicMock() processor_with_mock_result.process = mock.MagicMock() processor_with_mock_result.process.return_value = ProcessorResult( - name="processor_with_mock_res" + processor_name="processor_with_mock_res" ) self.pipeline._pipeline = [ processor_with_mock_result, @@ -160,7 +160,7 @@ def test_not_empty_documents_are_stored_in_the_output(self, _): def test_empty_documents_are_not_stored_in_the_output(self, _): def mock_process_event(event): event.clear() - return [ProcessorResult(name="")] + return [ProcessorResult(processor_name="")] self.pipeline.process_event = mock_process_event self.pipeline._setup() @@ -245,7 +245,7 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, _): mock_rule = mock.MagicMock() processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"}) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="mock_processor", errors=[processing_warning] + processor_name="mock_processor", errors=[processing_warning] ) self.pipeline.process_pipeline() @@ -264,13 +264,13 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output( self.pipeline._input.get_next.return_value = (input_event1, None) mock_rule = mock.MagicMock() self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event1)], ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = (input_event2, None) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", errors=[ProcessingCriticalError("really bad things happened", mock_rule, input_event2)], ) @@ -311,9 +311,13 @@ def test_processor_logs_processing_error_and_warnings_separately( mock_rule = mock.MagicMock() self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) warning = FieldExistsWarning(mock_rule, input_event1, ["foo"]) - self.pipeline._pipeline[0].process.return_value = ProcessorResult(name="", errors=[warning]) + self.pipeline._pipeline[0].process.return_value = ProcessorResult( + processor_name="", errors=[warning] + ) error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1) - self.pipeline._pipeline[1].process.return_value = ProcessorResult(name="", errors=[error]) + self.pipeline._pipeline[1].process.return_value = ProcessorResult( + processor_name="", errors=[error] + ) self.pipeline.process_pipeline() assert mock_error.call_count == 1, f"one error occurred: {mock_error.mock_calls}" assert mock_warning.call_count == 1, f"one warning occurred: {mock_warning.mock_calls}" @@ -438,7 +442,7 @@ def test_extra_data_tuple_is_passed_to_store_custom(self, _): self.pipeline._input.get_next.return_value = ({"some": "event"}, None) self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] ) self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline.process_pipeline() @@ -451,7 +455,7 @@ def test_store_custom_calls_all_defined_outputs(self, _): self.pipeline._setup() self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", + processor_name="", data=[ ( {"foo": "bar"}, @@ -474,7 +478,7 @@ def test_extra_data_list_is_passed_to_store_custom(self, _): self.pipeline._setup() self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] + processor_name="", data=[({"foo": "bar"}, ({"dummy": "target"},))] ) self.pipeline._pipeline.append(deepcopy(self.pipeline._pipeline[0])) self.pipeline._input.get_next.return_value = ({"some": "event"}, None) diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 5c3b7d519..b4da29a7d 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -292,4 +292,4 @@ def test_process_return_result_object(self): assert isinstance(result, ProcessorResult) assert result.data == [] assert result.errors == [] - assert result.name == "Test Instance Name" + assert result.processor_name == "Test Instance Name" diff --git a/tests/unit/processor/selective_extractor/test_selective_extractor.py b/tests/unit/processor/selective_extractor/test_selective_extractor.py index f3aa7d3a6..9bdd0dfcb 100644 --- a/tests/unit/processor/selective_extractor/test_selective_extractor.py +++ b/tests/unit/processor/selective_extractor/test_selective_extractor.py @@ -102,7 +102,7 @@ def test_process_returns_none_when_no_extraction_field_matches(self): assert isinstance(result, ProcessorResult) assert result.data == [] assert result.errors == [] - assert result.name == "Test Instance Name" + assert result.processor_name == "Test Instance Name" def test_gets_matching_rules_from_rules_trees(self): rule_trees = [self.object._generic_tree, self.object._specific_tree] diff --git a/tests/unit/util/test_auto_rule_corpus_tester.py b/tests/unit/util/test_auto_rule_corpus_tester.py index b0f49a444..4e827c459 100644 --- a/tests/unit/util/test_auto_rule_corpus_tester.py +++ b/tests/unit/util/test_auto_rule_corpus_tester.py @@ -321,7 +321,11 @@ def test_run_prints_expected_outputs_to_console( "logprep.util.auto_rule_tester.auto_rule_corpus_tester.Pipeline.process_pipeline" ) as mock_process_pipeline: mock_process_pipeline.return_value = mock_output[0], PipelineResult( - results=[ProcessorResult(name="test", data=test_data["expected_extra_output"])] + results=[ + ProcessorResult( + processor_name="test", data=test_data["expected_extra_output"] + ) + ] ) corpus_tester.run() else: From b991148840da69369ce9766840589d2c32580062 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 10 Jul 2024 15:28:30 +0200 Subject: [PATCH 02/10] add tests for collecting errors in result object --- logprep/abc/processor.py | 8 ++-- .../processor/generic_resolver/processor.py | 17 ++++++-- .../processor/hyperscan_resolver/processor.py | 6 +-- tests/unit/processor/base.py | 40 ++++++++++++++++++- tests/unit/processor/test_process.py | 1 - 5 files changed, 57 insertions(+), 15 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 41f82e3a8..8a483fd9e 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -203,7 +203,7 @@ def process(self, event: dict): self._process_rule_tree(event, self._generic_tree) return self.result - def _process_rule_tree(self, event: dict, tree: "RuleTree"): + def _process_rule_tree(self, event: dict, tree: RuleTree): applied_rules = set() @Metric.measure_time() @@ -213,14 +213,14 @@ def _process_rule(rule, event): applied_rules.add(rule) return event - def _process_rule_tree_multiple_times(tree, event): + def _process_rule_tree_multiple_times(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) while matching_rules: for rule in matching_rules: _process_rule(rule, event) matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules) - def _process_rule_tree_once(tree, event): + def _process_rule_tree_once(tree: RuleTree, event: dict): matching_rules = tree.get_matching_rules(event) for rule in matching_rules: _process_rule(rule, event) @@ -238,7 +238,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"): except ProcessingCriticalError as error: self.result.errors.append(error) # is needed to prevent wrapping it in itself event.clear() - except BaseException as error: + except Exception as error: self.result.errors.append(ProcessingCriticalError(str(error), rule, event)) event.clear() if not hasattr(rule, "delete_source_fields"): diff --git a/logprep/processor/generic_resolver/processor.py b/logprep/processor/generic_resolver/processor.py index 09f4e19a7..aaa1f2dad 100644 --- a/logprep/processor/generic_resolver/processor.py +++ b/logprep/processor/generic_resolver/processor.py @@ -28,18 +28,21 @@ import re from typing import Union -from logprep.processor.base.exceptions import FieldExistsWarning +from logprep.processor.base.exceptions import ( + FieldExistsWarning, + ProcessingCriticalError, +) from logprep.processor.field_manager.processor import FieldManager from logprep.processor.generic_resolver.rule import GenericResolverRule from logprep.util.getter import GetterFactory from logprep.util.helper import add_field_to, get_dotted_field_value -class GenericResolverError(BaseException): +class GenericResolverError(ProcessingCriticalError): """Base class for GenericResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"GenericResolver ({name}): {message}") + def __init__(self, name: str, message: str, rule: GenericResolverRule, event: dict): + super().__init__(f"{name}: {message}", rule=rule, event=event) class GenericResolver(FieldManager): @@ -78,6 +81,8 @@ def _apply_rules(self, event, rule): raise GenericResolverError( self.name, "Mapping group is missing in mapping file pattern!", + rule=rule, + event=event, ) dest_val = replacements.get(mapping) if dest_val: @@ -145,9 +150,13 @@ def ensure_rules_from_file(self, rule): f"Additions file " f'\'{rule.resolve_from_file["path"]}\'' f" must be a dictionary with string values!", + rule=rule, + event=None, ) except FileNotFoundError as error: raise GenericResolverError( self.name, f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!", + rule=rule, + event=None, ) from error diff --git a/logprep/processor/hyperscan_resolver/processor.py b/logprep/processor/hyperscan_resolver/processor.py index 1c7d13039..d24b84321 100644 --- a/logprep/processor/hyperscan_resolver/processor.py +++ b/logprep/processor/hyperscan_resolver/processor.py @@ -39,6 +39,7 @@ from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError from logprep.processor.field_manager.processor import FieldManager +from logprep.processor.generic_resolver.processor import GenericResolverError from logprep.util.helper import add_field_to, get_dotted_field_value from logprep.util.validators import directory_validator @@ -56,12 +57,9 @@ # pylint: enable=ungrouped-imports -class HyperscanResolverError(BaseException): +class HyperscanResolverError(GenericResolverError): """Base class for HyperscanResolver related exceptions.""" - def __init__(self, name: str, message: str): - super().__init__(f"HyperscanResolver ({name}): {message}") - class HyperscanResolver(FieldManager): """Resolve values in documents by referencing a mapping list.""" diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index b4da29a7d..f4bdb5c73 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -17,8 +17,10 @@ from logprep.abc.processor import Processor, ProcessorResult from logprep.factory import Factory +from logprep.filter.lucene_filter import LuceneFilter from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import CounterMetric, HistogramMetric +from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingError from logprep.util.json_handling import list_json_files_in_directory from tests.unit.component.base import BaseComponentTestCase @@ -95,6 +97,28 @@ def setup_method(self) -> None: self.object = Factory.create(configuration=config) self.specific_rules = self.set_rules(self.specific_rules_dirs) self.generic_rules = self.set_rules(self.generic_rules_dirs) + self.match_all_event = { + "message": "event", + "winlog": { + "event_id": 1, + "provider_name": "Microsoft-Windows-Sysmon", + "event_data": {"IpAddress": "127.0.0.54"}, + }, + "field1": "foo", + "field2": "bar", + "another_random_field": "baz", + "@timestamp": "2021-01-01T00:00:00.000Z", + "delete_event": "does not matter", + "irrelevant": "does not matter", + "url": "http://example.local", + "drop_me": "does not matter", + "add_generic_test": "does not matter", + "anything": "does not matter", + "client": {"ip": "127.0.0.54"}, + "ips": ["127.0.0.54", "192.168.4.33"], + "applyrule": "yes", + "A": "foobarfoo", + } # this is an event that can be used in all processor tests, cause it matches everywhere def teardown_method(self) -> None: """teardown for all methods""" @@ -290,6 +314,18 @@ def test_process_return_result_object(self): event = {"some": "event"} result = self.object.process(event) assert isinstance(result, ProcessorResult) - assert result.data == [] - assert result.errors == [] + assert isinstance(result.data, list) + assert isinstance(result.outputs, list) + assert isinstance(result.errors, list) assert result.processor_name == "Test Instance Name" + + def test_process_collects_errors_in_result_object(self): + with mock.patch.object( + self.object, + "_apply_rules", + side_effect=ProcessingCriticalError( + "side effect", rule=self.object.rules[0], event=self.match_all_event + ), + ): + result = self.object.process(self.match_all_event) + assert len(result.errors) > 0, "minimum one error should be in result object" diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py index 9a36054b7..d228d6a8e 100644 --- a/tests/unit/processor/test_process.py +++ b/tests/unit/processor/test_process.py @@ -1,7 +1,6 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access import re -from logging import getLogger from unittest import mock from unittest.mock import call From 8ffde32784e26d273f44604053fce58191b3d28f Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 10 Jul 2024 15:41:09 +0200 Subject: [PATCH 03/10] add tests for event reference --- logprep/abc/processor.py | 13 +++++++++---- tests/unit/processor/base.py | 4 ++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 8a483fd9e..01f89e6a2 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -145,7 +145,7 @@ def __init__(self, name: str, configuration: "Processor.Config"): specific_rules_targets=self._config.specific_rules, ) self.has_custom_tests = False - self.result = ProcessorResult(processor_name=self.name) + self.result = None @property def _specific_rules(self): @@ -187,15 +187,20 @@ def metric_labels(self) -> dict: "name": self.name, } - def process(self, event: dict): - """Process a log event by calling the implemented `process` method of the - strategy object set in `_strategy` attribute. + def process(self, event: dict) -> ProcessorResult: + """Process a log event. Parameters ---------- event : dict A dictionary representing a log event. + Returns + ------- + ProcessorResult + A ProcessorResult object containing the processed event, errors, + extra data and a list of target outputs. + """ self.result = ProcessorResult(processor_name=self.name, event=event) logger.debug(f"{self.describe()} processing event {event}") diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index f4bdb5c73..121430441 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -329,3 +329,7 @@ def test_process_collects_errors_in_result_object(self): ): result = self.object.process(self.match_all_event) assert len(result.errors) > 0, "minimum one error should be in result object" + + def test_result_object_has_reference_to_event(self): + result = self.object.process(self.match_all_event) + assert result.event is self.match_all_event From b41a391bd796812bae5ec7ff647f52e399980af5 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 10 Jul 2024 16:41:55 +0200 Subject: [PATCH 04/10] fix tests without setting result in init --- tests/unit/processor/base.py | 7 +++++-- tests/unit/processor/geoip_enricher/test_geoip_enricher.py | 3 ++- tests/unit/processor/pseudonymizer/test_pseudonymizer.py | 3 +++ .../selective_extractor/test_selective_extractor.py | 1 - 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 121430441..9065fa10b 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -21,6 +21,7 @@ from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import CounterMetric, HistogramMetric from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingError +from logprep.processor.base.rule import Rule from logprep.util.json_handling import list_json_files_in_directory from tests.unit.component.base import BaseComponentTestCase @@ -77,10 +78,12 @@ def set_rules(rules_dirs): rules.append(rule) return rules - def _load_specific_rule(self, rule): + def _load_specific_rule(self, rule: dict | Rule): self.object._generic_tree = RuleTree() self.object._specific_tree = RuleTree() - specific_rule = self.object.rule_class._create_from_dict(rule) + specific_rule = ( + self.object.rule_class._create_from_dict(rule) if isinstance(rule, dict) else rule + ) self.object._specific_tree.add_rule(specific_rule, self.logger) def setup_method(self) -> None: diff --git a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py index 911cbcff9..2703afa76 100644 --- a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py +++ b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py @@ -122,7 +122,8 @@ def test_no_geoip_data_added_if_source_field_is_none(self): def test_source_field_is_none_emits_missing_fields_warning(self): document = {"client": {"ip": None}} expected = {"client": {"ip": None}, "tags": ["_geoip_enricher_missing_field_warning"]} - self.object._apply_rules(document, self.object.rules[0]) + self._load_specific_rule(self.object.rules[0]) + self.object.process(document) assert len(self.object.result.errors) == 1 assert re.match( r".*missing source_fields: \['client\.ip'].*", str(self.object.result.errors[0]) diff --git a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py index 79a2df622..215ed622c 100644 --- a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py +++ b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py @@ -6,6 +6,7 @@ import re from copy import deepcopy from pathlib import Path +from unittest import mock import pytest @@ -814,6 +815,7 @@ def test_replace_regex_keywords_by_regex_expression_is_idempotent(self): assert self.object._specific_tree.rules[0].pseudonyms == {"something": expected_pattern} def test_pseudonymize_string_adds_pseudonyms(self): + self.object.result = ProcessorResult(processor_name="test") assert self.object._pseudonymize_string("foo").startswith(" Date: Wed, 10 Jul 2024 19:00:36 +0200 Subject: [PATCH 05/10] refactor pipeline_result --- logprep/abc/processor.py | 2 -- logprep/framework/pipeline.py | 24 ++++++++++--- tests/unit/framework/test_pipeline.py | 52 ++++++++++++++++++--------- tests/unit/processor/base.py | 1 - 4 files changed, 55 insertions(+), 24 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 01f89e6a2..ade4f5447 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -48,8 +48,6 @@ class ProcessorResult: factory=list, ) """ The errors and warnings that occurred during processing """ - outputs: list = field(validator=validators.instance_of(list), factory=list) - """ The outputs of the processors extra data """ processor_name: str = field(validator=validators.instance_of(str)) """ The name of the processor """ event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 6d79e277a..18dbd30a6 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -18,7 +18,7 @@ from functools import cached_property, partial from importlib.metadata import version from multiprocessing import Lock, Value, current_process -from typing import Any, List, Tuple, Optional +from typing import Any, List, Optional, Tuple import attrs import msgspec @@ -59,6 +59,15 @@ class PipelineResult: ), ] ) + """List of ProcessorResults""" + event: dict = attrs.field(validator=attrs.validators.instance_of(dict)) + """The event that was processed""" + event_received: dict = attrs.field( + validator=attrs.validators.instance_of(dict), converter=copy.deepcopy + ) + """The event that was received""" + pipeline: "Pipeline" + """The pipeline that processed the event""" def __iter__(self): return iter(self.results) @@ -253,14 +262,14 @@ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]: if self._output: self._store_failed_event(processor_result.errors, event_received, event) # pipeline is aborted on processing error - return event, result + return result if self._output: result_data = [res.data for res in result if res.data] if result_data: self._store_extra_data(itertools.chain(*result_data)) if event: self._store_event(event) - return event, result + return result def _store_event(self, event: dict) -> None: for output_name, output in self._output.items(): @@ -288,9 +297,14 @@ def _get_event(self) -> dict: @Metric.measure_time() def process_event(self, event: dict): """process all processors for one event""" - return PipelineResult( - results=[processor.process(event) for processor in self._pipeline if event] + result = PipelineResult( + results=[], + event_received=event, + event=event, + pipeline=self, ) + result.results = [processor.process(event) for processor in self._pipeline if event] + return result def _store_extra_data(self, result_data: List | itertools.chain) -> None: self.logger.debug("Storing extra data") diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index f8c5a8975..213dbe289 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -239,7 +239,8 @@ def test_output_warning_error_is_logged_but_processing_continues(self, mock_warn assert mock_warning.call_count == 1 assert self.pipeline._output["dummy"].store.call_count == 3 - def test_processor_warning_error_is_logged_but_processing_continues(self, _): + @mock.patch("logging.Logger.warning") + def test_processor_warning_error_is_logged_but_processing_continues(self, mock_warning, _): self.pipeline._setup() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) mock_rule = mock.MagicMock() @@ -247,11 +248,11 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, _): self.pipeline._pipeline[1].process.return_value = ProcessorResult( processor_name="mock_processor", errors=[processing_warning] ) - self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) - _, result = self.pipeline.process_pipeline() + result = self.pipeline.process_pipeline() assert processing_warning in result.results[0].errors + mock_warning.assert_called() assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed" @mock.patch("logging.Logger.error") @@ -619,6 +620,27 @@ def test_shutdown_logs_fatal_errors(self, mock_error, _): logger_call = f"Couldn't gracefully shut down pipeline due to: {error}" mock_error.assert_called_with(logger_call) + def test_pipeline_result_provides_event_received(self, _): + self.pipeline._setup() + event = {"some": "event"} + self.pipeline._input.get_next.return_value = (event, None) + generic_adder = original_create( + { + "generic_adder": { + "type": "generic_adder", + "specific_rules": [ + {"filter": "some", "generic_adder": {"add": {"field": "foo"}}} + ], + "generic_rules": [], + } + } + ) + self.pipeline._pipeline = [generic_adder] + result = self.pipeline.process_pipeline() + assert result.event_received is not event, "event_received is a copy" + assert result.event_received == {"some": "event"}, "received event is as expected" + assert result.event == {"some": "event", "field": "foo"}, "processed event is as expected" + class TestPipelineWithActualInput: def setup_method(self): @@ -637,10 +659,9 @@ def test_pipeline_without_output_connector_and_one_input_event_and_preprocessors self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - assert extra_outputs.results[0].data == [] + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event def test_process_event_processes_without_input_and_without_output(self): event = {"applyrule": "yes"} @@ -660,13 +681,12 @@ def test_pipeline_without_output_connector_and_two_input_events_and_preprocessor self.config.input["test_input"]["documents"] = input_events pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, extra_outputs = pipeline.process_pipeline() - assert event["label"] == {"reporter": ["windows"]} - assert "arrival_time" in event - event, extra_outputs = pipeline.process_pipeline() - assert "pseudonym" in event.get("winlog", {}).get("event_data", {}).get("IpAddress") - assert "arrival_time" in event - assert len(extra_outputs.results) == len(pipeline._pipeline) + result = pipeline.process_pipeline() + assert result.event["label"] == {"reporter": ["windows"]} + assert "arrival_time" in result.event + result = pipeline.process_pipeline() + assert "pseudonym" in result.event.get("winlog", {}).get("event_data", {}).get("IpAddress") + assert "arrival_time" in result.event def test_pipeline_hmac_error_message_without_output_connector(self): self.config.input["test_input"]["documents"] = [{"applyrule": "yes"}] @@ -675,8 +695,8 @@ def test_pipeline_hmac_error_message_without_output_connector(self): } pipeline = Pipeline(config=self.config) assert pipeline._output is None - event, _ = pipeline.process_pipeline() - assert event["hmac"]["hmac"] == "error" + result = pipeline.process_pipeline() + assert result.event["hmac"]["hmac"] == "error" def test_pipeline_run_raises_assertion_when_run_without_input(self): self.config.input = {} diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 9065fa10b..7f1d973aa 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -318,7 +318,6 @@ def test_process_return_result_object(self): result = self.object.process(event) assert isinstance(result, ProcessorResult) assert isinstance(result.data, list) - assert isinstance(result.outputs, list) assert isinstance(result.errors, list) assert result.processor_name == "Test Instance Name" From 926724262f872aae52a5d354a94f10eea92bf0da Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 10 Jul 2024 19:24:41 +0200 Subject: [PATCH 06/10] refactor pipeline_result --- logprep/framework/pipeline.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 18dbd30a6..cb235979d 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -18,7 +18,7 @@ from functools import cached_property, partial from importlib.metadata import version from multiprocessing import Lock, Value, current_process -from typing import Any, List, Optional, Tuple +from typing import Any, Generator, List, Optional, Tuple import attrs import msgspec @@ -53,7 +53,7 @@ class PipelineResult: results: List[ProcessorResult] = attrs.field( validator=[ - attrs.validators.instance_of(list), + attrs.validators.instance_of((list, Generator)), attrs.validators.deep_iterable( member_validator=attrs.validators.instance_of(ProcessorResult) ), @@ -66,9 +66,14 @@ class PipelineResult: validator=attrs.validators.instance_of(dict), converter=copy.deepcopy ) """The event that was received""" - pipeline: "Pipeline" + pipeline: list[Processor] """The pipeline that processed the event""" + def __attrs_post_init__(self): + self.results = list( + (processor.process(self.event) for processor in self.pipeline if self.event) + ) + def __iter__(self): return iter(self.results) @@ -243,14 +248,13 @@ def run(self) -> None: # pylint: disable=method-hidden self._shut_down() @_handle_pipeline_error - def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]: + def process_pipeline(self) -> PipelineResult: """Retrieve next event, process event with full pipeline and store or return results""" Component.run_pending_tasks() event = self._get_event() if not event: return None, None - event_received = copy.deepcopy(event) result: PipelineResult = self.process_event(event) for processor_result in result: if not processor_result.errors: @@ -260,9 +264,9 @@ def process_pipeline(self) -> Tuple[Optional[dict], Optional[PipelineResult]]: if ProcessingError in processor_result: self.logger.error(processor_result.get_error_string()) if self._output: - self._store_failed_event(processor_result.errors, event_received, event) + self._store_failed_event(processor_result.errors, result.event_received, event) # pipeline is aborted on processing error - return result + return if self._output: result_data = [res.data for res in result if res.data] if result_data: @@ -301,9 +305,8 @@ def process_event(self, event: dict): results=[], event_received=event, event=event, - pipeline=self, + pipeline=self._pipeline, ) - result.results = [processor.process(event) for processor in self._pipeline if event] return result def _store_extra_data(self, result_data: List | itertools.chain) -> None: From 5cfb1566e7c3f6844a0f7601ae3ef6bb8a97538c Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 10 Jul 2024 20:15:40 +0200 Subject: [PATCH 07/10] refactor pipeline_result --- logprep/abc/processor.py | 31 +++++++++------------- logprep/framework/pipeline.py | 37 +++++++++++++++++++-------- tests/unit/framework/test_pipeline.py | 32 +++++++---------------- 3 files changed, 47 insertions(+), 53 deletions(-) diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index ade4f5447..fde55fe6e 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -42,32 +42,25 @@ class ProcessorResult: """ The generated extra data """ errors: list = field( validator=validators.deep_iterable( - member_validator=validators.instance_of((ProcessingError, ProcessingWarning)), + member_validator=validators.instance_of(ProcessingError), iterable_validator=validators.instance_of(list), ), factory=list, ) - """ The errors and warnings that occurred during processing """ + """ The errors that occurred during processing """ + warnings: list = field( + validator=validators.deep_iterable( + member_validator=validators.instance_of(ProcessingWarning), + iterable_validator=validators.instance_of(list), + ), + factory=list, + ) + """ The warnings that occurred during processing """ processor_name: str = field(validator=validators.instance_of(str)) """ The name of the processor """ event: dict = field(validator=validators.optional(validators.instance_of(dict)), default=None) """ A reference to the event that was processed """ - def __contains__(self, error_class): - return any(isinstance(item, error_class) for item in self.errors) - - def get_warning_string(self): - """creates a string containing the warnings""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingWarning)] - ) - - def get_error_string(self): - """creates a string containing the errors""" - return ", ".join( - [error.args[0] for error in self.errors if isinstance(error, ProcessingError)] - ) - class Processor(Component): """Abstract Processor Class to define the Interface""" @@ -331,9 +324,9 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None): else: add_and_overwrite(event, "tags", sorted(list({*tags, *failure_tags}))) if isinstance(error, ProcessingWarning): - self.result.errors.append(error) + self.result.warnings.append(error) else: - self.result.errors.append(ProcessingWarning(str(error), rule, event)) + self.result.warnings.append(ProcessingWarning(str(error), rule, event)) def _has_missing_values(self, event, rule, source_field_dict): missing_fields = list( diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index cb235979d..e911f450e 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -69,6 +69,26 @@ class PipelineResult: pipeline: list[Processor] """The pipeline that processed the event""" + @property + def has_processing_errors(self) -> bool: + """Check if any of the results has processing errors.""" + return any(result.errors for result in self) + + @property + def has_processing_warnings(self) -> bool: + """Check if any of the results has processing errors.""" + return any(result.warnings for result in self) + + @property + def errors(self) -> List[ProcessingError]: + """Return all processing errors.""" + return itertools.chain(*[result.errors for result in self]) + + @property + def warnings(self) -> List[ProcessingWarning]: + """Return all processing warnings.""" + return itertools.chain(*[result.warnings for result in self]) + def __attrs_post_init__(self): self.results = list( (processor.process(self.event) for processor in self.pipeline if self.event) @@ -256,17 +276,12 @@ def process_pipeline(self) -> PipelineResult: if not event: return None, None result: PipelineResult = self.process_event(event) - for processor_result in result: - if not processor_result.errors: - continue - if ProcessingWarning in processor_result: - self.logger.warning(processor_result.get_warning_string()) - if ProcessingError in processor_result: - self.logger.error(processor_result.get_error_string()) - if self._output: - self._store_failed_event(processor_result.errors, result.event_received, event) - # pipeline is aborted on processing error - return + if result.has_processing_warnings: + self.logger.warning(",".join((str(warning) for warning in result.warnings))) + if result.has_processing_errors: + self.logger.error(",".join((str(error) for error in result.errors))) + self._store_failed_event(result.errors, result.event_received, event) + return if self._output: result_data = [res.data for res in result if res.data] if result_data: diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 213dbe289..10a836872 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -26,7 +26,7 @@ ) from logprep.abc.processor import ProcessorResult from logprep.factory import Factory -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, @@ -160,7 +160,9 @@ def test_not_empty_documents_are_stored_in_the_output(self, _): def test_empty_documents_are_not_stored_in_the_output(self, _): def mock_process_event(event): event.clear() - return [ProcessorResult(processor_name="")] + return PipelineResult( + event=event, event_received=event, results=[], pipeline=self.pipeline._pipeline + ) self.pipeline.process_event = mock_process_event self.pipeline._setup() @@ -246,13 +248,14 @@ def test_processor_warning_error_is_logged_but_processing_continues(self, mock_w mock_rule = mock.MagicMock() processing_warning = ProcessingWarning("not so bad", mock_rule, {"message": "test"}) self.pipeline._pipeline[1].process.return_value = ProcessorResult( - processor_name="mock_processor", errors=[processing_warning] + processor_name="mock_processor", warnings=[processing_warning] ) self.pipeline.process_pipeline() self.pipeline._input.get_next.return_value = ({"message": "test"}, None) result = self.pipeline.process_pipeline() - assert processing_warning in result.results[0].errors + assert processing_warning in result.results[0].warnings mock_warning.assert_called() + assert "ProcessingWarning: not so bad" in mock_warning.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 2, "all events are processed" @mock.patch("logging.Logger.error") @@ -278,24 +281,7 @@ def test_processor_critical_error_is_logged_event_is_stored_in_error_output( self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 2, "2 events gone into processing" assert mock_error.call_count == 2, f"two errors occurred: {mock_error.mock_calls}" - - logger_calls = ( - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "first event"} - ) - ) - ), - mock.call( - str( - ProcessingCriticalError( - "really bad things happened", mock_rule, {"message": "second event"} - ) - ) - ), - ) - mock_error.assert_has_calls(logger_calls) + assert "ProcessingCriticalError: really bad things happened" in mock_error.call_args[0][0] assert self.pipeline._output["dummy"].store.call_count == 0, "no event in output" assert ( self.pipeline._output["dummy"].store_failed.call_count == 2 @@ -313,7 +299,7 @@ def test_processor_logs_processing_error_and_warnings_separately( self.pipeline._pipeline[1] = deepcopy(self.pipeline._pipeline[0]) warning = FieldExistsWarning(mock_rule, input_event1, ["foo"]) self.pipeline._pipeline[0].process.return_value = ProcessorResult( - processor_name="", errors=[warning] + processor_name="", warnings=[warning] ) error = ProcessingCriticalError("really bad things happened", mock_rule, input_event1) self.pipeline._pipeline[1].process.return_value = ProcessorResult( From 201d76900b80fdd463c5c55f59db90bbf4bf31eb Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Thu, 11 Jul 2024 09:49:51 +0200 Subject: [PATCH 08/10] fix tests --- logprep/framework/pipeline.py | 5 +++++ .../auto_rule_corpus_tester.py | 21 +++++++------------ logprep/util/rule_dry_runner.py | 10 ++++----- tests/unit/processor/amides/test_amides.py | 8 ++++--- tests/unit/processor/base.py | 3 +-- .../processor/calculator/test_calculator.py | 4 ++-- .../concatenator/test_concatenator.py | 6 +++--- .../test_datetime_extractor.py | 4 ++-- .../processor/dissector/test_dissector.py | 4 ++-- .../test_domain_label_extractor.py | 10 ++++----- .../domain_resolver/test_domain_resolver.py | 6 +++--- .../field_manager/test_field_manager.py | 16 +++++++------- .../generic_adder/test_generic_adder.py | 8 +++---- .../generic_resolver/test_generic_resolver.py | 4 ++-- .../geoip_enricher/test_geoip_enricher.py | 8 +++---- tests/unit/processor/grokker/test_grokker.py | 4 ++-- .../test_hyperscan_resolver.py | 4 ++-- .../processor/ip_informer/test_ip_informer.py | 4 ++-- .../processor/key_checker/test_key_checker.py | 6 +++--- .../list_comparison/test_list_comparison.py | 12 +++++------ .../processor/requester/test_requester.py | 4 ++-- .../string_splitter/test_string_splitter.py | 4 ++-- .../test_template_replacer.py | 4 ++-- tests/unit/processor/test_process.py | 6 +++--- .../timestamp_differ/test_timestamp_differ.py | 4 ++-- .../processor/timestamper/test_timestamper.py | 4 ++-- .../unit/util/test_auto_rule_corpus_tester.py | 14 +++++++------ 27 files changed, 93 insertions(+), 94 deletions(-) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index e911f450e..258706baf 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -89,6 +89,11 @@ def warnings(self) -> List[ProcessingWarning]: """Return all processing warnings.""" return itertools.chain(*[result.warnings for result in self]) + @property + def data(self) -> List[Tuple[dict, dict]]: + """Return all extra data.""" + return itertools.chain(*[result.data for result in self]) + def __attrs_post_init__(self): self.results = list( (processor.process(self.event) for processor in self.pipeline if self.event) diff --git a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py index c9f922de7..6e0692025 100644 --- a/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py +++ b/logprep/util/auto_rule_tester/auto_rule_corpus_tester.py @@ -99,7 +99,7 @@ from colorama import Fore, Style from deepdiff import DeepDiff, grep -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.configuration import Configuration from logprep.util.helper import get_dotted_field_value from logprep.util.json_handling import parse_json @@ -113,9 +113,8 @@ def convert_extra_data_format(extra_outputs) -> List[Dict]: output target is the key and the values are the actual outputs. """ reformatted_extra_outputs = [] - for extra_output in extra_outputs: - for output in extra_output: - reformatted_extra_outputs.append({str(output[1]): output[0]}) + for value, key in extra_outputs: + reformatted_extra_outputs.append({str(key): value}) return reformatted_extra_outputs @@ -211,18 +210,12 @@ def _run_pipeline_per_test_case(self): print(Style.BRIGHT + "# Test Cases Summary:" + Style.RESET_ALL) for test_case_id, test_case in self._test_cases.items(): _ = [processor.setup() for processor in self._pipeline._pipeline] - parsed_event, result = self._pipeline.process_pipeline() - extra_outputs = convert_extra_data_format( - result.results[processor_result].data - for processor_result in range(len(result.results)) - ) + result: PipelineResult = self._pipeline.process_pipeline() + parsed_event = result.event + extra_outputs = convert_extra_data_format(result.data) test_case.generated_output = parsed_event test_case.generated_extra_output = extra_outputs - test_case.warnings = [ - result.results[processor_result].errors - for processor_result in range(len(result.results)) - ] - test_case.warnings = list(itertools.chain(*test_case.warnings)) + test_case.warnings = result.warnings self._compare_logprep_outputs(test_case_id, parsed_event) self._compare_extra_data_output(test_case_id, extra_outputs) self._print_pass_fail_statements(test_case_id) diff --git a/logprep/util/rule_dry_runner.py b/logprep/util/rule_dry_runner.py index 66684a7e2..0c6782fd6 100644 --- a/logprep/util/rule_dry_runner.py +++ b/logprep/util/rule_dry_runner.py @@ -45,7 +45,7 @@ from colorama import Back, Fore from ruamel.yaml import YAML -from logprep.framework.pipeline import Pipeline +from logprep.framework.pipeline import Pipeline, PipelineResult from logprep.util.auto_rule_tester.auto_rule_corpus_tester import ( convert_extra_data_format, ) @@ -103,11 +103,9 @@ def run(self): transformed_cnt = 0 output_count = 0 for input_document in self._input_documents: - test_output, result = self._pipeline.process_pipeline() - test_output_custom = convert_extra_data_format( - result.results[processor_result].data - for processor_result in range(len(result.results)) - ) + result: PipelineResult = self._pipeline.process_pipeline() + test_output = result.event + test_output_custom = convert_extra_data_format(result.data) if test_output: output_count += 1 diff = self._print_output_results(input_document, test_output, test_output_custom) diff --git a/tests/unit/processor/amides/test_amides.py b/tests/unit/processor/amides/test_amides.py index 7ffc25cfe..4254cf71c 100644 --- a/tests/unit/processor/amides/test_amides.py +++ b/tests/unit/processor/amides/test_amides.py @@ -168,9 +168,11 @@ def test_process_event_raise_duplication_error(self): self.object.process(document) assert document.get("amides") result = self.object.process(document) - assert len(result.errors) > 0 - assert re.match(r".*missing source_fields: \['process.command_line'].*", str(result.errors)) - assert re.match(".*FieldExistsWarning.*", str(result.errors)) + assert len(result.warnings) > 0 + assert re.match( + r".*missing source_fields: \['process.command_line'].*", str(result.warnings) + ) + assert re.match(".*FieldExistsWarning.*", str(result.warnings)) def test_setup_get_model_via_file_getter(self, tmp_path, monkeypatch): model_uri = "file://tests/testdata/unit/amides/model.zip" diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py index 7f1d973aa..d0adc908b 100644 --- a/tests/unit/processor/base.py +++ b/tests/unit/processor/base.py @@ -17,10 +17,9 @@ from logprep.abc.processor import Processor, ProcessorResult from logprep.factory import Factory -from logprep.filter.lucene_filter import LuceneFilter from logprep.framework.rule_tree.rule_tree import RuleTree from logprep.metrics.metrics import CounterMetric, HistogramMetric -from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingError +from logprep.processor.base.exceptions import ProcessingCriticalError from logprep.processor.base.rule import Rule from logprep.util.json_handling import list_json_files_in_directory from tests.unit.component.base import BaseComponentTestCase diff --git a/tests/unit/processor/calculator/test_calculator.py b/tests/unit/processor/calculator/test_calculator.py index bb9dc07e4..be49fac5e 100644 --- a/tests/unit/processor/calculator/test_calculator.py +++ b/tests/unit/processor/calculator/test_calculator.py @@ -355,8 +355,8 @@ def test_testcases_failure_handling(self, testcase, rule, event, expected, error self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*{error_message}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*{error_message}", str(result.warnings[0])) assert event == expected, testcase @pytest.mark.parametrize( diff --git a/tests/unit/processor/concatenator/test_concatenator.py b/tests/unit/processor/concatenator/test_concatenator.py index c6c3df208..e29f6a8e5 100644 --- a/tests/unit/processor/concatenator/test_concatenator.py +++ b/tests/unit/processor/concatenator/test_concatenator.py @@ -170,7 +170,7 @@ def test_for_expected_output(self, test_case, rule, document, expected_output): self.object.process(document) assert document == expected_output, test_case - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( self, ): rule = { @@ -186,8 +186,8 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_concatenator_failure"] diff --git a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py index 39aa9e344..2521194eb 100644 --- a/tests/unit/processor/datetime_extractor/test_datetime_extractor.py +++ b/tests/unit/processor/datetime_extractor/test_datetime_extractor.py @@ -181,8 +181,8 @@ def test_existing_target_raises_if_not_overwrite_target(self): } self._load_specific_rule(rule) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @staticmethod def _parse_local_tz(tz_local_name): diff --git a/tests/unit/processor/dissector/test_dissector.py b/tests/unit/processor/dissector/test_dissector.py index f0c833e4e..92c41fd96 100644 --- a/tests/unit/processor/dissector/test_dissector.py +++ b/tests/unit/processor/dissector/test_dissector.py @@ -731,6 +731,6 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un def test_testcases_failure_handling(self, testcase, rule, event, expected): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], ProcessingWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py index 3a0a6b311..fc0dcc308 100644 --- a/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py +++ b/tests/unit/processor/domain_label_extractor/test_domain_label_extractor.py @@ -245,8 +245,8 @@ def test_domain_extraction_with_ipv6_target(self): def test_domain_extraction_with_existing_output_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) def test_domain_extraction_overwrites_target_field(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} @@ -314,7 +314,7 @@ def test_does_nothing_if_source_field_not_exits(self): self.object.process(document) assert document == expected - def test_raises_duplication_error_if_target_field_exits(self): + def test_raises_field_exists_warning_if_target_field_exits(self): document = {"url": {"domain": "test.domain.de", "subdomain": "exists already"}} expected = { "tags": ["_domain_label_extractor_failure"], @@ -336,8 +336,8 @@ def test_raises_duplication_error_if_target_field_exits(self): } self._load_specific_rule(rule_dict) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/domain_resolver/test_domain_resolver.py b/tests/unit/processor/domain_resolver/test_domain_resolver.py index ea49466a8..07e208d6d 100644 --- a/tests/unit/processor/domain_resolver/test_domain_resolver.py +++ b/tests/unit/processor/domain_resolver/test_domain_resolver.py @@ -228,12 +228,12 @@ def test_configured_dotted_subfield(self, _): assert document == expected @mock.patch("socket.gethostbyname", return_value="1.2.3.4") - def test_duplication_error(self, _): + def test_field_exits_warning(self, _): document = {"client": "google.de"} result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) @mock.patch("socket.gethostbyname", return_value="1.2.3.4") def test_no_duplication_error(self, _): diff --git a/tests/unit/processor/field_manager/test_field_manager.py b/tests/unit/processor/field_manager/test_field_manager.py index 0a12d0ffc..5a71fb849 100644 --- a/tests/unit/processor/field_manager/test_field_manager.py +++ b/tests/unit/processor/field_manager/test_field_manager.py @@ -591,11 +591,11 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un def test_testcases_failure_handling(self, testcase, rule, event, expected, error): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(error, str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(error, str(result.warnings[0])) assert event == expected, testcase - def test_process_raises_duplication_error_if_target_field_exists_and_should_not_be_overwritten( + def test_process_raises_field_exists_warning_if_target_field_exists_and_should_not_be_overwritten( self, ): rule = { @@ -610,7 +610,7 @@ def test_process_raises_duplication_error_if_target_field_exists_and_should_not_ self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}, "target_field": "has already content"} result = self.object.process(document) - assert isinstance(result.errors[0], FieldExistsWarning) + assert isinstance(result.warnings[0], FieldExistsWarning) assert "target_field" in document assert document.get("target_field") == "has already content" assert document.get("tags") == ["_field_manager_failure"] @@ -626,10 +626,10 @@ def test_process_raises_processing_warning_with_missing_fields(self): self._load_specific_rule(rule) document = {"field": {"a": "first", "b": "second"}} result = self.object.process(document) - assert len(result.errors) == 1 + assert len(result.warnings) == 1 assert re.match( r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", - str(result.errors[0]), + str(result.warnings[0]), ) def test_process_raises_processing_warning_with_missing_fields_but_event_is_processed(self): @@ -650,10 +650,10 @@ def test_process_raises_processing_warning_with_missing_fields_but_event_is_proc "tags": ["_field_manager_missing_field_warning"], } result = self.object.process(document) - assert len(result.errors) == 1 + assert len(result.warnings) == 1 assert re.match( r".*ProcessingWarning.*missing source_fields: \['does.not.exists'\]", - str(result.errors[0]), + str(result.warnings[0]), ) assert document == expected diff --git a/tests/unit/processor/generic_adder/test_generic_adder.py b/tests/unit/processor/generic_adder/test_generic_adder.py index c657d4ea4..5fe29930b 100644 --- a/tests/unit/processor/generic_adder/test_generic_adder.py +++ b/tests/unit/processor/generic_adder/test_generic_adder.py @@ -409,8 +409,8 @@ def test_generic_adder_testcases_failure_handling( ): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*FieldExistsWarning.*{error_message}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*FieldExistsWarning.*{error_message}", str(result.warnings[0])) assert event == expected, testcase def test_add_generic_fields_from_file_missing_and_existing_with_all_required(self): @@ -611,8 +611,8 @@ def test_sql_database_raises_exception_on_duplicate(self, caplog): self.object.process(document) result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected diff --git a/tests/unit/processor/generic_resolver/test_generic_resolver.py b/tests/unit/processor/generic_resolver/test_generic_resolver.py index 1e95ea3c2..de9237741 100644 --- a/tests/unit/processor/generic_resolver/test_generic_resolver.py +++ b/tests/unit/processor/generic_resolver/test_generic_resolver.py @@ -420,8 +420,8 @@ def test_resolve_dotted_src_and_dest_field_and_conflict_match(self, caplog): "re": {"solved": "I already exist!"}, } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_generic_and_multiple_match_first_only(self): diff --git a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py index 2703afa76..5d95772db 100644 --- a/tests/unit/processor/geoip_enricher/test_geoip_enricher.py +++ b/tests/unit/processor/geoip_enricher/test_geoip_enricher.py @@ -124,9 +124,9 @@ def test_source_field_is_none_emits_missing_fields_warning(self): expected = {"client": {"ip": None}, "tags": ["_geoip_enricher_missing_field_warning"]} self._load_specific_rule(self.object.rules[0]) self.object.process(document) - assert len(self.object.result.errors) == 1 + assert len(self.object.result.warnings) == 1 assert re.match( - r".*missing source_fields: \['client\.ip'].*", str(self.object.result.errors[0]) + r".*missing source_fields: \['client\.ip'].*", str(self.object.result.warnings[0]) ) assert document == expected @@ -164,8 +164,8 @@ def test_enrich_an_event_geoip(self): def test_enrich_an_event_geoip_with_existing_differing_geoip(self): document = {"client": {"ip": "8.8.8.8"}, "geoip": {"type": "Feature"}} result = self.object.process(document) - assert len(result.errors) == 1 - assert re.match(".*FieldExistsWarning.*geoip.type", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(".*FieldExistsWarning.*geoip.type", str(result.warnings[0])) def test_configured_dotted_output_field(self): document = {"source": {"ip": "8.8.8.8"}} diff --git a/tests/unit/processor/grokker/test_grokker.py b/tests/unit/processor/grokker/test_grokker.py index 3b0665cdc..dc092a27b 100644 --- a/tests/unit/processor/grokker/test_grokker.py +++ b/tests/unit/processor/grokker/test_grokker.py @@ -433,8 +433,8 @@ def test_testcases_failure_handling(self, testcase, rule, event, expected, error self.object.setup() if isinstance(error, str): result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*{error}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*{error}", str(result.warnings[0])) assert event == expected, testcase else: result = self.object.process(event) diff --git a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py index 217be9214..f644c7cf5 100644 --- a/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py +++ b/tests/unit/processor/hyperscan_resolver/test_hyperscan_resolver.py @@ -378,8 +378,8 @@ def test_resolve_dotted_and_dest_field_with_conflict_match(self): "tags": ["_hyperscan_resolver_failure"], } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_resolve_with_multiple_match_first_only(self): diff --git a/tests/unit/processor/ip_informer/test_ip_informer.py b/tests/unit/processor/ip_informer/test_ip_informer.py index 815830c75..0ef5e25b1 100644 --- a/tests/unit/processor/ip_informer/test_ip_informer.py +++ b/tests/unit/processor/ip_informer/test_ip_informer.py @@ -425,6 +425,6 @@ def test_testcases(self, testcase, rule, event, expected): def test_testcases_failure_handling(self, testcase, rule, event, expected): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], ProcessingWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], ProcessingWarning) assert event == expected, testcase diff --git a/tests/unit/processor/key_checker/test_key_checker.py b/tests/unit/processor/key_checker/test_key_checker.py index 1a73e1c32..0391c3e82 100644 --- a/tests/unit/processor/key_checker/test_key_checker.py +++ b/tests/unit/processor/key_checker/test_key_checker.py @@ -255,7 +255,7 @@ def test_testcases_positiv( self.object.process(event) assert event == expected - def test_raises_duplication_error(self): + def test_field_exists_warning(self): rule_dict = { "filter": "*", "key_checker": { @@ -273,5 +273,5 @@ def test_raises_duplication_error(self): "missing_fields": ["i.exists.already"], } result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) diff --git a/tests/unit/processor/list_comparison/test_list_comparison.py b/tests/unit/processor/list_comparison/test_list_comparison.py index 6fb49c5c1..385227ea7 100644 --- a/tests/unit/processor/list_comparison/test_list_comparison.py +++ b/tests/unit/processor/list_comparison/test_list_comparison.py @@ -159,8 +159,8 @@ def test_target_field_exists_and_cant_be_extended(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_intermediate_output_field_is_wrong_type(self): @@ -188,8 +188,8 @@ def test_intermediate_output_field_is_wrong_type(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected def test_check_in_dotted_subfield(self): @@ -244,8 +244,8 @@ def test_overwrite_target_field(self): self._load_specific_rule(rule_dict) self.object.setup() result = self.object.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) assert document == expected @responses.activate diff --git a/tests/unit/processor/requester/test_requester.py b/tests/unit/processor/requester/test_requester.py index 66e7df211..f9ba5d1f9 100644 --- a/tests/unit/processor/requester/test_requester.py +++ b/tests/unit/processor/requester/test_requester.py @@ -351,6 +351,6 @@ def test_requester_testcases_failure_handling( responses.add(responses.Response(**response_kwargs)) self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(error_message, str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(error_message, str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/processor/string_splitter/test_string_splitter.py b/tests/unit/processor/string_splitter/test_string_splitter.py index 937d7d0bb..3db351526 100644 --- a/tests/unit/processor/string_splitter/test_string_splitter.py +++ b/tests/unit/processor/string_splitter/test_string_splitter.py @@ -71,6 +71,6 @@ def test_testcases(self, testcase, rule, event, expected): # pylint: disable=un def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(error_message, str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(error_message, str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/processor/template_replacer/test_template_replacer.py b/tests/unit/processor/template_replacer/test_template_replacer.py index 887b1c08d..2eca9f0fd 100644 --- a/tests/unit/processor/template_replacer/test_template_replacer.py +++ b/tests/unit/processor/template_replacer/test_template_replacer.py @@ -148,8 +148,8 @@ def test_replace_incompatible_existing_dotted_message_parent_via_template(self): "dotted": "foo", } result = template_replacer.process(document) - assert len(result.errors) == 1 - assert isinstance(result.errors[0], FieldExistsWarning) + assert len(result.warnings) == 1 + assert isinstance(result.warnings[0], FieldExistsWarning) def test_replace_fails_with_invalid_template(self): config = deepcopy(self.CONFIG) diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py index d228d6a8e..5d985c99b 100644 --- a/tests/unit/processor/test_process.py +++ b/tests/unit/processor/test_process.py @@ -158,12 +158,12 @@ def test_processes_generic_rules_after_processor_error_in_specific_rules(self): pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_two) pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_one) res = pipeline.process_event(event) - assert len(res.results[0].errors) == 1 - assert isinstance(res.results[0].errors[0], FieldExistsWarning) + assert len(res.results[0].warnings) == 1 + assert isinstance(res.results[0].warnings[0], FieldExistsWarning) re.match( "The following fields could not be written, " "because one or more subfields existed and could not be extended: first", - str(res.results[0].errors[0]), + str(res.results[0].warnings[0]), ) assert event == expected_event diff --git a/tests/unit/processor/timestamp_differ/test_timestamp_differ.py b/tests/unit/processor/timestamp_differ/test_timestamp_differ.py index 4a3eda680..dd082562a 100644 --- a/tests/unit/processor/timestamp_differ/test_timestamp_differ.py +++ b/tests/unit/processor/timestamp_differ/test_timestamp_differ.py @@ -449,6 +449,6 @@ def test_testcases(self, testcase, rule, event, expected): def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(error_message, str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(error_message, str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/processor/timestamper/test_timestamper.py b/tests/unit/processor/timestamper/test_timestamper.py index 59566d326..228d5ffc9 100644 --- a/tests/unit/processor/timestamper/test_timestamper.py +++ b/tests/unit/processor/timestamper/test_timestamper.py @@ -320,6 +320,6 @@ def test_testcases(self, testcase, rule, event, expected): def test_testcases_failure_handling(self, testcase, rule, event, expected, error_message): self._load_specific_rule(rule) result = self.object.process(event) - assert len(result.errors) == 1 - assert re.match(rf".*{error_message}", str(result.errors[0])) + assert len(result.warnings) == 1 + assert re.match(rf".*{error_message}", str(result.warnings[0])) assert event == expected, testcase diff --git a/tests/unit/util/test_auto_rule_corpus_tester.py b/tests/unit/util/test_auto_rule_corpus_tester.py index 4e827c459..b0576678d 100644 --- a/tests/unit/util/test_auto_rule_corpus_tester.py +++ b/tests/unit/util/test_auto_rule_corpus_tester.py @@ -320,13 +320,15 @@ def test_run_prints_expected_outputs_to_console( with mock.patch( "logprep.util.auto_rule_tester.auto_rule_corpus_tester.Pipeline.process_pipeline" ) as mock_process_pipeline: - mock_process_pipeline.return_value = mock_output[0], PipelineResult( - results=[ - ProcessorResult( - processor_name="test", data=test_data["expected_extra_output"] - ) - ] + mock_process_pipeline.return_value = PipelineResult( + results=[], + event=mock_output[0], + event_received=mock_output[0], + pipeline=[], ) + mock_process_pipeline.return_value.results = [ + ProcessorResult(processor_name="test", data=test_data["expected_extra_output"]) + ] corpus_tester.run() else: corpus_tester.run() From 12d644360520b52bad7fabb7e908610b1e1b8d59 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Thu, 11 Jul 2024 10:13:53 +0200 Subject: [PATCH 09/10] refactor result and remove lock from pipeline --- logprep/framework/pipeline.py | 49 ++++++++------------------- logprep/framework/pipeline_manager.py | 3 +- 2 files changed, 15 insertions(+), 37 deletions(-) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 258706baf..5e5ffd751 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -17,11 +17,10 @@ from ctypes import c_bool from functools import cached_property, partial from importlib.metadata import version -from multiprocessing import Lock, Value, current_process -from typing import Any, Generator, List, Optional, Tuple +from multiprocessing import Value, current_process +from typing import Any, Generator, List, Tuple import attrs -import msgspec from logprep.abc.component import Component from logprep.abc.input import ( @@ -69,30 +68,20 @@ class PipelineResult: pipeline: list[Processor] """The pipeline that processed the event""" - @property - def has_processing_errors(self) -> bool: - """Check if any of the results has processing errors.""" - return any(result.errors for result in self) - - @property - def has_processing_warnings(self) -> bool: - """Check if any of the results has processing errors.""" - return any(result.warnings for result in self) - - @property + @cached_property def errors(self) -> List[ProcessingError]: """Return all processing errors.""" - return itertools.chain(*[result.errors for result in self]) + return list(itertools.chain(*[result.errors for result in self])) - @property + @cached_property def warnings(self) -> List[ProcessingWarning]: """Return all processing warnings.""" - return itertools.chain(*[result.warnings for result in self]) + return list(itertools.chain(*[result.warnings for result in self])) - @property + @cached_property def data(self) -> List[Tuple[dict, dict]]: """Return all extra data.""" - return itertools.chain(*[result.data for result in self]) + return list(itertools.chain(*[result.data for result in self])) def __attrs_post_init__(self): self.results = list( @@ -152,9 +141,6 @@ class Metrics(Component.Metrics): _continue_iterating: Value """ a flag to signal if iterating continues """ - _lock: Lock - """ the lock for the pipeline process """ - pipeline_index: int """ the index of this pipeline """ @@ -214,19 +200,13 @@ def _input(self) -> Input: ) return Factory.create(input_connector_config) - def __init__( - self, config: Configuration, pipeline_index: int = None, lock: Lock = None - ) -> None: + def __init__(self, config: Configuration, pipeline_index: int = None) -> None: self.logger = logging.getLogger("Pipeline") self.logger.name = f"Pipeline{pipeline_index}" self._logprep_config = config self._timeout = config.timeout self._continue_iterating = Value(c_bool) - - self._lock = lock self.pipeline_index = pipeline_index - self._encoder = msgspec.msgpack.Encoder() - self._decoder = msgspec.msgpack.Decoder() if self._logprep_config.profile_pipelines: self.run = partial(PipelineProfiler.profile_function, self.run) @@ -263,10 +243,9 @@ def run(self) -> None: # pylint: disable=method-hidden self._continue_iterating.value = True assert self._input, "Pipeline should not be run without input connector" assert self._output, "Pipeline should not be run without output connector" - with self._lock: - with warnings.catch_warnings(): - warnings.simplefilter("default") - self._setup() + with warnings.catch_warnings(): + warnings.simplefilter("default") + self._setup() self.logger.debug("Start iterating") while self._continue_iterating.value: self.process_pipeline() @@ -281,9 +260,9 @@ def process_pipeline(self) -> PipelineResult: if not event: return None, None result: PipelineResult = self.process_event(event) - if result.has_processing_warnings: + if result.warnings: self.logger.warning(",".join((str(warning) for warning in result.warnings))) - if result.has_processing_errors: + if result.errors: self.logger.error(",".join((str(error) for error in result.errors))) self._store_failed_event(result.errors, result.event_received, event) return diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index a0cf6181f..bba5ac540 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -64,7 +64,6 @@ def __init__(self, configuration: Configuration): self._pipelines: list[multiprocessing.Process] = [] self._configuration = configuration - self._lock = multiprocessing.Lock() prometheus_config = self._configuration.metrics if prometheus_config.enabled: self.prometheus_exporter = PrometheusExporter(prometheus_config) @@ -164,7 +163,7 @@ def restart(self): self.prometheus_exporter.run() def _create_pipeline(self, index) -> multiprocessing.Process: - pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock) + pipeline = Pipeline(pipeline_index=index, config=self._configuration) logger.info("Created new pipeline") process = multiprocessing.Process(target=pipeline.run, daemon=True) process.stop = pipeline.stop From df5fb0812313643d94617d372c04a85076202fcd Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Thu, 11 Jul 2024 10:51:41 +0200 Subject: [PATCH 10/10] fix pipeline tests --- tests/unit/framework/test_pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 10a836872..2f30825a6 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -52,7 +52,6 @@ class ConfigurationForTests: "metrics": {"enabled": False}, } ) - lock = Lock() def get_mock_create(): @@ -76,7 +75,6 @@ def setup_method(self): self.pipeline = Pipeline( pipeline_index=1, config=self.logprep_config, - lock=self.lock, ) def test_pipeline_property_returns_pipeline(self, mock_create):