Skip to content

Commit

Permalink
refactor and add more exception tests for counting metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 23, 2023
1 parent dc9aa1f commit c14a609
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 68 deletions.
2 changes: 1 addition & 1 deletion doc/source/development/connector_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ An exception should be thrown if an error occurs on calling this function.
These exceptions must inherit from the exception classes in :py:class:`~logprep.output.output.Output`.
They should return a helpful message when calling `str(exception)`.
Analogous to the input, exceptions that require a restart of Logprep should inherit from `FatalOutputError`.
Exceptions that inherit from `WarningOutputError` will be logged, but they do not require any error handling.
Exceptions that inherit from `OutputWarning` will be logged, but they do not require any error handling.

:py:meth:`~logprep.output.output.Output.store_failed`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
20 changes: 6 additions & 14 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
from logprep.util.validators import dict_structure_validator


class InputError(BaseException):
class InputError(Exception):
"""Base class for Input related exceptions."""

def __init__(self, input_connector: "Input", message: str) -> None:
input_connector.metrics.number_of_errors += 1
super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}")


Expand All @@ -44,27 +45,18 @@ class CriticalInputParsingError(CriticalInputError):
class FatalInputError(InputError):
"""Must not be catched."""

def __init__(self, input_connector: "Input", message: str) -> None:
super().__init__(input_connector, message)


class WarningInputError(InputError):
class InputWarning(Exception):
"""May be catched but must be displayed to the user/logged."""

def __init__(self, input_connector: "Input", message: str) -> None:
super().__init__(input_connector, message)
input_connector.metrics.number_of_warnings += 1
super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}")


class SourceDisconnectedError(WarningInputError):
class SourceDisconnectedWarning(InputWarning):
"""Lost (or failed to establish) contact with the source."""

def __init__(self, input_connector: "Input", message: str) -> None:
super().__init__(input_connector, message)


class InfoInputError(InputError):
"""Informational exceptions, e.g. to inform that a timeout occurred"""


@define(kw_only=True)
class HmacConfig:
Expand Down
24 changes: 10 additions & 14 deletions logprep/abc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@
from logprep.abc.input import Input


class OutputError(BaseException):
class OutputError(Exception):
"""Base class for Output related exceptions."""

def __init__(self, output: "Output", message: str) -> None:
output.metrics.number_of_errors += 1
super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}")


class OutputWarning(Exception):
"""Base class for Output related warnings."""

def __init__(self, output: "Output", message: str) -> None:
output.metrics.number_of_warnings += 1
super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}")


Expand All @@ -25,25 +34,12 @@ class CriticalOutputError(OutputError):
def __init__(self, output, message, raw_input):
if raw_input:
output.store_failed(str(self), raw_input, {})
output.metrics.number_of_errors += 1
super().__init__(output, f"{message} for event: {raw_input}")


class FatalOutputError(OutputError):
"""Must not be catched."""

def __init__(self, output, message) -> None:
output.metrics.number_of_errors += 1
super().__init__(output, message)


class WarningOutputError(OutputError):
"""May be catched but must be displayed to the user/logged."""

def __init__(self, output, message) -> None:
output.metrics.number_of_warnings += 1
super().__init__(output, message)


class Output(Connector):
"""Connect to a output destination."""
Expand Down
6 changes: 2 additions & 4 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
CriticalInputParsingError,
FatalInputError,
Input,
WarningInputError,
InputWarning,
)
from logprep.metrics.metrics import CounterMetric, GaugeMetric
from logprep.util.validators import keys_in_validator
Expand Down Expand Up @@ -336,9 +336,7 @@ def _commit_callback(
"""
if error is not None:
self.metrics.commit_failures += 1
raise WarningInputError(
self, f"Could not commit offsets for {topic_partitions}: {error}"
)
raise InputWarning(self, f"Could not commit offsets for {topic_partitions}: {error}")
self.metrics.commit_success += 1
for topic_partition in topic_partitions:
offset = topic_partition.offset
Expand Down
6 changes: 3 additions & 3 deletions logprep/connector/dummy/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
"""
import copy
from functools import cached_property
from typing import List, Union, Optional
from typing import List, Optional, Union

from attr import field, validators
from attrs import define

from logprep.abc.input import Input, SourceDisconnectedError
from logprep.abc.input import Input, SourceDisconnectedWarning


class DummyInput(Input):
Expand All @@ -51,7 +51,7 @@ def _get_event(self, timeout: float) -> tuple:
"""Retrieve next document from configuration and raise error if found"""
if not self._documents:
if not self._config.repeat_documents:
raise SourceDisconnectedError(self, "no documents left")
raise SourceDisconnectedWarning(self, "no documents left")
del self.__dict__["_documents"]

document = self._documents.pop(0)
Expand Down
10 changes: 5 additions & 5 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
CriticalInputParsingError,
FatalInputError,
Input,
SourceDisconnectedError,
WarningInputError,
InputWarning,
SourceDisconnectedWarning,
)
from logprep.abc.output import (
CriticalOutputError,
FatalOutputError,
Output,
WarningOutputError,
OutputWarning,
)
from logprep.abc.processor import Processor
from logprep.factory import Factory
Expand Down Expand Up @@ -97,10 +97,10 @@ def _handle_pipeline_error(func):
def _inner(self: "Pipeline") -> Any:
try:
return func(self)
except SourceDisconnectedError as error:
except SourceDisconnectedWarning as error:
self.logger.warning(str(error))
self.stop()
except (WarningOutputError, WarningInputError) as error:
except (OutputWarning, InputWarning) as error:
self.logger.warning(str(error))
except CriticalOutputError as error:
self.logger.error(str(error))
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/connector/test_confluent_kafka_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ def test_create_fails_for_unknown_option(self):
_ = Factory.create({"test connector": kafka_config}, logger=self.logger)

