diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 34243a479..b1980baed 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -37,6 +37,21 @@ class ProcessorResult: """ Result object to be returned by every processor. It contains the processor name, created data and errors (incl. warnings). + + Parameters + ---------- + + + processor_name : str + The name of the processor + event: Optional[dict] + A reference to the event that was processed + data : Optional[list] + The generated extra data + errors : Optional[list] + The errors that occurred during processing + warnings : Optional[list] + The warnings that occurred during processing """ data: list = field(validator=validators.instance_of(list), factory=list) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 360b9cf11..11ffbfa94 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -46,7 +46,23 @@ @attrs.define(kw_only=True) class PipelineResult: """Result object to be returned after processing the event. - It contains all results of each processor of the pipeline.""" + It contains all results of each processor of the pipeline. + + Parameters + ---------- + + results : List[ProcessorResult] + List of ProcessorResults + event : dict + The event that was processed + pipeline : List[Processor] + The pipeline that processed the event + + Returns + ------- + PipelineResult + The result object + """ results: List[ProcessorResult] = attrs.field( validator=[ diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 477e910a4..9b0200e52 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -10,14 +10,19 @@ import pytest +from logprep.abc.processor import ProcessorResult from logprep.connector.http.input import HttpInput from logprep.factory import Factory +from logprep.framework.pipeline import PipelineResult from logprep.framework.pipeline_manager import ( ComponentQueueListener, PipelineManager, ThrottlingQueue, ) from logprep.metrics.exporter import PrometheusExporter +from logprep.processor.base.exceptions import ProcessingError +from logprep.processor.base.rule import Rule +from logprep.processor.dropper.rule import DropperRule from logprep.util.configuration import Configuration, MetricsConfig from logprep.util.defaults import DEFAULT_LOG_CONFIG from logprep.util.logging import logqueue @@ -551,10 +556,34 @@ def test_listen_calls_target(self): listener._queue.put("test") listener._queue.put(listener._sentinel) listener._listen() - target.assert_called_with("test") + target.assert_called_with({"event": "test", "errors": "An unknown error occurred"}) def test_listen_handles_pipeline_result(self): - assert False + target = mock.MagicMock() + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = ComponentQueueListener(queue, target) + test_event = {"message": "test"} + rule = DropperRule._create_from_dict({"filter": "message", "dropper": {"drop": ["test"]}}) + results = [ + ProcessorResult( + processor_name="test", + event=test_event, + errors=[ + ProcessingError("test value error", rule=rule), + ProcessingError("test type error", rule=rule), + ], + ) + ] + pipeline = [ + Factory.create( + {"dummy": {"type": "dropper", "generic_rules": [], "specific_rules": []}} + ) + ] + pipeline_result = PipelineResult(results=results, event=test_event, pipeline=pipeline) + listener._queue.put(pipeline_result) + listener._queue.put(listener._sentinel) + listener._listen() + target.assert_called_with({"event": "test", "errors": "An unknown error occurred"}) def test_listen_handles_critical_input_output_exception(self): assert False