diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 32cd82a72..7cec734ff 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -98,6 +98,7 @@ def stop(self): """Stop the listener.""" self._queue.put(self._sentinel) self._process.join() + self._queue.close() class PipelineManager: diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index cb3a2658f..cc090c1c5 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -391,11 +391,28 @@ def test_sentinel_breaks_while_loop(self): listener._listen() assert listener._queue.empty() - def test_stop_stop_injects_sentinel(self): + def test_stop_injects_sentinel(self): target = lambda x: None - with mock.patch("multiprocessing.Process") as mock_process: + with mock.patch("multiprocessing.Process"): + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = ComponentQueueListener(queue, target) + with mock.patch.object(queue, "put") as mock_put: + listener.stop() + mock_put.assert_called_with(listener._sentinel) + + def test_stop_joins_process(self): + target = lambda x: None + with mock.patch("multiprocessing.Process"): queue = ThrottlingQueue(multiprocessing.get_context(), 100) listener = ComponentQueueListener(queue, target) - listener.start() listener.stop() - assert listener._queue.get() == listener._sentinel + listener._process.join.assert_called() + + def test_stop_closes_queue(self): + target = lambda x: None + with mock.patch("multiprocessing.Process"): + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = ComponentQueueListener(queue, target) + with mock.patch.object(queue, "close") as mock_close: + listener.stop() + mock_close.assert_called()