def test_error_callback_logs_warnings(self):
self.object.metrics.number_of_warnings = 0
with mock.patch("logging.Logger.warning") as mock_warning:
test_error = BaseException("test error")
self.object._error_callback(test_error)
mock_warning.assert_called()
mock_warning.assert_called_with(f"{self.object.describe()}: {test_error}")
assert self.object.metrics.number_of_warnings == 1

def test_stats_callback_sets_metric_objetc_attributes(self):
librdkafka_metrics = tuple(
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
CriticalInputError,
CriticalInputParsingError,
FatalInputError,
WarningInputError,
InputWarning,
)
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
Expand Down Expand Up @@ -213,7 +213,7 @@ def test_get_next_raises_critical_input_parsing_error(self):
self.object.get_next(0.01)

def test_commit_callback_raises_warning_error_and_counts_failures(self):
with pytest.raises(WarningInputError, match="Could not commit offsets"):
with pytest.raises(InputWarning, match="Could not commit offsets"):
self.object._commit_callback(BaseException, ["topic_partition"])
assert self.object._commit_failures == 1

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/connector/test_dummy_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from pytest import raises

from logprep.abc.input import SourceDisconnectedError
from logprep.abc.input import SourceDisconnectedWarning
from logprep.factory import Factory
from tests.unit.connector.base import BaseInputTestCase

Expand All @@ -20,7 +20,7 @@ class TestDummyInput(BaseInputTestCase):
CONFIG = {"type": "dummy_input", "documents": []}

def test_fails_with_disconnected_error_if_input_was_empty(self):
with raises(SourceDisconnectedError):
with raises(SourceDisconnectedWarning):
self.object.get_next(self.timeout)

def test_returns_documents_in_order_provided(self):
Expand Down
19 changes: 9 additions & 10 deletions tests/unit/exceptions/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# pylint: disable=missing-docstring
# pylint: disable=attribute-defined-outside-init
# pylint: disable=protected-access
# pylint: disable=line-too-long
from logging import getLogger
from typing import Callable

Expand All @@ -15,24 +19,19 @@ class ExceptionBaseTest:
"""regex string to match the error message"""

counted_metric_name: str
"""name of the metric that should be counted"""
"""name of the metric attribute as defined in instance class"""

def setup_method(self):
self.processor = Factory.create(
{"my_dissector": {"type": "dissector", "specific_rules": [], "generic_rules": []}},
getLogger(),
)
self.rule = Rule._create_from_dict({"filter": "message", "rule": {}})
self.object = Rule._create_from_dict({"filter": "message", "rule": {}})
self.event = {"message": "test_event"}
self.exception_args = ("the error message", self.rule, self.event)
self.exception_args = ("the error message", self.object, self.event)

def test_error_message(self):
with pytest.raises(self.exception, match=self.error_message):
raise self.exception(*self.exception_args)

def test_metrics_counts(self):
setattr(self.rule.metrics, self.counted_metric_name, 0)
self.rule.metrics.number_of_warnings = 0
setattr(self.object.metrics, self.counted_metric_name, 0)
with pytest.raises(self.exception):
raise self.exception(*self.exception_args)
assert getattr(self.rule.metrics, self.counted_metric_name) == 1
assert getattr(self.object.metrics, self.counted_metric_name) == 1
Loading

0 comments on commit c14a609

Please sign in to comment.