Skip to content

Commit

Permalink
add debugging and additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 1, 2024
1 parent fdd9564 commit 72b7882
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
15 changes: 13 additions & 2 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ def enqueue_error(
if self._input:
self._input.batch_finished_callback()
return
self.logger.debug(f"Enqueuing error item: {item}")
event: dict | list = None
match item:
case CriticalOutputError():
Expand All @@ -378,12 +379,20 @@ def enqueue_error(
event = [{"event": str(i), "errors": "Unknown error"} for i in item]
case _:
event = {"event": str(item), "errors": "Unknown error"}
if event is None:
self.logger.error("Tried to enqueue sentinel -> aborted!")
return
try:
if isinstance(event, list):
for i in event:
self.error_queue.put(i)
self.logger.debug(f"Enqueuing error item: {i}")
self.error_queue.put(i, timeout=0.1)
self.logger.debug("Enqueued error item")

else:
self.error_queue.put(event)
self.logger.debug(f"Enqueuing error item: {event}")
self.error_queue.put(event, timeout=0.1)
self.logger.debug("Enqueued error item")
except Exception as error: # pylint: disable=broad-except
self.logger.error((f"Couldn't enqueue error item due to: {error} | Item: '{event}'"))
if self._input:
Expand All @@ -404,3 +413,5 @@ def _get_output_error_event(self, item: CriticalOutputError) -> dict:
return [{"event": str(i), "errors": str(item.message)} for i in raw_input]
case CriticalOutputError(raw_input) if isinstance(raw_input, (str, bytes)):
return {"event": str(raw_input), "errors": str(item.message)}
case _:
return {"event": str(item.raw_input), "errors": str(item.message)}
17 changes: 15 additions & 2 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class ComponentQueueListener:
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""

setup_successful: bool = field(default=False, init=False)
"""Flag to indicate if the setup of the component was successful."""

def __attrs_post_init__(self):
if self._implementation == "threading":
self._instance = threading.Thread(target=self._listen, daemon=True)
Expand All @@ -107,12 +110,15 @@ def _get_component_instance(self):
continue
if component.health():
break
else:
raise SystemExit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value)
return component

def _listen(self):
component = self._get_component_instance()
self.setup_successful = True
target = getattr(component, self.target)
while True:
while 1:
item = self.queue.get()
logger.debug("Got item from queue: %s", item)
if item is self.sentinel:
Expand Down Expand Up @@ -196,6 +202,11 @@ def _setup_error_queue(self):
self._error_queue, "store", self._configuration.error_output
)
self._error_listener.start()
while 1:
if self._error_listener.setup_successful:
break
logger.debug("Waiting for error output to be ready...")
time.sleep(1)

def _setup_logging(self):
console_logger = logging.getLogger("console")
Expand Down Expand Up @@ -316,7 +327,9 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
)
if pipeline.pipeline_index == 1 and self.prometheus_exporter:
self.prometheus_exporter.update_healthchecks(pipeline.get_health_functions())
process = threading.Thread(target=pipeline.run, daemon=True, name=f"Pipeline-{index}")
process = multiprocessing.Process(
target=pipeline.run, daemon=True, name=f"Pipeline-{index}"
)
process.stop = pipeline.stop
process.start()
logger.info("Created new pipeline")
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,3 +585,12 @@ def test_listen_calls_target(self):
listener.queue.put(listener.sentinel)
listener._listen()
mock_store.assert_called_with("test")

def test_listen_creates_component(self):
assert False

def test_get_component_instance_setups_component(self):
assert False

def test_get_component_instance_raises_if_setup_not_successful(self):
assert False

0 comments on commit 72b7882

Please sign in to comment.