From e0a0249cea9cb8f8eea77fa70b639c3e9838834a Mon Sep 17 00:00:00 2001
From: Abhishek Gupta
Date: Fri, 1 Dec 2023 16:49:02 -0500
Subject: [PATCH 1/2] Added logging and graceful termination of workers

---
 .../client/automator/task_handler.py          | 81 +++++++++++++++----
 src/conductor/client/automator/task_runner.py | 57 ++++++++-----
 .../client/configuration/configuration.py     | 11 +++
 .../client/worker/worker_interface.py         |  7 ++
 4 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py
index 1714320a..5b4ba525 100644
--- a/src/conductor/client/automator/task_handler.py
+++ b/src/conductor/client/automator/task_handler.py
@@ -4,8 +4,9 @@
 from conductor.client.telemetry.metrics_collector import MetricsCollector
 from conductor.client.worker.worker import Worker
 from conductor.client.worker.worker_interface import WorkerInterface
-from multiprocessing import Process, freeze_support
+from multiprocessing import Process, freeze_support, Queue
 from configparser import ConfigParser
+from logging.handlers import QueueHandler
 from typing import List
 import ast
 import astor
@@ -20,7 +21,6 @@
     )
 )
 
-
 def get_annotated_workers():
     pkg = __get_client_topmost_package_filepath()
     workers = __get_annotated_workers_from_subtree(pkg)
@@ -36,6 +36,7 @@ def __init__(
         metrics_settings: MetricsSettings = None,
         scan_for_annotated_workers: bool = None,
     ):
+        self.logger_process, self.queue = setup_logging_queue(configuration)
        self.worker_config = load_worker_config()
         if workers is None:
             workers = []
@@ -61,7 +62,10 @@ def __exit__(self, exc_type, exc_value, traceback):
     def stop_processes(self) -> None:
         self.__stop_task_runner_processes()
         self.__stop_metrics_provider_process()
-        logger.debug('stopped processes')
+        logger.info('Stopped worker processes...')
+        logger.info('Stopping logger process...')
+        self.queue.put(None)
+        self.logger_process.terminate()
 
     def start_processes(self) -> None:
         logger.info('Starting worker processes...')
@@ -71,9 +75,13 @@ def start_processes(self) -> None:
         logger.info('Started all processes')
 
     def join_processes(self) -> None:
-        self.__join_task_runner_processes()
-        self.__join_metrics_provider_process()
-        logger.info('Joined all processes')
+        try:
+            self.__join_task_runner_processes()
+            self.__join_metrics_provider_process()
+            logger.info('Joined all processes')
+        except KeyboardInterrupt:
+            logger.info('KeyboardInterrupt: Stopping all processes')
+            self.stop_processes()
 
     def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None:
         if metrics_settings == None:
@@ -108,7 +116,7 @@ def __create_task_runner_process(
             worker, configuration, metrics_settings, self.worker_config
         )
         process = Process(
-            target=task_runner.run
+            target=task_runner.run, args=(self.queue,)
         )
         self.task_runner_processes.append(process)
 
@@ -145,13 +153,12 @@ def __stop_process(self, process: Process):
         if process == None:
             return
         try:
-            process.kill()
-            logger.debug(f'Killed process: {process}')
-        except Exception as e:
-            logger.debug(f'Failed to kill process: {process}, reason: {e}')
+            logger.debug(f'Terminating process: {process.pid}')
             process.terminate()
-            logger.debug('Terminated process: {process}')
-
+        except Exception as e:
+            logger.debug(f'Failed to terminate process: {process.pid}, reason: {e}')
+            process.kill()
+            logger.debug(f'Killed process: {process.pid}')
 
 def __get_client_topmost_package_filepath():
     module = inspect.getmodule(inspect.stack()[-1][0])
@@ -242,4 +249,50 @@ def 
load_worker_config(): return worker_config def __get_config_file_path() -> str: - return os.getcwd() + "/worker.ini" \ No newline at end of file + return os.getcwd() + "/worker.ini" + +# Setup centralized logging queue +def setup_logging_queue(configuration): + queue = Queue() + logger.addHandler(QueueHandler(queue)) + if configuration: + configuration.apply_logging_config() + log_level = configuration.log_level + logger_format = configuration.logger_format + else: + log_level = logging.DEBUG + logger_format = None + + logger.setLevel(log_level) + + # start the logger process + logger_p = Process(target=__logger_process, args=(queue, log_level, logger_format)) + logger_p.start() + return logger_p, queue + +# This process performs the centralized logging +def __logger_process(queue, log_level, logger_format=None): + c_logger = logging.getLogger( + Configuration.get_logging_formatted_name( + __name__ + ) + ) + + c_logger.setLevel(log_level) + + # configure a stream handler + sh = logging.StreamHandler() + if logger_format: + formatter = logging.Formatter(logger_format) + sh.setFormatter(formatter) + c_logger.addHandler(sh) + + # run forever + while True: + # consume a log message, block until one arrives + message = queue.get() + # check for shutdown + if message is None: + break + # log the message + c_logger.handle(message) \ No newline at end of file diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index a8da405d..7db5aae1 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -8,19 +8,14 @@ from conductor.client.telemetry.metrics_collector import MetricsCollector from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL from configparser import ConfigParser +from logging.handlers import QueueHandler +from multiprocessing import Queue import logging import sys import time import traceback import os -logger = logging.getLogger( - Configuration.get_logging_formatted_name( - __name__ - ) -) - - class TaskRunner: def __init__( self, @@ -48,9 +43,25 @@ def __init__( ) ) - def run(self) -> None: + def run(self, queue) -> None: + # Setup shared logger using Queues + self.logger = logging.getLogger( + Configuration.get_logging_formatted_name( + __name__ + ) + ) + # Add a handler that uses the shared queue + if queue: + self.logger.addHandler(QueueHandler(queue)) + if self.configuration != None: self.configuration.apply_logging_config() + else: + self.logger.setLevel(logging.DEBUG) + + task_names = ','.join(self.worker.task_definition_names) + self.logger.info(f'Started worker process for task(s): {task_names}') + while True: try: self.run_once() @@ -68,13 +79,13 @@ def run_once(self) -> None: def __poll_task(self) -> Task: task_definition_name = self.worker.get_task_definition_name() if self.worker.paused(): - logger.debug(f'Stop polling task for: {task_definition_name}') + self.logger.debug(f'Stop polling task for: {task_definition_name}') return None if self.metrics_collector is not None: self.metrics_collector.increment_task_poll( task_definition_name ) - logger.debug(f'Polling task for: {task_definition_name}') + self.logger.debug(f'Polling task for: {task_definition_name}') try: start_time = time.time() domain = self.worker.get_domain() @@ -96,12 +107,12 @@ def __poll_task(self) -> Task: self.metrics_collector.increment_task_poll_error( task_definition_name, type(e) ) - logger.error( + self.logger.error( f'Failed to poll task for: 
{task_definition_name}, reason: {traceback.format_exc()}'
             )
             return None
         if task != None:
-            logger.debug(
+            self.logger.debug(
                 f'Polled task: {task_definition_name}, worker_id: {self.worker.get_identity()}, domain: {self.worker.get_domain()}'
             )
         return task
@@ -110,7 +121,7 @@ def __execute_task(self, task: Task) -> TaskResult:
         if not isinstance(task, Task):
             return None
         task_definition_name = self.worker.get_task_definition_name()
-        logger.debug(
+        self.logger.debug(
             'Executing task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
                 task_id=task.task_id,
                 workflow_instance_id=task.workflow_instance_id,
@@ -131,7 +142,7 @@ def __execute_task(self, task: Task) -> TaskResult:
                 self.metrics_collector.record_task_result_payload_size(
                     task_definition_name, sys.getsizeof(task_result)
                 )
-            logger.debug(
+            self.logger.debug(
                 'Executed task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
                     task_id=task.task_id,
                     workflow_instance_id=task.workflow_instance_id,
@@ -152,7 +163,7 @@ def __execute_task(self, task: Task) -> TaskResult:
             task_result.reason_for_incompletion = str(e)
             task_result.logs = [TaskExecLog(
                 traceback.format_exc(), task_result.task_id, int(time.time()))]
-            logger.error(
+            self.logger.error(
                 'Failed to execute task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
                     task_id=task.task_id,
                     workflow_instance_id=task.workflow_instance_id,
@@ -166,7 +177,7 @@ def __update_task(self, task_result: TaskResult):
         if not isinstance(task_result, TaskResult):
             return None
         task_definition_name = self.worker.get_task_definition_name()
-        logger.debug(
+        self.logger.debug(
             'Updating task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
                 task_id=task_result.task_id,
                 workflow_instance_id=task_result.workflow_instance_id,
@@ -179,7 +190,7 @@ def __update_task(self, task_result: TaskResult):
                 time.sleep(attempt * 10)
             try:
                 response = self.task_client.update_task(body=task_result)
-                logger.debug(
+                self.logger.debug(
                     'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
                         task_id=task_result.task_id,
                         workflow_instance_id=task_result.workflow_instance_id,
@@ -193,7 +204,7 @@ def __update_task(self, task_result: TaskResult):
                     self.metrics_collector.increment_task_update_error(
                         task_definition_name, type(e)
                     )
-                logger.error(
+                self.logger.error(
                     'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
                         task_id=task_result.task_id,
                         workflow_instance_id=task_result.workflow_instance_id,
@@ -205,10 +216,12 @@ def __update_task(self, task_result: TaskResult):
 
     def __wait_for_polling_interval(self) -> None:
         polling_interval = self.worker.get_polling_interval_in_seconds()
-        logger.debug(f'Sleep for {polling_interval} seconds')
+        self.logger.debug(f'Sleep for {polling_interval} seconds')
         time.sleep(polling_interval)
 
     def __set_worker_properties(self) -> None:
+        # If multiple tasks are supplied to the same worker, then only the
+        # first task will be considered for setting worker properties.
         task_type = self.worker.get_task_definition_name()
 
         # Fetch from ENV Variables if present
@@ -223,7 +236,7 @@ def __set_worker_properties(self) -> None:
                     self.worker.poll_interval = float(polling_interval)
                     polling_interval_initialized = True
                 except Exception as e:
-                    logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))
+                    self.logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))
 
         # Fetch from Config if present
         if not domain or not polling_interval_initialized:
@@ -247,9 +260,9 @@ def __set_worker_properties(self) -> None:
                 try:
                     # Read polling interval from config
                     self.worker.poll_interval = float(section.get("polling_interval", default_polling_interval))
-                    logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval))
+                    self.logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval))
                 except Exception as e:
-                    logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval))
+                    self.logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval))
 
     def __get_property_value_from_env(self, prop, task_type):
         prefix = "conductor_worker"
diff --git a/src/conductor/client/configuration/configuration.py b/src/conductor/client/configuration/configuration.py
index 0a312ffd..cde836cb 100644
--- a/src/conductor/client/configuration/configuration.py
+++ b/src/conductor/client/configuration/configuration.py
@@ -93,6 +93,17 @@ def logger_format(self, value):
         """
         self.__logger_format = value
 
+    @property
+    def log_level(self):
+        """The log level.
+
+        The log level configured for the client; applied by apply_logging_config.
+
+        :return: The configured log level.
+        :rtype: int
+        """
+        return self.__log_level
+
     def apply_logging_config(self):
         logging.basicConfig(
             format=self.logger_format,
diff --git a/src/conductor/client/worker/worker_interface.py b/src/conductor/client/worker/worker_interface.py
index 2ff98331..72512fc6 100644
--- a/src/conductor/client/worker/worker_interface.py
+++ b/src/conductor/client/worker/worker_interface.py
@@ -51,6 +51,13 @@ def get_task_definition_name(self) -> str:
         """
         return self.task_definition_name_cache
 
+    @property
+    def task_definition_names(self):
+        if isinstance(self.task_definition_name, list):
+            return self.task_definition_name
+        else:
+            return [self.task_definition_name]
+
     @property
     def task_definition_name_cache(self):
         if self._task_definition_name_cache is None:

From cd7ba200720fbcf231b7232f5a49227bb31f0986 Mon Sep 17 00:00:00 2001
From: Abhishek Gupta
Date: Sun, 3 Dec 2023 10:37:35 -0500
Subject: [PATCH 2/2] Fixed unit tests

---
 src/conductor/client/automator/task_runner.py | 46 +++++++++----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py
index 7db5aae1..970564f6 100644
--- a/src/conductor/client/automator/task_runner.py
+++ b/src/conductor/client/automator/task_runner.py
@@ -16,6 +16,12 @@
 import traceback
 import os
 
+logger = logging.getLogger(
+    Configuration.get_logging_formatted_name(
+        __name__
+    )
+)
+
 class TaskRunner:
     def __init__(
         self,
@@ -44,23 +50,17 @@ def __init__(
             )
         )
 
     def run(self, queue) -> None:
-        # Setup shared logger using Queues
-        self.logger = logging.getLogger(
-            Configuration.get_logging_formatted_name(
-                __name__
-            )
-        )
         # Add a handler that uses the shared queue
         if queue:
-            self.logger.addHandler(QueueHandler(queue))
+            logger.addHandler(QueueHandler(queue))
 
         if self.configuration != None:
             self.configuration.apply_logging_config()
         else:
-            self.logger.setLevel(logging.DEBUG)
+            logger.setLevel(logging.DEBUG)
 
         task_names = 
','.join(self.worker.task_definition_names) - self.logger.info(f'Started worker process for task(s): {task_names}') + logger.info(f'Started worker process for task(s): {task_names}') while True: try: @@ -79,13 +79,13 @@ def run_once(self) -> None: def __poll_task(self) -> Task: task_definition_name = self.worker.get_task_definition_name() if self.worker.paused(): - self.logger.debug(f'Stop polling task for: {task_definition_name}') + logger.debug(f'Stop polling task for: {task_definition_name}') return None if self.metrics_collector is not None: self.metrics_collector.increment_task_poll( task_definition_name ) - self.logger.debug(f'Polling task for: {task_definition_name}') + logger.debug(f'Polling task for: {task_definition_name}') try: start_time = time.time() domain = self.worker.get_domain() @@ -107,12 +107,12 @@ def __poll_task(self) -> Task: self.metrics_collector.increment_task_poll_error( task_definition_name, type(e) ) - self.logger.error( + logger.error( f'Failed to poll task for: {task_definition_name}, reason: {traceback.format_exc()}' ) return None if task != None: - self.logger.debug( + logger.debug( f'Polled task: {task_definition_name}, worker_id: {self.worker.get_identity()}, domain: {self.worker.get_domain()}' ) return task @@ -121,7 +121,7 @@ def __execute_task(self, task: Task) -> TaskResult: if not isinstance(task, Task): return None task_definition_name = self.worker.get_task_definition_name() - self.logger.debug( + logger.debug( 'Executing task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( task_id=task.task_id, workflow_instance_id=task.workflow_instance_id, @@ -142,7 +142,7 @@ def __execute_task(self, task: Task) -> TaskResult: task_definition_name, sys.getsizeof(task_result) ) - self.logger.debug( + logger.debug( 'Executed task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( task_id=task.task_id, workflow_instance_id=task.workflow_instance_id, @@ -163,7 +163,7 @@ def __execute_task(self, task: Task) -> TaskResult: task_result.reason_for_incompletion = str(e) task_result.logs = [TaskExecLog( traceback.format_exc(), task_result.task_id, int(time.time()))] - self.logger.error( + logger.error( 'Failed to execute task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format( task_id=task.task_id, workflow_instance_id=task.workflow_instance_id, @@ -177,7 +177,7 @@ def __update_task(self, task_result: TaskResult): if not isinstance(task_result, TaskResult): return None task_definition_name = self.worker.get_task_definition_name() - self.logger.debug( + logger.debug( 'Updating task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( task_id=task_result.task_id, workflow_instance_id=task_result.workflow_instance_id, @@ -190,7 +190,7 @@ def __update_task(self, task_result: TaskResult): time.sleep(attempt * 10) try: response = self.task_client.update_task(body=task_result) - self.logger.debug( + logger.debug( 'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( task_id=task_result.task_id, workflow_instance_id=task_result.workflow_instance_id, @@ -204,7 +204,7 @@ def __update_task(self, task_result: TaskResult): self.metrics_collector.increment_task_update_error( task_definition_name, type(e) ) - 
self.logger.error( + logger.error( 'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format( task_id=task_result.task_id, workflow_instance_id=task_result.workflow_instance_id, @@ -216,7 +216,7 @@ def __update_task(self, task_result: TaskResult): def __wait_for_polling_interval(self) -> None: polling_interval = self.worker.get_polling_interval_in_seconds() - self.logger.debug(f'Sleep for {polling_interval} seconds') + logger.debug(f'Sleep for {polling_interval} seconds') time.sleep(polling_interval) def __set_worker_properties(self) -> None: @@ -236,7 +236,7 @@ def __set_worker_properties(self) -> None: self.worker.poll_interval = float(polling_interval) polling_interval_initialized = True except Exception as e: - self.logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e))) + logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e))) # Fetch from Config if present if not domain or not polling_interval_initialized: @@ -260,9 +260,9 @@ def __set_worker_properties(self) -> None: try: # Read polling interval from config self.worker.poll_interval = float(section.get("polling_interval", default_polling_interval)) - self.logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval)) + logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval)) except Exception as e: - self.logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval)) + logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval)) def __get_property_value_from_env(self, prop, task_type): prefix = "conductor_worker"
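
Note on the logging design (a sketch, not part of the diffs above): the two
patches route every log record produced in a worker process through a single
multiprocessing Queue to one dedicated logger process (setup_logging_queue and
__logger_process in task_handler.py, with QueueHandler wiring in
task_runner.py). A minimal, self-contained sketch of the same pattern follows;
the names central_logger_process and worker_loop and the two-worker setup are
illustrative only and do not appear in the SDK:

    import logging
    from logging.handlers import QueueHandler
    from multiprocessing import Process, Queue

    def central_logger_process(queue):
        # Lone consumer: formats and emits every record the workers enqueue.
        logger = logging.getLogger('central')
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.StreamHandler())
        while True:
            record = queue.get()   # blocks until a record (or sentinel) arrives
            if record is None:     # None is the shutdown sentinel
                break
            logger.handle(record)

    def worker_loop(queue):
        # Producer: QueueHandler puts each LogRecord onto the shared queue.
        logger = logging.getLogger('worker')
        logger.setLevel(logging.DEBUG)
        logger.addHandler(QueueHandler(queue))
        logger.info('worker started, polling for tasks')

    if __name__ == '__main__':
        queue = Queue()
        consumer = Process(target=central_logger_process, args=(queue,))
        consumer.start()
        workers = [Process(target=worker_loop, args=(queue,)) for _ in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        queue.put(None)   # signal shutdown after all producers are done
        consumer.join()   # joining, rather than terminating, lets the consumer
                          # drain any records still sitting in the queue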
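
Configuration reference for the worker properties touched in patch 1:
__set_worker_properties reads per-task overrides first from environment
variables with the conductor_worker prefix, then from the worker.ini that
load_worker_config loads from the current working directory. A worker.ini
consistent with the polling_interval key used above might look like the
following; the section name my_task and both values are made up for
illustration, and the domain line is an assumption since its config lookup is
not fully visible in this diff (polling_interval is in milliseconds, per the
"Override polling interval to {0} ms" log message):

    [my_task]
    domain = my_domain
    polling_interval = 500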