Skip to content

Commit

Permalink
fix pipeline_manager tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 1, 2024
1 parent ea2d570 commit f7fb0b9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
3 changes: 2 additions & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Value, current_process
from typing import Any, Generator, Iterable, List, Tuple
from typing import Any, List, Tuple

import attrs

Expand Down Expand Up @@ -233,6 +233,7 @@ def __init__(
def _setup(self):
self.logger.debug("Creating connectors")
for _, output in self._output.items():
# TODO remove this
output.input_connector = self._input
if output.default:
self._input.output_connector = output
Expand Down
30 changes: 16 additions & 14 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ def test_restart_injects_healthcheck_functions(self):
def test_setup_error_queue_sets_error_queue_and_starts_listener(self):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
manager = PipelineManager(self.config)
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue:
mock_queue.get.return_value = "not null"
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_listener is not None
manager._error_listener.start.assert_called() # pylint: disable=no-member
Expand All @@ -280,15 +282,11 @@ def test_setup_does_not_sets_error_queue_if_no_error_output(self):

def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
mock_listener = mock.MagicMock()
mock_listener.setup_successful = False
with mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener", return_value=mock_listener
):
with mock.patch("logging.Logger.error") as mock_error:
with pytest.raises(SystemExit):
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get:
mock_get.return_value = None
with pytest.raises(SystemExit, match="4"):
PipelineManager(self.config)
mock_error.assert_called()

def test_should_exit_returns_bool_based_on_restart_count(self):
self.config.restart_count = 2
Expand All @@ -302,16 +300,20 @@ def test_should_exit_returns_bool_based_on_restart_count(self):
def test_stop_calls_stop_on_error_listener(self):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
manager = PipelineManager(self.config)
manager.stop()
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get:
mock_get.return_value = "not None"
manager = PipelineManager(self.config)
manager.stop()
manager._error_listener.stop.assert_called() # pylint: disable=no-member

def test_stop_calls_stop_on_loghandler(self):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
manager = PipelineManager(self.config)
manager.loghandler = mock.MagicMock()
manager.stop()
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get:
mock_get.return_value = "not None"
manager = PipelineManager(self.config)
manager.loghandler = mock.MagicMock()
manager.stop()
manager.loghandler.stop.assert_called()

def test_restart_with_error_output_calls_pipeline_with_error_queue(self):
Expand Down

0 comments on commit f7fb0b9

Please sign in to comment.