Skip to content

Commit

Permalink
add matching errors in enqueue_error
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 30, 2024
1 parent dfa477e commit 2822d9b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 36 deletions.
71 changes: 40 additions & 31 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class PipelineResult:
The result object
"""

__match_args__ = ("event", "errors")

results: List[ProcessorResult] = attrs.field(
validator=[
attrs.validators.instance_of(list),
Expand Down Expand Up @@ -356,37 +358,44 @@ def enqueue_error(
) -> None:
"""Enqueues an error to the error queue or logs a warning if
no error queue is defined."""
if self.error_queue:
event: dict | list = None
match item:
case CriticalOutputError(
[
{
"index": {"data": data, "error": error},
},
*values,
]
):
event = [{"event": data, "errors": error}]
if values:
event += [
{"event": i["index"]["data"], "errors": i["index"]["error"]}
for i in values
]
case CriticalOutputError(raw_input) if isinstance(raw_input, dict):
event = {"event": str(raw_input), "errors": str(item)}
case CriticalOutputError(raw_input) if isinstance(raw_input, (list, tuple)):
event = [{"event": str(i), "errors": str(item)} for i in raw_input]
case CriticalOutputError(raw_input) if isinstance(raw_input, (str, bytes)):
event = {"event": str(raw_input), "errors": str(item)}
case _:
event = {"event": str(item), "errors": "Unknown error"}
if isinstance(event, list):
for i in event:
self.error_queue.put(i)
else:
self.error_queue.put(event)
else:
if not self.error_queue:
self.logger.warning("No error queue defined, event was dropped")
if self._input:
self._input.batch_finished_callback()
return
event: dict | list = None
match item:
case CriticalOutputError():
event = self._get_output_error_event(item)
case PipelineResult(event, errors):
event = {
"event": str(event),
"errors": str(errors),
}
case CriticalInputError():
event = {"event": str(item.raw_input), "errors": str(item)}
case _:
event = {"event": str(item), "errors": "Unknown error"}
if isinstance(event, list):
for i in event:
self.error_queue.put(i)
else:
self.error_queue.put(event)
if self._input:
self._input.batch_finished_callback()

def _get_output_error_event(self, item: CriticalOutputError) -> dict:
match item:
case CriticalOutputError([{"errors": _, "event": _}, *_]):
event = [
{"event": str(i["event"]), "errors": str(i["errors"])} for i in item.raw_input
]
return event
case CriticalOutputError({"errors": error, "event": event}):
return {"event": str(event), "errors": str(error)}
case CriticalOutputError(raw_input) if isinstance(raw_input, dict):
return {"event": str(raw_input), "errors": str(item)}
case CriticalOutputError(raw_input) if isinstance(raw_input, (list, tuple)):
return [{"event": str(i), "errors": str(item)} for i in raw_input]
case CriticalOutputError(raw_input) if isinstance(raw_input, (str, bytes)):
return {"event": str(raw_input), "errors": str(item)}
17 changes: 12 additions & 5 deletions tests/unit/framework/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,15 +674,15 @@ def test_enqueue_error_logs_warning_if_no_error_queue(self, _):
"{'foo': 'bar'}",
),
(
[{"index": {"error": "error_msg", "data": "raw_input"}}],
"raw_input",
[{"event": {"test": "message"}, "errors": {"error": "error_msg"}}],
"{'test': 'message'}",
),
(
[
{"index": {"error": "error_msg", "data": "raw_input1"}},
{"index": {"error": "error_msg", "data": "raw_input2"}},
{"event": {"test": f"message{i}"}, "errors": {"error": "error_msg"}}
for i in range(2)
],
"raw_input1",
"{'test': 'message0'}",
),
],
)
Expand All @@ -696,6 +696,13 @@ def test_enqueue_error_handles_critical_output_errors_with_different_event_data(
assert "error_msg" in enqueued_item["errors"]

def test_enqueue_error_calls_batch_finished_callback(self, _):
error = CriticalInputError(self.pipeline._input, "test-error", "raw_input")
self.pipeline._input.batch_finished_callback = mock.MagicMock()
self.pipeline.error_queue = queue.Queue()
self.pipeline.enqueue_error(error)
self.pipeline._input.batch_finished_callback.assert_called()

def test_enqueue_error_calls_batch_finished_callback_without_error_queue(self, _):
error = CriticalInputError(self.pipeline._input, "test-error", "raw_input")
self.pipeline._input.batch_finished_callback = mock.MagicMock()
self.pipeline.error_queue = None
Expand Down

0 comments on commit 2822d9b

Please sign in to comment.