diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 02ee14d8b..404e8516b 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -80,7 +80,7 @@ class ComponentQueueListener: """The process that is forked to listen to the queue.""" _implementation: str = field( - init=False, default="threading", validator=validators.in_(["threading", "multiprocessing"]) + default="threading", validator=validators.in_(["threading", "multiprocessing"]) ) """The implementation to use for the listener. Options are threading or multiprocessing. Default is threading.""" @@ -121,7 +121,7 @@ def _listen(self): try: target(item) except Exception as error: # pylint: disable=broad-except - logger.error("Error processing item: %s", error) + logger.error("Error processing item: %s due to %s", item, error) def stop(self): """Stop the listener.""" @@ -299,11 +299,12 @@ def start(self): self.set_count(self._configuration.process_count) def restart(self): - """Restarts all pipelines""" + """Restarts the manager.""" self.stop() self.start() def reload(self): + """Restarts all pipelines.""" self.set_count(0) self.set_count(self._configuration.process_count) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 88d7cad1c..811f8a680 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -3,6 +3,7 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=unnecessary-lambda-assignment import multiprocessing +import threading from copy import deepcopy from logging import Logger from logging.config import dictConfig @@ -442,30 +443,62 @@ class TestComponentQueueListener: [ ( { + "config": {"random_name": {"type": "dummy_output"}}, "queue": ThrottlingQueue(multiprocessing.get_context(), 100), - "target": lambda x: None, + "target": "random", }, None, ), ( { + "config": {"random_name": {"type": "dummy_output"}}, "queue": ThrottlingQueue(multiprocessing.get_context(), 100), - "target": lambda x: None, + "target": "random", "sentinel": object(), }, None, ), ( { + "config": {"random_name": {"type": "dummy_output"}}, "queue": ThrottlingQueue(multiprocessing.get_context(), 100), - "target": "I am not callable", + "target": "random", + "sentinel": object(), + "implementation": "threading", + }, + None, + ), + ( + { + "config": {"random_name": {"type": "dummy_output"}}, + "queue": ThrottlingQueue(multiprocessing.get_context(), 100), + "target": "random", + "sentinel": object(), + "implementation": "multiprocessing", + }, + None, + ), + ( + { + "config": {"random_name": {"type": "dummy_output"}}, + "queue": ThrottlingQueue(multiprocessing.get_context(), 100), + "target": lambda x: x, }, TypeError, ), ( { + "config": {"random_name": {"type": "dummy_output"}}, "queue": "I am not a queue", - "target": lambda x: None, + "target": "random", + }, + TypeError, + ), + ( + { + "config": "I am not a config", + "queue": ThrottlingQueue(multiprocessing.get_context(), 100), + "target": "random", }, TypeError, ), @@ -478,68 +511,77 @@ def test_sets_parameters(self, parameters, error): else: ComponentQueueListener(**parameters) - def test_init_sets_process_but_does_not_start_it(self): - target = lambda x: None + def test_init_sets_thread_but_does_not_start_it(self): + target = "store" queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + output_config = {"random_name": {"type": "dummy_output"}} + listener = ComponentQueueListener(queue, target, output_config) assert listener._instance is not None - assert isinstance(listener._instance, multiprocessing.Process) + assert isinstance(listener._instance, threading.Thread) assert not listener._instance.is_alive() def test_init_sets_process_target(self): - target = lambda x: None + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) - with mock.patch("multiprocessing.Process") as process_mock: - listener = ComponentQueueListener(queue, target) - process_mock.assert_called_with(target=listener._listen, daemon=True) + with mock.patch("threading.Thread") as thread_mock: + listener = ComponentQueueListener(queue, target, output_config) + thread_mock.assert_called_with(target=listener._listen, daemon=True) - def test_start_starts_process(self): - target = lambda x: None + def test_start_starts_thread(self): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + listener = ComponentQueueListener(queue, target, output_config) with mock.patch.object(listener._instance, "start") as mock_start: listener.start() mock_start.assert_called() def test_sentinel_breaks_while_loop(self): - target = lambda x: None + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + listener = ComponentQueueListener(queue, target, output_config) listener.queue.put(listener.sentinel) listener._listen() assert listener.queue.empty() def test_stop_injects_sentinel(self): - target = lambda x: None - with mock.patch("multiprocessing.Process"): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + with mock.patch("threading.Thread"): queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + listener = ComponentQueueListener(queue, target, output_config) with mock.patch.object(queue, "put") as mock_put: listener.stop() mock_put.assert_called_with(listener.sentinel) def test_stop_joins_process(self): - target = lambda x: None - with mock.patch("multiprocessing.Process"): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + with mock.patch("threading.Thread"): queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + listener = ComponentQueueListener(queue, target, output_config) listener.stop() listener._instance.join.assert_called() def test_stop_closes_queue(self): - target = lambda x: None - with mock.patch("multiprocessing.Process"): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + with mock.patch("threading.Thread"): queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) + listener = ComponentQueueListener(queue, target, output_config) with mock.patch.object(queue, "close") as mock_close: listener.stop() mock_close.assert_called() def test_listen_calls_target(self): - target = mock.MagicMock() + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) - listener = ComponentQueueListener(queue, target) - listener.queue.put("test") - listener.queue.put(listener.sentinel) - listener._listen() - target.assert_called_with("test") + with mock.patch("logprep.connector.dummy.output.DummyOutput.store") as mock_store: + listener = ComponentQueueListener(queue, target, output_config) + listener.queue.put("test") + listener.queue.put(listener.sentinel) + listener._listen() + mock_store.assert_called_with("test")