diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 3ce98dd2c..87e21bca8 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -17,7 +17,7 @@ from functools import cached_property, partial from importlib.metadata import version from multiprocessing import Value, current_process -from typing import Any, Generator, Iterable, List, Tuple +from typing import Any, List, Tuple import attrs @@ -233,6 +233,7 @@ def __init__( def _setup(self): self.logger.debug("Creating connectors") for _, output in self._output.items(): + # TODO remove this output.input_connector = self._input if output.default: self._input.output_connector = output diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index cc559903f..0c8d941d6 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -267,7 +267,9 @@ def test_restart_injects_healthcheck_functions(self): def test_setup_error_queue_sets_error_queue_and_starts_listener(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): - manager = PipelineManager(self.config) + with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue: + mock_queue.get.return_value = "not null" + manager = PipelineManager(self.config) assert manager._error_queue is not None assert manager._error_listener is not None manager._error_listener.start.assert_called() # pylint: disable=no-member @@ -280,15 +282,11 @@ def test_setup_does_not_sets_error_queue_if_no_error_output(self): def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} - mock_listener = mock.MagicMock() - mock_listener.setup_successful = False - with mock.patch( - "logprep.framework.pipeline_manager.ComponentQueueListener", return_value=mock_listener - ): - with mock.patch("logging.Logger.error") as mock_error: - with pytest.raises(SystemExit): + with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): + with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get: + mock_get.return_value = None + with pytest.raises(SystemExit, match="4"): PipelineManager(self.config) - mock_error.assert_called() def test_should_exit_returns_bool_based_on_restart_count(self): self.config.restart_count = 2 @@ -302,16 +300,20 @@ def test_should_exit_returns_bool_based_on_restart_count(self): def test_stop_calls_stop_on_error_listener(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): - manager = PipelineManager(self.config) - manager.stop() + with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get: + mock_get.return_value = "not None" + manager = PipelineManager(self.config) + manager.stop() manager._error_listener.stop.assert_called() # pylint: disable=no-member def test_stop_calls_stop_on_loghandler(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): - manager = PipelineManager(self.config) - manager.loghandler = mock.MagicMock() - manager.stop() + with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get: + mock_get.return_value = "not None" + manager = PipelineManager(self.config) + manager.loghandler = mock.MagicMock() + manager.stop() manager.loghandler.stop.assert_called() def test_restart_with_error_output_calls_pipeline_with_error_queue(self):