Skip to content

Commit

Permalink
add error_output to health checks
Browse files Browse the repository at this point in the history
* extract component instance creation from its usage location, which
simplifies testing
  • Loading branch information
ekneg54 committed Oct 21, 2024
1 parent c373691 commit 0209d36
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 41 deletions.
4 changes: 2 additions & 2 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from functools import cached_property, partial
from importlib.metadata import version
from multiprocessing import Value, current_process
from typing import Any, List, Tuple
from typing import Any, Callable, List, Tuple

import attrs

Expand Down Expand Up @@ -345,7 +345,7 @@ def stop(self) -> None:
with self._continue_iterating.get_lock():
self._continue_iterating.value = False

def get_health_functions(self) -> Tuple[bool]:
def get_health_functions(self) -> Tuple[Callable]:
"""Return health function of components"""
output_health_functions = []
if self._output:
Expand Down
29 changes: 22 additions & 7 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ class ComponentQueueListener:
"""The sentinel object to stop the process. This has to implement identity comparison."""

_instance: multiprocessing.Process = field(init=False)
"""The process that is forked to listen to the queue."""
"""The process or thread instance that is listening to the queue."""

_implementation: str = field(
default="threading", validator=validators.in_(["threading", "multiprocessing"])
)
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""

component: Component = field(init=False)
"""The component instance that is listening to the queue."""

def __attrs_post_init__(self):
if self._implementation == "threading":
self._instance = threading.Thread(target=self._listen, daemon=True)
Expand All @@ -94,9 +97,12 @@ def __attrs_post_init__(self):
def start(self):
"""Start the listener."""
logger.debug("Starting listener with target: %s", self.target)
if not hasattr(self, "component"):
raise AttributeError("Component instance not set.")
self._instance.start()

def _get_component_instance(self):
def get_component_instance(self) -> Component:
"""Create and setup the component instance."""
component = Factory.create(self.config)
try:
component.setup()
Expand All @@ -111,8 +117,9 @@ def _get_component_instance(self):
return component

def _listen(self):
component = self._get_component_instance()
target = getattr(component, self.target)
if not self.component:
return
target = getattr(self.component, self.target)
while 1:
item = self.queue.get()
logger.debug("Got item from queue: %s", item)
Expand All @@ -124,7 +131,7 @@ def _listen(self):
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
self._drain_queue(target)
component.shut_down()
self.component.shut_down()

def _drain_queue(self, target):
while not self.queue.empty():
Expand Down Expand Up @@ -209,6 +216,7 @@ def _setup_error_queue(self):
self._configuration.error_output,
implementation=self._configuration.component_queue_listener_implementation,
)
self._error_listener.component = self._error_listener.get_component_instance()
self._error_listener.start()
# wait for the error listener to be ready before starting the pipelines
if self.error_queue.get(block=True) is None:
Expand Down Expand Up @@ -304,7 +312,8 @@ def stop(self):
if self._error_listener:
self._error_listener.stop()
if self.prometheus_exporter:
self.prometheus_exporter.server.shut_down()
if self.prometheus_exporter.server:
self.prometheus_exporter.server.shut_down()
self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
logger.info("Shutdown complete")
if self.loghandler is not None:
Expand All @@ -331,7 +340,13 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
error_queue=self.error_queue,
)
if pipeline.pipeline_index == 1 and self.prometheus_exporter:
self.prometheus_exporter.update_healthchecks(pipeline.get_health_functions())
pipeline_health_functions = pipeline.get_health_functions()
error_output_health_function = (
self._error_listener.component.health if self._error_listener else None
)
self.prometheus_exporter.update_healthchecks(
[error_output_health_function, *pipeline_health_functions]
)
process = multiprocessing.Process(
target=pipeline.run, daemon=True, name=f"Pipeline-{index}"
)
Expand Down
51 changes: 19 additions & 32 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,12 @@ def test_start_starts_thread(self):
listener.start()
mock_start.assert_called()

@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_sentinel_breaks_while_loop(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
listener.component = mock.MagicMock()
listener.queue.put(listener.sentinel)
with mock.patch("logging.Logger.debug") as mock_debug:
listener._listen()
Expand Down Expand Up @@ -533,35 +530,31 @@ def test_listen_calls_target(self):
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
with mock.patch("logprep.connector.dummy.output.DummyOutput.store") as mock_store:
listener = ComponentQueueListener(queue, target, output_config)
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_store.assert_called_with("test")
listener = ComponentQueueListener(queue, target, output_config)
listener.component = mock.MagicMock()
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
listener.component.store.assert_called_with("test")

def test_listen_creates_component(self):
def test_get_component_instance_creates_component(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.factory.Factory.create") as mock_create:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
_ = listener.get_component_instance()
mock_create.assert_called_with(output_config)

def test_listen_setups_component(self):
def test_get_component_instance_setups_component(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
_ = listener.get_component_instance()
mock_setup.assert_called()

def test_get_component_instance_raises_if_setup_not_successful(self):
Expand All @@ -572,43 +565,37 @@ def test_get_component_instance_raises_if_setup_not_successful(self):
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as mock_setup:
mock_setup.side_effect = SystemExit(4)
with pytest.raises(SystemExit, match="4"):
listener._listen()
listener.get_component_instance()

def test_listen_calls_component_shutdown_by_injecting_sentinel(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
queue.empty = mock.MagicMock(return_value=True)
listener = ComponentQueueListener(queue, target, output_config)
with mock.patch("logprep.connector.dummy.output.DummyOutput.shut_down") as mock_shutdown:
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
mock_shutdown.assert_called()
listener.component = mock.MagicMock()
listener.queue.put("test")
listener.queue.put(listener.sentinel)
listener._listen()
listener.component.shut_down.assert_called()

@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_listen_drains_queue_on_shutdown(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = multiprocessing.Queue()
listener = ComponentQueueListener(queue, target, output_config)
listener.component = mock.MagicMock()
listener.queue.put(listener.sentinel)
listener.queue.put("test")
listener._listen()
assert listener.queue.qsize() == 0

@mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener._get_component_instance",
new=mock.MagicMock(),
)
def test_listen_ensures_error_queue_is_closed_after_drained(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = ComponentQueueListener(queue, target, output_config)
listener.component = mock.MagicMock()
random_number = random.randint(10, 50)
listener.queue.put(listener.sentinel)
for _ in range(random_number):
Expand Down

0 comments on commit 0209d36

Please sign in to comment.