diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 066075891..2c222fab2 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -105,6 +105,9 @@ def _get_component_instance(self): logger.error("Error output not reachable. Exiting...") self.queue.put(self.sentinel) raise error from error + # wait for setup method in pipeline manager to receive the message + while not self.queue.empty(): + logger.debug("Waiting for receiver to be ready") return component def _listen(self): diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index a030c23ab..17cc7c25f 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -502,14 +502,19 @@ def test_start_starts_thread(self): listener.start() mock_start.assert_called() + @mock.patch( + "logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance", + new=mock.MagicMock(), + ) def test_sentinel_breaks_while_loop(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) listener = ComponentQueueListener(queue, target, output_config) listener.queue.put(listener.sentinel) - listener._listen() - assert not listener._instance.is_alive() + with mock.patch("logging.Logger.debug") as mock_debug: + listener._listen() + mock_debug.assert_called_with("Got sentinel. Stopping listener.") def test_stop_injects_sentinel(self): target = "store" @@ -534,6 +539,7 @@ def test_listen_calls_target(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) + queue.empty = mock.MagicMock(return_value=True) with mock.patch("logprep.connector.dummy.output.DummyOutput.store") as mock_store: listener = ComponentQueueListener(queue, target, output_config) listener.queue.put("test") @@ -545,6 +551,7 @@ def test_listen_creates_component(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) + queue.empty = mock.MagicMock(return_value=True) listener = ComponentQueueListener(queue, target, output_config) with mock.patch("logprep.factory.Factory.create") as mock_create: listener.queue.put("test") @@ -556,6 +563,7 @@ def test_listen_setups_component(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) + queue.empty = mock.MagicMock(return_value=True) listener = ComponentQueueListener(queue, target, output_config) with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup: listener.queue.put("test") @@ -577,6 +585,7 @@ def test_listen_calls_component_shutdown_by_injecting_sentinel(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) + queue.empty = mock.MagicMock(return_value=True) listener = ComponentQueueListener(queue, target, output_config) with mock.patch("logprep.connector.dummy.output.DummyOutput.shut_down") as mock_shutdown: listener.queue.put("test") @@ -584,20 +593,24 @@ def test_listen_calls_component_shutdown_by_injecting_sentinel(self): listener._listen() mock_shutdown.assert_called() - def test_listen_ensures_error_queue_is_drained_before_shutdown(self): + @mock.patch( + "logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance", + new=mock.MagicMock(), + ) + def test_listen_drains_queue_on_shutdown(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} - queue = ThrottlingQueue(multiprocessing.get_context(), 100) + queue = multiprocessing.Queue() listener = ComponentQueueListener(queue, target, output_config) - random_number = random.randint(10, 50) listener.queue.put(listener.sentinel) - for _ in range(random_number): - listener.queue.put("test") - while not listener.queue.qsize() == random_number + 1: - pass + listener.queue.put("test") listener._listen() assert listener.queue.qsize() == 0 + @mock.patch( + "logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance", + new=mock.MagicMock(), + ) def test_listen_ensures_error_queue_is_closed_after_drained(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}}