Skip to content

Commit

Permalink
add tests for componentqueuelistener
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 20, 2024
1 parent 68630e1 commit ed6a150
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
17 changes: 11 additions & 6 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,30 @@ def put(self, obj, block=True, timeout=None, batch_size=1):
super().put(obj, block=block, timeout=timeout)


@define(kw_only=True)
@define()
class ComponentQueueListener:
"""This forks a process and handles all items from the given queue into
the specified callable. It uses a sentinel object to stop the process."""

_queue: multiprocessing.Queue = field(
_queue: multiprocessing.queues.Queue = field(
validator=validators.instance_of(multiprocessing.queues.Queue)
)
"""The queue to listen to."""

_target: Callable = field(validator=validators.instance_of(Callable))
"""The target callable to call with the items from the queue."""

_sentinel: Any = field(factory=object)
_sentinel: Any = field(default=None)
"""The sentinel object to stop the process. This has to implement identity comparison."""

_process: multiprocessing.Process = None
_process: multiprocessing.Process = field(init=False)
"""The process that is forked to listen to the queue."""

def __attrs_post_init__(self):
self._process = multiprocessing.Process(target=self._listen, daemon=True)

def start(self):
"""Start the listener."""
self._process = multiprocessing.Process(target=self._listen, daemon=True)
self._process.start()

def _listen(self):
Expand All @@ -92,7 +98,6 @@ def stop(self):
"""Stop the listener."""
self._queue.put(self._sentinel)
self._process.join()
self._queue.close()


class PipelineManager:
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=missing-docstring
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
# pylint: disable=unnecessary-lambda-assignment
import multiprocessing
from copy import deepcopy
from logging import Logger
Expand Down Expand Up @@ -358,3 +359,43 @@ def test_sets_parameters(self, parameters, error):
ComponentQueueListener(**parameters)
else:
ComponentQueueListener(**parameters)

def test_init_sets_process_but_does_not_start_it(self):
target = lambda x: None
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
assert listener._process is not None
assert isinstance(listener._process, multiprocessing.Process)
assert not listener._process.is_alive()

def test_init_sets_process_target(self):
target = lambda x: None
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch("multiprocessing.Process") as process_mock:
listener = ComponentQueueListener(queue, target)
process_mock.assert_called_with(target=listener._listen, daemon=True)

def test_start_starts_process(self):
target = lambda x: None
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
with mock.patch.object(listener._process, "start") as mock_start:
listener.start()
mock_start.assert_called()

def test_sentinel_breaks_while_loop(self):
target = lambda x: None
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener._queue.put(listener._sentinel)
listener._listen()
assert listener._queue.empty()

def test_stop_stop_injects_sentinel(self):
target = lambda x: None
with mock.patch("multiprocessing.Process") as mock_process:
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener.start()
listener.stop()
assert listener._queue.get() == listener._sentinel

0 comments on commit ed6a150

Please sign in to comment.