Skip to content

Commit

Permalink
add queuelistener shutdown behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 2, 2024
1 parent 2fba7b2 commit c4c2b88
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 19 deletions.
14 changes: 13 additions & 1 deletion logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,24 @@ def _listen(self):
target(item)
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
self._drain_queue(target)
component.shut_down()

def _drain_queue(self, target):
while not self.queue.empty():
item = self.queue.get()
if item is self.sentinel:
logger.debug("Got another sentinel")
try:
target(item)
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
self.queue.close() # close queue after draining to prevent message loss

def stop(self):
"""Stop the listener."""
self.queue.put(self.sentinel)
self._instance.join()
self.queue.close()
logger.debug("Stopped listener.")


Expand Down
84 changes: 66 additions & 18 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=attribute-defined-outside-init
# pylint: disable=unnecessary-lambda-assignment
import multiprocessing
import random
import threading
from copy import deepcopy
from logging import Logger
Expand Down Expand Up @@ -529,16 +530,6 @@ def test_stop_joins_process(self):
listener.stop()
listener._instance.join.assert_called()

def test_stop_closes_queue(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
with mock.patch("threading.Thread"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch.object(queue, "close") as mock_close:
listener.stop()
mock_close.assert_called()

def test_listen_calls_target(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
Expand All @@ -551,16 +542,73 @@ def test_listen_calls_target(self):
mock_store.assert_called_with("test")

def test_listen_creates_component(self):
assert False
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.factory.Factory.create") as mock_create:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_create.assert_called_with(output_config)

def test_get_component_instance_setups_component(self):
assert False
def test_listen_setups_component(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_setup.assert_called()

def test_get_component_instance_raises_if_setup_not_successful(self):
assert False
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup:
mock_setup.side_effect = SystemExit(4)
with pytest.raises(SystemExit, match="4"):
listener._listen()

def test_listen_calls_component_shutdown_by_injecting_sentinel(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.shut_down") as mock_shutdown:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_shutdown.assert_called()

def test_stop_calls_component_shutdown(self):
assert False
def test_listen_ensures_error_queue_is_drained_before_shutdown(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
random_number = random.randint(10, 50)
listener.queue.put(listener.sentinel)
for _ in range(random_number):
listener.queue.put("test")
while not listener.queue.qsize() == random_number + 1:
pass
listener._listen()
assert listener.queue.qsize() == 0

def test_stop_ensures_error_queue_is_drained(self):
assert False
def test_listen_ensures_error_queue_is_closed_after_drained(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
random_number = random.randint(10, 50)
listener.queue.put(listener.sentinel)
for _ in range(random_number):
listener.queue.put("test")
while not listener.queue.qsize() == random_number + 1:
pass
listener._listen()
with pytest.raises(ValueError, match="is closed"):
listener.queue.put("test")

0 comments on commit c4c2b88

Please sign in to comment.