Added logging and gracefully terminating workers #230

Merged 2 commits on Dec 3, 2023
81 changes: 67 additions & 14 deletions src/conductor/client/automator/task_handler.py
@@ -4,8 +4,9 @@
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.worker.worker import Worker
from conductor.client.worker.worker_interface import WorkerInterface
from multiprocessing import Process, freeze_support
from multiprocessing import Process, freeze_support, Queue
from configparser import ConfigParser
from logging.handlers import QueueHandler
from typing import List
import ast
import astor
@@ -20,7 +21,6 @@
)
)


def get_annotated_workers():
pkg = __get_client_topmost_package_filepath()
workers = __get_annotated_workers_from_subtree(pkg)
@@ -36,6 +36,7 @@ def __init__(
metrics_settings: MetricsSettings = None,
scan_for_annotated_workers: bool = None,
):
self.logger_process, self.queue = setup_logging_queue(configuration)
self.worker_config = load_worker_config()
if workers is None:
workers = []
@@ -61,7 +62,10 @@ def __exit__(self, exc_type, exc_value, traceback):
def stop_processes(self) -> None:
self.__stop_task_runner_processes()
self.__stop_metrics_provider_process()
logger.debug('stopped processes')
logger.info('Stopped worker processes...')
logger.info('Stopping logger process...')
self.queue.put(None)
self.logger_process.terminate()

def start_processes(self) -> None:
logger.info('Starting worker processes...')
@@ -71,9 +75,13 @@ def start_processes(self) -> None:
logger.info('Started all processes')

def join_processes(self) -> None:
self.__join_task_runner_processes()
self.__join_metrics_provider_process()
logger.info('Joined all processes')
try:
self.__join_task_runner_processes()
self.__join_metrics_provider_process()
logger.info('Joined all processes')
except KeyboardInterrupt:
logger.info('KeyboardInterrupt: Stopping all processes')
self.stop_processes()

def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None:
if metrics_settings == None:
@@ -108,7 +116,7 @@ def __create_task_runner_process(
worker, configuration, metrics_settings, self.worker_config
)
process = Process(
target=task_runner.run
target=task_runner.run, args=(self.queue,)
)
self.task_runner_processes.append(process)

@@ -145,13 +153,12 @@ def __stop_process(self, process: Process):
if process == None:
return
try:
process.kill()
logger.debug(f'Killed process: {process}')
except Exception as e:
logger.debug(f'Failed to kill process: {process}, reason: {e}')
logger.debug(f'Terminating process: {process.pid}')
process.terminate()
logger.debug(f'Terminated process: {process.pid}')

except Exception as e:
logger.debug(f'Failed to terminate process: {process.pid}, reason: {e}')
process.kill()
logger.debug(f'Killed process: {process.pid}')

def __get_client_topmost_package_filepath():
module = inspect.getmodule(inspect.stack()[-1][0])
@@ -242,4 +249,50 @@ def load_worker_config():
return worker_config

def __get_config_file_path() -> str:
return os.getcwd() + "/worker.ini"
return os.getcwd() + "/worker.ini"

# Setup centralized logging queue
def setup_logging_queue(configuration):
queue = Queue()
logger.addHandler(QueueHandler(queue))
if configuration:
configuration.apply_logging_config()
log_level = configuration.log_level
logger_format = configuration.logger_format
else:
log_level = logging.DEBUG
logger_format = None

logger.setLevel(log_level)

# start the logger process
logger_p = Process(target=__logger_process, args=(queue, log_level, logger_format))
logger_p.start()
return logger_p, queue

# This process performs the centralized logging
def __logger_process(queue, log_level, logger_format=None):
c_logger = logging.getLogger(
Configuration.get_logging_formatted_name(
__name__
)
)

c_logger.setLevel(log_level)

# configure a stream handler
sh = logging.StreamHandler()
if logger_format:
formatter = logging.Formatter(logger_format)
sh.setFormatter(formatter)
c_logger.addHandler(sh)

# run forever
while True:
# consume a log message, block until one arrives
message = queue.get()
# check for shutdown
if message is None:
break
# log the message
c_logger.handle(message)
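
For context, the setup_logging_queue / __logger_process change above follows the standard QueueHandler pattern from the Python logging cookbook: each worker process forwards its log records onto a shared multiprocessing.Queue, and a single dedicated process drains the queue and writes them out through one set of handlers. A minimal, self-contained sketch of that pattern (names such as logger_process and worker below are illustrative, not SDK code):

```python
import logging
import logging.handlers
from multiprocessing import Process, Queue


def logger_process(queue: Queue) -> None:
    # Dedicated process: drain the queue and emit records through one handler.
    logger = logging.getLogger('central')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())
    while True:
        record = queue.get()      # block until a record (or sentinel) arrives
        if record is None:        # sentinel sent on shutdown
            break
        logger.handle(record)


def worker(queue: Queue) -> None:
    # Worker process: forward every record to the shared queue.
    logger = logging.getLogger('worker')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.handlers.QueueHandler(queue))
    logger.info('hello from worker')


if __name__ == '__main__':
    q = Queue()
    lp = Process(target=logger_process, args=(q,))
    lp.start()
    w = Process(target=worker, args=(q,))
    w.start()
    w.join()
    q.put(None)   # shut down the logger process
    lp.join()
```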
57 changes: 35 additions & 22 deletions src/conductor/client/automator/task_runner.py
@@ -8,19 +8,14 @@
from conductor.client.telemetry.metrics_collector import MetricsCollector
from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL
from configparser import ConfigParser
from logging.handlers import QueueHandler
from multiprocessing import Queue
import logging
import sys
import time
import traceback
import os

logger = logging.getLogger(
Configuration.get_logging_formatted_name(
__name__
)
)


class TaskRunner:
def __init__(
self,
@@ -48,9 +43,25 @@ def __init__(
)
)

def run(self) -> None:
def run(self, queue) -> None:
# Setup shared logger using Queues
self.logger = logging.getLogger(
Configuration.get_logging_formatted_name(
__name__
)
)
# Add a handler that uses the shared queue
if queue:
self.logger.addHandler(QueueHandler(queue))

if self.configuration != None:
self.configuration.apply_logging_config()
else:
self.logger.setLevel(logging.DEBUG)

task_names = ','.join(self.worker.task_definition_names)
self.logger.info(f'Started worker process for task(s): {task_names}')

while True:
try:
self.run_once()
@@ -68,13 +79,13 @@ def run_once(self) -> None:
def __poll_task(self) -> Task:
task_definition_name = self.worker.get_task_definition_name()
if self.worker.paused():
logger.debug(f'Stop polling task for: {task_definition_name}')
self.logger.debug(f'Stop polling task for: {task_definition_name}')
return None
if self.metrics_collector is not None:
self.metrics_collector.increment_task_poll(
task_definition_name
)
logger.debug(f'Polling task for: {task_definition_name}')
self.logger.debug(f'Polling task for: {task_definition_name}')
try:
start_time = time.time()
domain = self.worker.get_domain()
@@ -96,12 +107,12 @@ def __poll_task(self) -> Task:
self.metrics_collector.increment_task_poll_error(
task_definition_name, type(e)
)
logger.error(
self.logger.error(
f'Failed to poll task for: {task_definition_name}, reason: {traceback.format_exc()}'
)
return None
if task != None:
logger.debug(
self.logger.debug(
f'Polled task: {task_definition_name}, worker_id: {self.worker.get_identity()}, domain: {self.worker.get_domain()}'
)
return task
@@ -110,7 +121,7 @@ def __execute_task(self, task: Task) -> TaskResult:
if not isinstance(task, Task):
return None
task_definition_name = self.worker.get_task_definition_name()
logger.debug(
self.logger.debug(
'Executing task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
@@ -131,7 +142,7 @@ def __execute_task(self, task: Task) -> TaskResult:
task_definition_name,
sys.getsizeof(task_result)
)
logger.debug(
self.logger.debug(
'Executed task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
@@ -152,7 +163,7 @@ def __execute_task(self, task: Task) -> TaskResult:
task_result.reason_for_incompletion = str(e)
task_result.logs = [TaskExecLog(
traceback.format_exc(), task_result.task_id, int(time.time()))]
logger.error(
self.logger.error(
'Failed to execute task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
@@ -166,7 +177,7 @@ def __update_task(self, task_result: TaskResult):
if not isinstance(task_result, TaskResult):
return None
task_definition_name = self.worker.get_task_definition_name()
logger.debug(
self.logger.debug(
'Updating task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
@@ -179,7 +190,7 @@ def __update_task(self, task_result: TaskResult):
time.sleep(attempt * 10)
try:
response = self.task_client.update_task(body=task_result)
logger.debug(
self.logger.debug(
'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
@@ -193,7 +204,7 @@ def __update_task(self, task_result: TaskResult):
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
self.logger.error(
'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
@@ -205,10 +216,12 @@ def __update_task(self, task_result: TaskResult):

def __wait_for_polling_interval(self) -> None:
polling_interval = self.worker.get_polling_interval_in_seconds()
logger.debug(f'Sleep for {polling_interval} seconds')
self.logger.debug(f'Sleep for {polling_interval} seconds')
time.sleep(polling_interval)

def __set_worker_properties(self) -> None:
# If multiple tasks are supplied to the same worker, then only first
# task will be considered for setting worker properties
task_type = self.worker.get_task_definition_name()

# Fetch from ENV Variables if present
@@ -223,7 +236,7 @@ def __set_worker_properties(self) -> None:
self.worker.poll_interval = float(polling_interval)
polling_interval_initialized = True
except Exception as e:
logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))
self.logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))

# Fetch from Config if present
if not domain or not polling_interval_initialized:
@@ -247,9 +260,9 @@ def __set_worker_properties(self) -> None:
try:
# Read polling interval from config
self.worker.poll_interval = float(section.get("polling_interval", default_polling_interval))
logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval))
self.logger.debug("Override polling interval to {0} ms".format(self.worker.poll_interval))
except Exception as e:
logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval))
self.logger.error("Exception reading polling interval: {0}. Defaulting to {1} ms".format(str(e), default_polling_interval))

def __get_property_value_from_env(self, prop, task_type):
prefix = "conductor_worker"
11 changes: 11 additions & 0 deletions src/conductor/client/configuration/configuration.py
@@ -93,6 +93,17 @@ def logger_format(self, value):
"""
self.__logger_format = value

@property
def log_level(self):
"""The log level.

:return: The current log level.
:rtype: int
"""
return self.__log_level

def apply_logging_config(self):
logging.basicConfig(
format=self.logger_format,
7 changes: 7 additions & 0 deletions src/conductor/client/worker/worker_interface.py
@@ -51,6 +51,13 @@ def get_task_definition_name(self) -> str:
"""
return self.task_definition_name_cache

@property
def task_definition_names(self):
if isinstance(self.task_definition_name, list):
Member: when will this be a list?

Contributor Author: It will happen when the same worker shares polling for different tasks. This was done as an optimization by a customer developer - #219.

Contributor Author: I think we should document this a little better.

return self.task_definition_name
else:
return [self.task_definition_name]

@property
def task_definition_name_cache(self):
if self._task_definition_name_cache is None:
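
To illustrate the review discussion above about task_definition_name being a list: a hypothetical sketch assuming the SDK's Worker class accepts either a single task name or a list of task names for task_definition_name (the list form was added in #219). The task names and handler below are made up.

```python
from conductor.client.worker.worker import Worker


def handle(task):
    # hypothetical handler shared by several task types
    return {'status': 'COMPLETED'}


# Assumption: Worker accepts a list for task_definition_name (per #219).
shared = Worker(task_definition_name=['image_resize', 'image_compress'],
                execute_function=handle)
single = Worker(task_definition_name='image_resize', execute_function=handle)

print(shared.task_definition_names)   # ['image_resize', 'image_compress']
print(single.task_definition_names)   # ['image_resize'] (always a list)
```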