Skip to content

Commit

Permalink
fix acceptance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 21, 2024
1 parent 08c51a9 commit 662acae
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ class ComponentQueueListener:
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""

component: Component = field(init=False)
"""The component instance that is listening to the queue."""

def __attrs_post_init__(self):
if self._implementation == "threading":
self._instance = threading.Thread(target=self._listen, daemon=True)
Expand All @@ -97,8 +94,6 @@ def __attrs_post_init__(self):
def start(self):
"""Start the listener."""
logger.debug("Starting listener with target: %s", self.target)
if not hasattr(self, "component"):
raise AttributeError("Component instance not set.")
self._instance.start()

def get_component_instance(self) -> Component:
Expand All @@ -117,9 +112,8 @@ def get_component_instance(self) -> Component:
return component

def _listen(self):
if not self.component:
return
target = getattr(self.component, self.target)
component = self.get_component_instance()
target = getattr(component, self.target)
while 1:
item = self.queue.get()
logger.debug("Got item from queue: %s", item)
Expand All @@ -131,7 +125,7 @@ def _listen(self):
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
self._drain_queue(target)
self.component.shut_down()
component.shut_down()

def _drain_queue(self, target):
while not self.queue.empty():
Expand Down Expand Up @@ -216,7 +210,6 @@ def _setup_error_queue(self):
self._configuration.error_output,
implementation=self._configuration.component_queue_listener_implementation,
)
self._error_listener.component = self._error_listener.get_component_instance()
self._error_listener.start()
# wait for the error listener to be ready before starting the pipelines
if self.error_queue.get(block=True) is None:
Expand Down Expand Up @@ -342,7 +335,9 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
if pipeline.pipeline_index == 1 and self.prometheus_exporter:
pipeline_health_functions = pipeline.get_health_functions()
error_output_health_function = (
self._error_listener.component.health if self._error_listener else None
self._error_listener.get_component_instance().health
if self._error_listener
else None
)
self.prometheus_exporter.update_healthchecks(
[error_output_health_function, *pipeline_health_functions]
Expand Down

0 comments on commit 662acae

Please sign in to comment.