Skip to content

Commit

Permalink
add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 20, 2024
1 parent ed6a150 commit 757ef5a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
1 change: 1 addition & 0 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def stop(self):
"""Stop the listener."""
self._queue.put(self._sentinel)
self._process.join()
self._queue.close()


class PipelineManager:
Expand Down
25 changes: 21 additions & 4 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,28 @@ def test_sentinel_breaks_while_loop(self):
listener._listen()
assert listener._queue.empty()

def test_stop_stop_injects_sentinel(self):
def test_stop_injects_sentinel(self):
target = lambda x: None
with mock.patch("multiprocessing.Process") as mock_process:
with mock.patch("multiprocessing.Process"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
with mock.patch.object(queue, "put") as mock_put:
listener.stop()
mock_put.assert_called_with(listener._sentinel)

def test_stop_joins_process(self):
target = lambda x: None
with mock.patch("multiprocessing.Process"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener.start()
listener.stop()
assert listener._queue.get() == listener._sentinel
listener._process.join.assert_called()

def test_stop_closes_queue(self):
target = lambda x: None
with mock.patch("multiprocessing.Process"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
with mock.patch.object(queue, "close") as mock_close:
listener.stop()
mock_close.assert_called()

0 comments on commit 757ef5a

Please sign in to comment.