diff --git a/doc/source/development/connector_how_to.rst b/doc/source/development/connector_how_to.rst index e60ddc675..2ac5eb97c 100644 --- a/doc/source/development/connector_how_to.rst +++ b/doc/source/development/connector_how_to.rst @@ -60,7 +60,7 @@ An exception should be thrown if an error occurs on calling this function. These exceptions must inherit from the exception classes in :py:class:`~logprep.output.output.Output`. They should return a helpful message when calling `str(exception)`. Analogous to the input, exceptions that require a restart of Logprep should inherit from `FatalOutputError`. -Exceptions that inherit from `WarningOutputError` will be logged, but they do not require any error handling. +Exceptions that inherit from `OutputWarning` will be logged, but they do not require any error handling. :py:meth:`~logprep.output.output.Output.store_failed` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/logprep/abc/input.py b/logprep/abc/input.py index f76b7b211..433a13d6a 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -22,10 +22,11 @@ from logprep.util.validators import dict_structure_validator -class InputError(BaseException): +class InputError(Exception): """Base class for Input related exceptions.""" def __init__(self, input_connector: "Input", message: str) -> None: + input_connector.metrics.number_of_errors += 1 super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}") @@ -44,27 +45,18 @@ class CriticalInputParsingError(CriticalInputError): class FatalInputError(InputError): """Must not be catched.""" - def __init__(self, input_connector: "Input", message: str) -> None: - super().__init__(input_connector, message) - -class WarningInputError(InputError): +class InputWarning(Exception): """May be catched but must be displayed to the user/logged.""" def __init__(self, input_connector: "Input", message: str) -> None: - super().__init__(input_connector, message) + input_connector.metrics.number_of_warnings += 1 + super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}") -class SourceDisconnectedError(WarningInputError): +class SourceDisconnectedWarning(InputWarning): """Lost (or failed to establish) contact with the source.""" - def __init__(self, input_connector: "Input", message: str) -> None: - super().__init__(input_connector, message) - - -class InfoInputError(InputError): - """Informational exceptions, e.g. to inform that a timeout occurred""" - @define(kw_only=True) class HmacConfig: diff --git a/logprep/abc/output.py b/logprep/abc/output.py index 4cfa98bea..fc09e5a0b 100644 --- a/logprep/abc/output.py +++ b/logprep/abc/output.py @@ -12,10 +12,19 @@ from logprep.abc.input import Input -class OutputError(BaseException): +class OutputError(Exception): """Base class for Output related exceptions.""" def __init__(self, output: "Output", message: str) -> None: + output.metrics.number_of_errors += 1 + super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}") + + +class OutputWarning(Exception): + """Base class for Output related warnings.""" + + def __init__(self, output: "Output", message: str) -> None: + output.metrics.number_of_warnings += 1 super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}") @@ -25,25 +34,12 @@ class CriticalOutputError(OutputError): def __init__(self, output, message, raw_input): if raw_input: output.store_failed(str(self), raw_input, {}) - output.metrics.number_of_errors += 1 super().__init__(output, f"{message} for event: {raw_input}") class FatalOutputError(OutputError): """Must not be catched.""" - def __init__(self, output, message) -> None: - output.metrics.number_of_errors += 1 - super().__init__(output, message) - - -class WarningOutputError(OutputError): - """May be catched but must be displayed to the user/logged.""" - - def __init__(self, output, message) -> None: - output.metrics.number_of_warnings += 1 - super().__init__(output, message) - class Output(Connector): """Connect to a output destination.""" diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 58a366a37..1eefe0791 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -51,7 +51,7 @@ CriticalInputParsingError, FatalInputError, Input, - WarningInputError, + InputWarning, ) from logprep.metrics.metrics import CounterMetric, GaugeMetric from logprep.util.validators import keys_in_validator @@ -336,9 +336,7 @@ def _commit_callback( """ if error is not None: self.metrics.commit_failures += 1 - raise WarningInputError( - self, f"Could not commit offsets for {topic_partitions}: {error}" - ) + raise InputWarning(self, f"Could not commit offsets for {topic_partitions}: {error}") self.metrics.commit_success += 1 for topic_partition in topic_partitions: offset = topic_partition.offset diff --git a/logprep/connector/dummy/input.py b/logprep/connector/dummy/input.py index c364fe982..0fab18f26 100644 --- a/logprep/connector/dummy/input.py +++ b/logprep/connector/dummy/input.py @@ -20,12 +20,12 @@ """ import copy from functools import cached_property -from typing import List, Union, Optional +from typing import List, Optional, Union from attr import field, validators from attrs import define -from logprep.abc.input import Input, SourceDisconnectedError +from logprep.abc.input import Input, SourceDisconnectedWarning class DummyInput(Input): @@ -51,7 +51,7 @@ def _get_event(self, timeout: float) -> tuple: """Retrieve next document from configuration and raise error if found""" if not self._documents: if not self._config.repeat_documents: - raise SourceDisconnectedError(self, "no documents left") + raise SourceDisconnectedWarning(self, "no documents left") del self.__dict__["_documents"] document = self._documents.pop(0) diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 36a51246c..1952b72a3 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -28,14 +28,14 @@ CriticalInputParsingError, FatalInputError, Input, - SourceDisconnectedError, - WarningInputError, + InputWarning, + SourceDisconnectedWarning, ) from logprep.abc.output import ( CriticalOutputError, FatalOutputError, Output, - WarningOutputError, + OutputWarning, ) from logprep.abc.processor import Processor from logprep.factory import Factory @@ -97,10 +97,10 @@ def _handle_pipeline_error(func): def _inner(self: "Pipeline") -> Any: try: return func(self) - except SourceDisconnectedError as error: + except SourceDisconnectedWarning as error: self.logger.warning(str(error)) self.stop() - except (WarningOutputError, WarningInputError) as error: + except (OutputWarning, InputWarning) as error: self.logger.warning(str(error)) except CriticalOutputError as error: self.logger.error(str(error)) diff --git a/tests/unit/connector/test_confluent_kafka_common.py b/tests/unit/connector/test_confluent_kafka_common.py index b77d31ad5..b1af90d37 100644 --- a/tests/unit/connector/test_confluent_kafka_common.py +++ b/tests/unit/connector/test_confluent_kafka_common.py @@ -29,11 +29,13 @@ def test_create_fails_for_unknown_option(self): _ = Factory.create({"test connector": kafka_config}, logger=self.logger) def test_error_callback_logs_warnings(self): + self.object.metrics.number_of_warnings = 0 with mock.patch("logging.Logger.warning") as mock_warning: test_error = BaseException("test error") self.object._error_callback(test_error) mock_warning.assert_called() mock_warning.assert_called_with(f"{self.object.describe()}: {test_error}") + assert self.object.metrics.number_of_warnings == 1 def test_stats_callback_sets_metric_objetc_attributes(self): librdkafka_metrics = tuple( diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 6e17f72e4..e7a9389c7 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -14,7 +14,7 @@ CriticalInputError, CriticalInputParsingError, FatalInputError, - WarningInputError, + InputWarning, ) from logprep.factory import Factory from logprep.factory_error import InvalidConfigurationError @@ -213,7 +213,7 @@ def test_get_next_raises_critical_input_parsing_error(self): self.object.get_next(0.01) def test_commit_callback_raises_warning_error_and_counts_failures(self): - with pytest.raises(WarningInputError, match="Could not commit offsets"): + with pytest.raises(InputWarning, match="Could not commit offsets"): self.object._commit_callback(BaseException, ["topic_partition"]) assert self.object._commit_failures == 1 diff --git a/tests/unit/connector/test_dummy_input.py b/tests/unit/connector/test_dummy_input.py index e25b85bdf..ce6dd8460 100644 --- a/tests/unit/connector/test_dummy_input.py +++ b/tests/unit/connector/test_dummy_input.py @@ -5,7 +5,7 @@ from pytest import raises -from logprep.abc.input import SourceDisconnectedError +from logprep.abc.input import SourceDisconnectedWarning from logprep.factory import Factory from tests.unit.connector.base import BaseInputTestCase @@ -20,7 +20,7 @@ class TestDummyInput(BaseInputTestCase): CONFIG = {"type": "dummy_input", "documents": []} def test_fails_with_disconnected_error_if_input_was_empty(self): - with raises(SourceDisconnectedError): + with raises(SourceDisconnectedWarning): self.object.get_next(self.timeout) def test_returns_documents_in_order_provided(self): diff --git a/tests/unit/exceptions/base.py b/tests/unit/exceptions/base.py index 15072192b..c6cfeb199 100644 --- a/tests/unit/exceptions/base.py +++ b/tests/unit/exceptions/base.py @@ -1,3 +1,7 @@ +# pylint: disable=missing-docstring +# pylint: disable=attribute-defined-outside-init +# pylint: disable=protected-access +# pylint: disable=line-too-long from logging import getLogger from typing import Callable @@ -15,24 +19,19 @@ class ExceptionBaseTest: """regex string to match the error message""" counted_metric_name: str - """name of the metric that should be counted""" + """name of the metric attribute as defined in instance class""" def setup_method(self): - self.processor = Factory.create( - {"my_dissector": {"type": "dissector", "specific_rules": [], "generic_rules": []}}, - getLogger(), - ) - self.rule = Rule._create_from_dict({"filter": "message", "rule": {}}) + self.object = Rule._create_from_dict({"filter": "message", "rule": {}}) self.event = {"message": "test_event"} - self.exception_args = ("the error message", self.rule, self.event) + self.exception_args = ("the error message", self.object, self.event) def test_error_message(self): with pytest.raises(self.exception, match=self.error_message): raise self.exception(*self.exception_args) def test_metrics_counts(self): - setattr(self.rule.metrics, self.counted_metric_name, 0) - self.rule.metrics.number_of_warnings = 0 + setattr(self.object.metrics, self.counted_metric_name, 0) with pytest.raises(self.exception): raise self.exception(*self.exception_args) - assert getattr(self.rule.metrics, self.counted_metric_name) == 1 + assert getattr(self.object.metrics, self.counted_metric_name) == 1 diff --git a/tests/unit/exceptions/test_connector_exceptions.py b/tests/unit/exceptions/test_connector_exceptions.py new file mode 100644 index 000000000..3dd0096ad --- /dev/null +++ b/tests/unit/exceptions/test_connector_exceptions.py @@ -0,0 +1,172 @@ +# pylint: disable=missing-docstring +# pylint: disable=attribute-defined-outside-init +# pylint: disable=protected-access +# pylint: disable=line-too-long + +from logging import getLogger + +from logprep.abc.input import ( + CriticalInputError, + CriticalInputParsingError, + FatalInputError, + InputError, + InputWarning, + SourceDisconnectedWarning, +) +from logprep.abc.output import ( + CriticalOutputError, + FatalOutputError, + OutputError, + OutputWarning, +) +from logprep.factory import Factory +from tests.unit.exceptions.base import ExceptionBaseTest + + +class TestFatalOutputError(ExceptionBaseTest): + exception = FatalOutputError + + error_message = r"FatalOutputError in DummyOutput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_output", "default": False}}, logger=getLogger() + ) + self.exception_args = (self.object, "the error message") + + +class TestCriticalOutputError(ExceptionBaseTest): + exception = CriticalOutputError + + error_message = r"CriticalOutputError in DummyOutput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_output", "default": False}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message", b"raw input") + + +class TestOutputError(ExceptionBaseTest): + exception = OutputError + + error_message = r"OutputError in DummyOutput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_output", "default": False}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") + + +class TestOutputWarning(ExceptionBaseTest): + exception = OutputWarning + + error_message = r"OutputWarning in DummyOutput \(test connector\): the error message" + + counted_metric_name = "number_of_warnings" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_output", "default": False}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") + + +class TestInputError(ExceptionBaseTest): + exception = InputError + + error_message = r"InputError in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") + + +class TestCriticalInputError(ExceptionBaseTest): + exception = CriticalInputError + + error_message = r"CriticalInputError in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message", b"raw input") + + +class TestCriticalInputParsingError(ExceptionBaseTest): + exception = CriticalInputParsingError + + error_message = r"CriticalInputParsingError in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message", b"raw input") + + +class TestFatalInputError(ExceptionBaseTest): + exception = FatalInputError + + error_message = r"FatalInputError in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_errors" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") + + +class TestInputWarning(ExceptionBaseTest): + exception = InputWarning + + error_message = r"InputWarning in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_warnings" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") + + +class TestSourceDisconnectedWarning(ExceptionBaseTest): + exception = SourceDisconnectedWarning + + error_message = r"SourceDisconnectedWarning in DummyInput \(test connector\): the error message" + + counted_metric_name = "number_of_warnings" + + def setup_method(self): + self.object = Factory.create( + {"test connector": {"type": "dummy_input", "documents": []}}, + logger=getLogger(), + ) + self.exception_args = (self.object, "the error message") diff --git a/tests/unit/exceptions/test_processing_exceptions.py b/tests/unit/exceptions/test_processing_exceptions.py index 4d59584cc..ce7473ff4 100644 --- a/tests/unit/exceptions/test_processing_exceptions.py +++ b/tests/unit/exceptions/test_processing_exceptions.py @@ -35,7 +35,7 @@ class TestFieldExsitsWarning(ExceptionBaseTest): def setup_method(self): super().setup_method() - self.exception_args = (self.rule, self.event, ["my_field"]) + self.exception_args = (self.object, self.event, ["my_field"]) class TestProcessingCriticalError(ExceptionBaseTest): @@ -59,4 +59,4 @@ class TestProcessingError(ExceptionBaseTest): def setup_method(self): super().setup_method() - self.exception_args = ("the error message", self.rule) + self.exception_args = ("the error message", self.object) diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index c19ff04a5..090771ebf 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -18,14 +18,14 @@ CriticalInputError, CriticalInputParsingError, FatalInputError, - SourceDisconnectedError, - WarningInputError, + InputWarning, + SourceDisconnectedWarning, ) from logprep.abc.output import ( CriticalOutputError, FatalOutputError, Output, - WarningOutputError, + OutputWarning, ) from logprep.factory import Factory from logprep.framework.pipeline import MultiprocessingPipeline, Pipeline, SharedCounter @@ -193,7 +193,7 @@ def test_input_warning_error_is_logged_but_processing_continues(self, mock_warni self.pipeline._setup() def raise_warning_error(_): - raise WarningInputError(self.pipeline._input, "i warn you") + raise InputWarning(self.pipeline._input, "i warn you") self.pipeline._input.metrics = mock.MagicMock() self.pipeline._input.metrics.number_of_warnings = 0 @@ -204,7 +204,7 @@ def raise_warning_error(_): self.pipeline._input.get_next.side_effect = None self.pipeline.process_pipeline() assert self.pipeline._input.get_next.call_count == 3 - mock_warning.assert_called_with(str(WarningInputError(self.pipeline._input, "i warn you"))) + mock_warning.assert_called_with(str(InputWarning(self.pipeline._input, "i warn you"))) assert self.pipeline._output["dummy"].store.call_count == 2 @mock.patch("logging.Logger.warning") @@ -214,7 +214,7 @@ def test_output_warning_error_is_logged_but_processing_continues(self, mock_warn self.pipeline._output["dummy"].metrics = mock.MagicMock() self.pipeline._output["dummy"].metrics.number_of_warnings = 0 self.pipeline.process_pipeline() - self.pipeline._output["dummy"].store.side_effect = WarningOutputError( + self.pipeline._output["dummy"].store.side_effect = OutputWarning( self.pipeline._output["dummy"], "" ) self.pipeline.process_pipeline() @@ -306,14 +306,14 @@ def raise_critical(timeout): @mock.patch("logging.Logger.warning") def test_input_warning_is_logged(self, mock_warning, _): def raise_warning(_): - raise WarningInputError(self.pipeline._input, "mock input warning") + raise InputWarning(self.pipeline._input, "mock input warning") self.pipeline._setup() self.pipeline._input.get_next.side_effect = raise_warning self.pipeline.process_pipeline() self.pipeline._input.get_next.assert_called() mock_warning.assert_called_with( - str(WarningInputError(self.pipeline._input, "mock input warning")) + str(InputWarning(self.pipeline._input, "mock input warning")) ) @mock.patch("logging.Logger.error") @@ -342,7 +342,7 @@ def test_warning_output_error_is_logged(self, mock_warning, _): dummy_output = original_create({"dummy_output": {"type": "dummy_output"}}, mock.MagicMock()) def raise_warning(event): - raise WarningOutputError(self.pipeline._output["dummy"], "mock output warning") + raise OutputWarning(self.pipeline._output["dummy"], "mock output warning") input_event = {"test": "message"} dummy_output.store = raise_warning @@ -351,7 +351,7 @@ def raise_warning(event): self.pipeline._input.get_next.return_value = (input_event, None) self.pipeline.process_pipeline() mock_warning.assert_called_with( - str(WarningOutputError(self.pipeline._output["dummy"], "mock output warning")) + str(OutputWarning(self.pipeline._output["dummy"], "mock output warning")) ) @mock.patch("logging.Logger.error") @@ -746,7 +746,7 @@ def test_graceful_shutdown_of_pipeline_on_source_disconnected_error(self, capfd) pipeline._input.get_next = mock.MagicMock() def raise_source_disconnected_error(_): - raise SourceDisconnectedError(pipeline._input, "source was disconnected") + raise SourceDisconnectedWarning(pipeline._input, "source was disconnected") pipeline._input.get_next.side_effect = raise_source_disconnected_error pipeline.start()