Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add tests for shutdown behavior to pipeline_manager
Browse files Browse the repository at this point in the history
ekneg54 committed Oct 2, 2024
1 parent c4c2b88 commit 1ff01f5
Showing 2 changed files with 25 additions and 9 deletions.
3 changes: 3 additions & 0 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
@@ -105,6 +105,9 @@ def _get_component_instance(self):
logger.error("Error output not reachable. Exiting...")
self.queue.put(self.sentinel)
raise error from error
# wait for setup method in pipeline manager to receive the message
while not self.queue.empty():
logger.debug("Waiting for receiver to be ready")
return component

def _listen(self):
31 changes: 22 additions & 9 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
@@ -502,14 +502,19 @@ def test_start_starts_thread(self):
listener.start()
mock_start.assert_called()

@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_sentinel_breaks_while_loop(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
listener.queue.put(listener.sentinel)
listener._listen()
assert not listener._instance.is_alive()
with mock.patch("logging.Logger.debug") as mock_debug:
listener._listen()
mock_debug.assert_called_with("Got sentinel. Stopping listener.")

def test_stop_injects_sentinel(self):
target = "store"
@@ -534,6 +539,7 @@ def test_listen_calls_target(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
with mock.patch("logprep.connector.dummy.output.DummyOutput.store") as mock_store:
listener = ComponentQueueListener(queue, target, output_config)
listener.queue.put("test")
@@ -545,6 +551,7 @@ def test_listen_creates_component(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.factory.Factory.create") as mock_create:
listener.queue.put("test")
@@ -556,6 +563,7 @@ def test_listen_setups_component(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup:
listener.queue.put("test")
@@ -577,27 +585,32 @@ def test_listen_calls_component_shutdown_by_injecting_sentinel(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.shut_down") as mock_shutdown:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_shutdown.assert_called()

def test_listen_ensures_error_queue_is_drained_before_shutdown(self):
@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_listen_drains_queue_on_shutdown(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue = multiprocessing.Queue()
listener = ComponentQueueListener(queue, target, output_config)
random_number = random.randint(10, 50)
listener.queue.put(listener.sentinel)
for _ in range(random_number):
listener.queue.put("test")
while not listener.queue.qsize() == random_number + 1:
pass
listener.queue.put("test")
listener._listen()
assert listener.queue.qsize() == 0

@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_listen_ensures_error_queue_is_closed_after_drained(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}

0 comments on commit 1ff01f5

Please sign in to comment.