Skip to content

Commit

Permalink
fix componentqueuelistener tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 1, 2024
1 parent 29a79cf commit 4512c25
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 35 deletions.
7 changes: 4 additions & 3 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ComponentQueueListener:
"""The process that is forked to listen to the queue."""

_implementation: str = field(
init=False, default="threading", validator=validators.in_(["threading", "multiprocessing"])
default="threading", validator=validators.in_(["threading", "multiprocessing"])
)
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""
Expand Down Expand Up @@ -121,7 +121,7 @@ def _listen(self):
try:
target(item)
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s", error)
logger.error("Error processing item: %s due to %s", item, error)

def stop(self):
"""Stop the listener."""
Expand Down Expand Up @@ -299,11 +299,12 @@ def start(self):
self.set_count(self._configuration.process_count)

def restart(self):
"""Restarts all pipelines"""
"""Restarts the manager."""
self.stop()
self.start()

def reload(self):
"""Restarts all pipelines."""
self.set_count(0)
self.set_count(self._configuration.process_count)

Expand Down
106 changes: 74 additions & 32 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=attribute-defined-outside-init
# pylint: disable=unnecessary-lambda-assignment
import multiprocessing
import threading
from copy import deepcopy
from logging import Logger
from logging.config import dictConfig
Expand Down Expand Up @@ -442,30 +443,62 @@ class TestComponentQueueListener:
[
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": lambda x: None,
"target": "random",
},
None,
),
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": lambda x: None,
"target": "random",
"sentinel": object(),
},
None,
),
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": "I am not callable",
"target": "random",
"sentinel": object(),
"implementation": "threading",
},
None,
),
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": "random",
"sentinel": object(),
"implementation": "multiprocessing",
},
None,
),
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": lambda x: x,
},
TypeError,
),
(
{
"config": {"random_name": {"type": "dummy_output"}},
"queue": "I am not a queue",
"target": lambda x: None,
"target": "random",
},
TypeError,
),
(
{
"config": "I am not a config",
"queue": ThrottlingQueue(multiprocessing.get_context(), 100),
"target": "random",
},
TypeError,
),
Expand All @@ -478,68 +511,77 @@ def test_sets_parameters(self, parameters, error):
else:
ComponentQueueListener(**parameters)

def test_init_sets_process_but_does_not_start_it(self):
target = lambda x: None
def test_init_sets_thread_but_does_not_start_it(self):
target = "store"
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
output_config = {"random_name": {"type": "dummy_output"}}
listener = ComponentQueueListener(queue, target, output_config)
assert listener._instance is not None
assert isinstance(listener._instance, multiprocessing.Process)
assert isinstance(listener._instance, threading.Thread)
assert not listener._instance.is_alive()

def test_init_sets_process_target(self):
target = lambda x: None
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
with mock.patch("multiprocessing.Process") as process_mock:
listener = ComponentQueueListener(queue, target)
process_mock.assert_called_with(target=listener._listen, daemon=True)
with mock.patch("threading.Thread") as thread_mock:
listener = ComponentQueueListener(queue, target, output_config)
thread_mock.assert_called_with(target=listener._listen, daemon=True)

def test_start_starts_process(self):
target = lambda x: None
def test_start_starts_thread(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch.object(listener._instance, "start") as mock_start:
listener.start()
mock_start.assert_called()

def test_sentinel_breaks_while_loop(self):
target = lambda x: None
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener = ComponentQueueListener(queue, target, output_config)
listener.queue.put(listener.sentinel)
listener._listen()
assert listener.queue.empty()

def test_stop_injects_sentinel(self):
target = lambda x: None
with mock.patch("multiprocessing.Process"):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
with mock.patch("threading.Thread"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch.object(queue, "put") as mock_put:
listener.stop()
mock_put.assert_called_with(listener.sentinel)

def test_stop_joins_process(self):
target = lambda x: None
with mock.patch("multiprocessing.Process"):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
with mock.patch("threading.Thread"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener = ComponentQueueListener(queue, target, output_config)
listener.stop()
listener._instance.join.assert_called()

def test_stop_closes_queue(self):
target = lambda x: None
with mock.patch("multiprocessing.Process"):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
with mock.patch("threading.Thread"):
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch.object(queue, "close") as mock_close:
listener.stop()
mock_close.assert_called()

def test_listen_calls_target(self):
target = mock.MagicMock()
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target)
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
target.assert_called_with("test")
with mock.patch("logprep.connector.dummy.output.DummyOutput.store") as mock_store:
listener = ComponentQueueListener(queue, target, output_config)
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_store.assert_called_with("test")

0 comments on commit 4512c25

Please sign in to comment.