Skip to content

Commit

Permalink
Working error output start bahavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 1, 2024
1 parent 61e184d commit ea2d570
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 85 deletions.
1 change: 0 additions & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ def enqueue_error(
self.logger.debug(f"Enqueuing error item: {i}")
self.error_queue.put(i, timeout=0.1)
self.logger.debug("Enqueued error item")

else:
self.logger.debug(f"Enqueuing error item: {event}")
self.error_queue.put(event, timeout=0.1)
Expand Down
36 changes: 12 additions & 24 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import multiprocessing.managers
import multiprocessing.queues
import random
import sys
import threading
import time
from typing import Any

from attr import define, field, validators

from logprep.abc.component import Component
from logprep.abc.output import Output
from logprep.connector.http.input import HttpInput
from logprep.factory import Factory
from logprep.framework.pipeline import Pipeline
Expand Down Expand Up @@ -85,9 +85,6 @@ class ComponentQueueListener:
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""

setup_successful: bool = field(default=False, init=False)
"""Flag to indicate if the setup of the component was successful."""

def __attrs_post_init__(self):
if self._implementation == "threading":
self._instance = threading.Thread(target=self._listen, daemon=True)
Expand All @@ -101,22 +98,17 @@ def start(self):

def _get_component_instance(self):
component = Factory.create(self.config)
for _ in range(5):
try:
component.setup()
except SystemExit:
logger.warning("Error output not reachable. Try again...")
time.sleep(3)
continue
if component.health():
break
else:
raise SystemExit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value)
try:
component.setup()
self.queue.put(1)
except SystemExit as error:
logger.error("Error output not reachable. Exiting...")
self.queue.put(self.sentinel)
raise error from error
return component

def _listen(self):
component = self._get_component_instance()
self.setup_successful = True
target = getattr(component, self.target)
while 1:
item = self.queue.get()
Expand Down Expand Up @@ -174,7 +166,6 @@ def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self.loghandler: LogprepMPQueueListener = None
self._error_queue: multiprocessing.Queue | None = None
self._error_output: Output | None = None
self._error_listener: ComponentQueueListener | None = None
self._configuration: Configuration = configuration
self._pipelines: list[multiprocessing.Process] = []
Expand Down Expand Up @@ -202,11 +193,10 @@ def _setup_error_queue(self):
self._error_queue, "store", self._configuration.error_output
)
self._error_listener.start()
while 1:
if self._error_listener.setup_successful:
break
logger.debug("Waiting for error output to be ready...")
time.sleep(1)
# wait for the error listener to be ready before starting the pipelines
if self._error_queue.get(block=True) is None:
self.stop()
sys.exit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value)

def _setup_logging(self):
console_logger = logging.getLogger("console")
Expand Down Expand Up @@ -296,8 +286,6 @@ def stop(self):
self.set_count(0)
if self._error_listener:
self._error_listener.stop()
if self._error_output:
self._error_output.shut_down()
if self.prometheus_exporter:
self.prometheus_exporter.server.shut_down()
self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
Expand Down
69 changes: 9 additions & 60 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,77 +269,26 @@ def test_setup_error_queue_sets_error_queue_and_starts_listener(self):
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_output is not None
assert manager._error_listener is not None
manager._error_listener.start.assert_called() # pylint: disable=no-member

def test_setup_does_not_sets_error_queue_if_no_error_output(self):
self.config.error_output = {}
manager = PipelineManager(self.config)
assert manager._error_queue is None
assert manager._error_output is None
assert manager._error_listener is None

def test_setup_error_queue_calls_setup_on_error_output_at_minimum_once(self):
def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as setup_mock:
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_output is not None
assert manager._error_listener is not None
setup_mock.assert_called()

def test_setup_error_queue_calls_setup_and_raises_sys_exit_if_should_exit(self, caplog):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as setup_mock:
with mock.patch(
"logprep.framework.pipeline_manager.PipelineManager.should_exit", return_value=True
) as should_exit_mock:
setup_mock.side_effect = SystemExit
mock_listener = mock.MagicMock()
mock_listener.setup_successful = False
with mock.patch(
"logprep.framework.pipeline_manager.ComponentQueueListener", return_value=mock_listener
):
with mock.patch("logging.Logger.error") as mock_error:
with pytest.raises(SystemExit):
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_output is not None
assert manager._error_listener is not None
setup_mock.assert_called()
should_exit_mock.assert_called()
assert "Exiting..." in caplog.text

def test_setup_error_queue_calls_setup_and_tries_again(self, caplog):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as setup_mock:
with mock.patch(
"logprep.framework.pipeline_manager.PipelineManager.should_exit"
) as should_exit_mock:
setup_mock.side_effect = SystemExit
should_exit_mock.side_effect = [False, True]
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_output is not None
assert manager._error_listener is not None
setup_mock.assert_called()
should_exit_mock.assert_called()
assert "Trying again..." in caplog.text

def test_setup_error_queue_calls_tries_infinite_on_negative_restart_count(self, caplog):
self.config.error_output = {"dummy": {"type": "dummy_output"}}
self.config.restart_count = -1
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
with mock.patch("logprep.connector.dummy.output.DummyOutput.setup") as setup_mock:
with mock.patch(
"logprep.framework.pipeline_manager.PipelineManager.should_exit"
) as should_exit_mock:
setup_mock.side_effect = [SystemExit, None]
should_exit_mock.side_effect = [False, True]
manager = PipelineManager(self.config)
assert manager._error_queue is not None
assert manager._error_output is not None
assert manager._error_listener is not None
setup_mock.assert_called()
assert setup_mock.call_count == 2
should_exit_mock.assert_not_called()
assert "Try again infinite..." in caplog.text
PipelineManager(self.config)
mock_error.assert_called()

def test_should_exit_returns_bool_based_on_restart_count(self):
self.config.restart_count = 2
Expand Down

0 comments on commit ea2d570

Please sign in to comment.