Skip to content

Commit

Permalink
add more result documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 23, 2024
1 parent 8e72611 commit 6d69754
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
15 changes: 15 additions & 0 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ class ProcessorResult:
"""
Result object to be returned by every processor. It contains the processor name, created data
and errors (incl. warnings).
Parameters
----------
processor_name : str
The name of the processor
event: Optional[dict]
A reference to the event that was processed
data : Optional[list]
The generated extra data
errors : Optional[list]
The errors that occurred during processing
warnings : Optional[list]
The warnings that occurred during processing
"""

data: list = field(validator=validators.instance_of(list), factory=list)
Expand Down
18 changes: 17 additions & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,23 @@
@attrs.define(kw_only=True)
class PipelineResult:
"""Result object to be returned after processing the event.
It contains all results of each processor of the pipeline."""
It contains all results of each processor of the pipeline.
Parameters
----------
results : List[ProcessorResult]
List of ProcessorResults
event : dict
The event that was processed
pipeline : List[Processor]
The pipeline that processed the event
Returns
-------
PipelineResult
The result object
"""

results: List[ProcessorResult] = attrs.field(
validator=[
Expand Down
33 changes: 31 additions & 2 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@

import pytest

from logprep.abc.processor import ProcessorResult
from logprep.connector.http.input import HttpInput
from logprep.factory import Factory
from logprep.framework.pipeline import PipelineResult
from logprep.framework.pipeline_manager import (
ComponentQueueListener,
PipelineManager,
ThrottlingQueue,
)
from logprep.metrics.exporter import PrometheusExporter
from logprep.processor.base.exceptions import ProcessingError
from logprep.processor.base.rule import Rule
from logprep.processor.dropper.rule import DropperRule
from logprep.util.configuration import Configuration, MetricsConfig
from logprep.util.defaults import DEFAULT_LOG_CONFIG
from logprep.util.logging import logqueue
Expand Down Expand Up @@ -551,10 +556,34 @@ def test_listen_calls_target(self):
listener._queue.put("test")
listener._queue.put(listener._sentinel)
listener._listen()
target.assert_called_with("test")
target.assert_called_with({"event": "test", "errors": "An unknown error occurred"})

def test_listen_handles_pipeline_result(self):
assert False
target = mock.MagicMock()
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
test_event = {"message": "test"}
rule = DropperRule._create_from_dict({"filter": "message", "dropper": {"drop": ["test"]}})
results = [
ProcessorResult(
processor_name="test",
event=test_event,
errors=[
ProcessingError("test value error", rule=rule),
ProcessingError("test type error", rule=rule),
],
)
]
pipeline = [
Factory.create(
{"dummy": {"type": "dropper", "generic_rules": [], "specific_rules": []}}
)
]
pipeline_result = PipelineResult(results=results, event=test_event, pipeline=pipeline)
listener._queue.put(pipeline_result)
listener._queue.put(listener._sentinel)
listener._listen()
target.assert_called_with({"event": "test", "errors": "An unknown error occurred"})

def test_listen_handles_critical_input_output_exception(self):
assert False

0 comments on commit 6d69754

Please sign in to comment.