From 7cdec9f4e4944ec4a1ab5308d62a025654d4e637 Mon Sep 17 00:00:00 2001 From: sambles Date: Tue, 19 Mar 2024 10:53:45 +0000 Subject: [PATCH] Logging fixes for workers (#994) - 1.27 (#998) * Logging fixes for workers (#994) * first pass - refactor input gen call and logging * Fix exceptions not logged by worker monitor * remove test exception * Call generate losses from python instead of bash * fix logger calls * wip * test refactor * tmp * Add sub-task error logs to main analysis error trace * concat output logs on input gen success * Fix v2 worker gen files * Fix losses gen traceback * Fix V1 worker logging * Fix CI docker log storage * Fix * Fix tests * flak8 * Fix debug logging level * wip - fix missing ktools log tar * Fix missing ktools log tar * pep * Clean up task log files on exit * Fix traceback filenames v2 * Enable tests on release branches * Fix piwind branch * Add missing tasks log dir --- .github/workflows/build-schema.yml | 2 + .github/workflows/scan.yml | 2 + .github/workflows/test-images.yml | 15 +- .github/workflows/test-python.yml | 3 + Dockerfile.model_worker | 2 +- Dockerfile.model_worker_debian | 2 +- src/model_execution_worker/tasks.py | 469 +++++++++------------------- src/model_execution_worker/utils.py | 233 ++++++++++++++ tests/test_tasks.py | 82 ++--- 9 files changed, 446 insertions(+), 364 deletions(-) create mode 100644 src/model_execution_worker/utils.py diff --git a/.github/workflows/build-schema.yml b/.github/workflows/build-schema.yml index bdc5c3d9e..b790daf85 100644 --- a/.github/workflows/build-schema.yml +++ b/.github/workflows/build-schema.yml @@ -5,10 +5,12 @@ on: branches: - main-platform1 - stable** + - release** pull_request: branches: - main-platform1 - stable** + - release** workflow_dispatch: inputs: ods_branch: diff --git a/.github/workflows/scan.yml b/.github/workflows/scan.yml index e7f29d719..aab53d9f9 100644 --- a/.github/workflows/scan.yml +++ b/.github/workflows/scan.yml @@ -5,10 +5,12 @@ on: branches: - main-platform1 - stable** + - release** pull_request: branches: - main-platform1 - stable** + - release** schedule: - cron: '0 */6 * * *' # Run scan every 6 hours diff --git a/.github/workflows/test-images.yml b/.github/workflows/test-images.yml index 3d7ce127b..d96f6ddf3 100644 --- a/.github/workflows/test-images.yml +++ b/.github/workflows/test-images.yml @@ -5,10 +5,12 @@ on: branches: - main-platform1 - stable** + - release** pull_request: branches: - main-platform1 - stable** + - release** workflow_dispatch: inputs: last_release: @@ -86,22 +88,13 @@ jobs: - name: Select PiWind branch id: piwind run: | - # Select matching base branch on piwind - if [[ "${{ github.event_name }}" = "pull_request" ]]; then - BRANCH=${{ github.base_ref }} - elif [[ "${{ github.event_name }}" = "push" ]]; then - BRANCH=${{ github.ref_name }} + if [[ -z "${{ inputs.piwind_branch }}" ]]; then + BRANCH='stable/1.27.x' else BRANCH=${{ inputs.piwind_branch }} fi - - #override 'main-platform1' -> 'main' - if [[ "$BRANCH" = 'main-platform1' ]]; then - BRANCH=main - fi echo "branch=$BRANCH" >> $GITHUB_OUTPUT - - name: Select Pytest Options id: pytest run: | diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 9fa44803e..8cd1ae293 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -5,10 +5,13 @@ on: branches: - main-platform1 - stable** + - release** pull_request: branches: - main-platform1 - stable** + - release** + workflow_dispatch: inputs: ods_branch: diff --git a/Dockerfile.model_worker 
b/Dockerfile.model_worker index 1020a852b..c88600d99 100755 --- a/Dockerfile.model_worker +++ b/Dockerfile.model_worker @@ -53,7 +53,7 @@ COPY ./tests/integration /home/worker/tests/integration # Add required directories RUN mkdir -p /var/oasis && \ mkdir -p /home/worker/model && \ - mkdir -p /var/log/oasis && \ + mkdir -p /var/log/oasis/tasks && \ mkdir -p /shared-fs && \ touch /var/log/oasis/worker.log && \ chmod 777 /var/log/oasis/worker.log diff --git a/Dockerfile.model_worker_debian b/Dockerfile.model_worker_debian index 4eaadf42d..177a82072 100644 --- a/Dockerfile.model_worker_debian +++ b/Dockerfile.model_worker_debian @@ -40,7 +40,7 @@ COPY ./VERSION ./ RUN mkdir -p /var/oasis && \ mkdir -p /home/worker/model && \ - mkdir -p /var/log/oasis && \ + mkdir -p /var/log/oasis/tasks && \ mkdir -p /shared-fs && \ touch /var/log/oasis/worker.log && \ chmod 777 /var/log/oasis/worker.log diff --git a/src/model_execution_worker/tasks.py b/src/model_execution_worker/tasks.py index 33e8d4ed6..0d87bcd17 100755 --- a/src/model_execution_worker/tasks.py +++ b/src/model_execution_worker/tasks.py @@ -5,36 +5,40 @@ import logging import os import sys -import shutil -import subprocess import time import fasteners -import tempfile import tarfile from contextlib import contextmanager, suppress -from packaging import version from celery import Celery, signature +from celery.utils.log import get_task_logger from celery.signals import worker_ready from celery.exceptions import WorkerLostError, Terminated -from celery.platforms import signals +from oasislmf.manager import OasisManager +from oasislmf.utils.data import get_json from oasislmf.utils.exceptions import OasisException -from oasislmf.utils.log import oasis_log from oasislmf.utils.status import OASIS_TASK_STATUS -from oasislmf import __version__ as mdk_version -from pathlib2 import Path from ..conf import celeryconf as celery_conf from ..conf.iniconf import settings -from ..common.data import STORED_FILENAME, ORIGINAL_FILENAME -# from .storage_manager import StorageSelector from .storage_manager import BaseStorageConnector from .backends.aws_storage import AwsObjectStore from .backends.azure_storage import AzureObjectStore +from .utils import ( + LoggingTaskContext, + log_params, + paths_to_absolute_paths, + TemporaryDir, + get_oasislmf_config_path, + get_model_settings, + get_worker_versions, + InvalidInputsException, + prepare_complex_model_file_inputs, +) ''' Celery task wrapper for Oasis ktools calculation. 
@@ -43,11 +47,18 @@ LOG_FILE_SUFFIX = 'txt' ARCHIVE_FILE_SUFFIX = 'tar.gz' RUNNING_TASK_STATUS = OASIS_TASK_STATUS["running"]["id"] +TASK_LOG_DIR = settings.get('worker', 'TASK_LOG_DIR', fallback='/var/log/oasis/tasks') app = Celery() app.config_from_object(celery_conf) -logging.info("Started worker") + +logger = get_task_logger(__name__) +logger.info("Started worker") debug_worker = settings.getboolean('worker', 'DEBUG', fallback=False) +# Quiet sub-loggers +logging.getLogger('billiard').setLevel('INFO') +logging.getLogger('numba').setLevel('INFO') + # Set storage manager selected_storage = settings.get('worker', 'STORAGE_TYPE', fallback="").lower() if selected_storage in ['local-fs', 'shared-fs']: @@ -60,92 +71,6 @@ raise OasisException('Invalid value for STORAGE_TYPE: {}'.format(selected_storage)) -class TemporaryDir(object): - """Context manager for mkdtemp() with option to persist""" - - def __init__(self, persist=False, basedir=None): - self.persist = persist - self.basedir = basedir - - if basedir: - os.makedirs(basedir, exist_ok=True) - - def __enter__(self): - self.name = tempfile.mkdtemp(dir=self.basedir) - return self.name - - def __exit__(self, exc_type, exc_value, traceback): - if not self.persist and os.path.isdir(self.name): - shutil.rmtree(self.name) - - -def get_oasislmf_config_path(model_id=None): - """ Search for the oasislmf confiuration file - """ - conf_path = None - model_root = settings.get('worker', 'model_data_directory', fallback='/home/worker/model') - - # 1: Explicit location - conf_path = Path(settings.get('worker', 'oasislmf_config', fallback="")) - if conf_path.is_file(): - return str(conf_path) - - # 2: try 'model specific conf' - if model_id: - conf_path = Path(model_root, '{}-oasislmf.json'.format(model_id)) - if conf_path.is_file(): - return str(conf_path) - - # 3: Try generic model conf - conf_path = Path(model_root, 'oasislmf.json') - if conf_path.is_file(): - return str(conf_path) - - # 4: check compatibility look for older model mount - conf_path = Path('/var/oasis', 'oasislmf.json') - if conf_path.is_file(): - return str(conf_path) - - # 5: warn and return fallback - logging.warning("WARNING: 'oasislmf.json' Configuration file not found") - return str(Path(model_root, 'oasislmf.json')) - - -def get_model_settings(): - """ Read the settings file from the path OASIS_MODEL_SETTINGS - returning the contents as a python dicself.t (none if not found) - """ - settings_data = None - settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback=None) - try: - if os.path.isfile(settings_fp): - with open(settings_fp) as f: - settings_data = json.load(f) - except Exception as e: - logging.error("Failed to load Model settings: {}".format(e)) - - return settings_data - - -def get_worker_versions(): - """ Search and return the versions of Oasis components - """ - ktool_ver_str = subprocess.getoutput('fmcalc -v') - plat_ver_file = '/home/worker/VERSION' - - if os.path.isfile(plat_ver_file): - with open(plat_ver_file, 'r') as f: - plat_ver_str = f.read().strip() - else: - plat_ver_str = "" - - return { - "oasislmf": mdk_version, - "ktools": ktool_ver_str, - "platform": plat_ver_str - } - - def check_worker_lost(task, analysis_pk): """ SAFE GUARD: - Fail any tasks received from dead workers @@ -165,7 +90,7 @@ def check_worker_lost(task, analysis_pk): so this should be viewed as a fallback option. 
""" current_state = task.AsyncResult(task.request.id).state - logging.info(current_state) + logger.info(current_state) if current_state == RUNNING_TASK_STATUS: raise WorkerLostError( 'Task received from dead worker - A worker container crashed when executing a task from analysis_id={}'.format(analysis_pk) @@ -181,18 +106,18 @@ def register_worker(sender, **k): m_name = os.environ.get('OASIS_MODEL_ID') m_id = os.environ.get('OASIS_MODEL_VERSION_ID') m_version = get_worker_versions() - logging.info('Worker: SUPPLIER_ID={}, MODEL_ID={}, VERSION_ID={}'.format(m_supplier, m_name, m_id)) - logging.info('versions: {}'.format(m_version)) + logger.info('Worker: SUPPLIER_ID={}, MODEL_ID={}, VERSION_ID={}'.format(m_supplier, m_name, m_id)) + logger.info('versions: {}'.format(m_version)) # Check for 'DISABLE_WORKER_REG' before sending task to API if settings.getboolean('worker', 'DISABLE_WORKER_REG', fallback=False): - logging.info(('Worker auto-registration DISABLED: to enable:\n' - ' set DISABLE_WORKER_REG=False in conf.ini or\n' - ' set the envoritment variable OASIS_DISABLE_WORKER_REG=False')) + logger.info(('Worker auto-registration DISABLED: to enable:\n' + ' set DISABLE_WORKER_REG=False in conf.ini or\n' + ' set the envoritment variable OASIS_DISABLE_WORKER_REG=False')) else: - logging.info('Auto registrating with the Oasis API:') - m_settings = get_model_settings() - logging.info('settings: {}'.format(m_settings)) + logger.info('Auto registrating with the Oasis API:') + m_settings = get_model_settings(settings) + logger.info('settings: {}'.format(m_settings)) signature( 'run_register_worker', @@ -201,44 +126,44 @@ def register_worker(sender, **k): ).delay() # Required ENV - logging.info("LOCK_FILE: {}".format(settings.get('worker', 'LOCK_FILE'))) - logging.info("LOCK_TIMEOUT_IN_SECS: {}".format(settings.getfloat('worker', 'LOCK_TIMEOUT_IN_SECS'))) - logging.info("LOCK_RETRY_COUNTDOWN_IN_SECS: {}".format(settings.get('worker', 'LOCK_RETRY_COUNTDOWN_IN_SECS'))) + logger.info("LOCK_FILE: {}".format(settings.get('worker', 'LOCK_FILE'))) + logger.info("LOCK_TIMEOUT_IN_SECS: {}".format(settings.getfloat('worker', 'LOCK_TIMEOUT_IN_SECS'))) + logger.info("LOCK_RETRY_COUNTDOWN_IN_SECS: {}".format(settings.get('worker', 'LOCK_RETRY_COUNTDOWN_IN_SECS'))) # Storage Mode selected_storage = settings.get('worker', 'STORAGE_TYPE', fallback="").lower() - logging.info("STORAGE_MANAGER: {}".format(type(filestore))) - logging.info("STORAGE_TYPE: {}".format(settings.get('worker', 'STORAGE_TYPE', fallback='None'))) + logger.info("STORAGE_MANAGER: {}".format(type(filestore))) + logger.info("STORAGE_TYPE: {}".format(settings.get('worker', 'STORAGE_TYPE', fallback='None'))) if debug_worker: - logging.info("MODEL_DATA_DIRECTORY: {}".format(settings.get('worker', 'MODEL_DATA_DIRECTORY', fallback='/home/worker/model'))) + logger.info("MODEL_DATA_DIRECTORY: {}".format(settings.get('worker', 'MODEL_DATA_DIRECTORY', fallback='/home/worker/model'))) if selected_storage in ['local-fs', 'shared-fs']: - logging.info("MEDIA_ROOT: {}".format(settings.get('worker', 'MEDIA_ROOT'))) + logger.info("MEDIA_ROOT: {}".format(settings.get('worker', 'MEDIA_ROOT'))) elif selected_storage in ['aws-s3', 'aws', 's3']: - logging.info("AWS_BUCKET_NAME: {}".format(settings.get('worker', 'AWS_BUCKET_NAME', fallback='None'))) - logging.info("AWS_SHARED_BUCKET: {}".format(settings.get('worker', 'AWS_SHARED_BUCKET', fallback='None'))) - logging.info("AWS_LOCATION: {}".format(settings.get('worker', 'AWS_LOCATION', fallback='None'))) - 
logging.info("AWS_ACCESS_KEY_ID: {}".format(settings.get('worker', 'AWS_ACCESS_KEY_ID', fallback='None'))) - logging.info("AWS_QUERYSTRING_EXPIRE: {}".format(settings.get('worker', 'AWS_QUERYSTRING_EXPIRE', fallback='None'))) - logging.info("AWS_QUERYSTRING_AUTH: {}".format(settings.get('worker', 'AWS_QUERYSTRING_AUTH', fallback='None'))) - logging.info('AWS_LOG_LEVEL: {}'.format(settings.get('worker', 'AWS_LOG_LEVEL', fallback='None'))) + logger.info("AWS_BUCKET_NAME: {}".format(settings.get('worker', 'AWS_BUCKET_NAME', fallback='None'))) + logger.info("AWS_SHARED_BUCKET: {}".format(settings.get('worker', 'AWS_SHARED_BUCKET', fallback='None'))) + logger.info("AWS_LOCATION: {}".format(settings.get('worker', 'AWS_LOCATION', fallback='None'))) + logger.info("AWS_ACCESS_KEY_ID: {}".format(settings.get('worker', 'AWS_ACCESS_KEY_ID', fallback='None'))) + logger.info("AWS_QUERYSTRING_EXPIRE: {}".format(settings.get('worker', 'AWS_QUERYSTRING_EXPIRE', fallback='None'))) + logger.info("AWS_QUERYSTRING_AUTH: {}".format(settings.get('worker', 'AWS_QUERYSTRING_AUTH', fallback='None'))) + logger.info('AWS_LOG_LEVEL: {}'.format(settings.get('worker', 'AWS_LOG_LEVEL', fallback='None'))) # Optional ENV - logging.info("MODEL_SETTINGS_FILE: {}".format(settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='None'))) - logging.info("DISABLE_WORKER_REG: {}".format(settings.getboolean('worker', 'DISABLE_WORKER_REG', fallback='False'))) - logging.info("KEEP_RUN_DIR: {}".format(settings.get('worker', 'KEEP_RUN_DIR', fallback='False'))) - logging.info("DEBUG: {}".format(settings.get('worker', 'DEBUG', fallback='False'))) - logging.info("BASE_RUN_DIR: {}".format(settings.get('worker', 'BASE_RUN_DIR', fallback='None'))) - logging.info("OASISLMF_CONFIG: {}".format(settings.get('worker', 'oasislmf_config', fallback='None'))) + logger.info("MODEL_SETTINGS_FILE: {}".format(settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='None'))) + logger.info("DISABLE_WORKER_REG: {}".format(settings.getboolean('worker', 'DISABLE_WORKER_REG', fallback='False'))) + logger.info("KEEP_RUN_DIR: {}".format(settings.get('worker', 'KEEP_RUN_DIR', fallback='False'))) + logger.info("DEBUG: {}".format(settings.get('worker', 'DEBUG', fallback='False'))) + logger.info("BASE_RUN_DIR: {}".format(settings.get('worker', 'BASE_RUN_DIR', fallback='None'))) + logger.info("OASISLMF_CONFIG: {}".format(settings.get('worker', 'oasislmf_config', fallback='None'))) # Log Env variables if debug_worker: # show all env variables and override root log level - logging.info('ALL_OASIS_ENV_VARS:' + json.dumps({k: v for (k, v) in os.environ.items() if k.startswith('OASIS_')}, indent=4)) + logger.info('ALL_OASIS_ENV_VARS:' + json.dumps({k: v for (k, v) in os.environ.items() if k.startswith('OASIS_')}, indent=4)) else: # Limit Env variables to run only variables - logging.info('OASIS_ENV_VARS:' + json.dumps({ + logger.info('OASIS_ENV_VARS:' + json.dumps({ k: v for (k, v) in os.environ.items() if k.startswith('OASIS_') and not any( substring in k for substring in [ 'SERVER', @@ -258,16 +183,6 @@ def register_worker(sender, **k): os.rmdir(tmpdir) -class InvalidInputsException(OasisException): - def __init__(self, input_archive): - super(InvalidInputsException, self).__init__('Inputs location not a tarfile: {}'.format(input_archive)) - - -class MissingModelDataException(OasisException): - def __init__(self, model_data_path): - super(MissingModelDataException, self).__init__('Model data not found: {}'.format(model_data_path)) - - @contextmanager def get_lock(): lock 
= fasteners.InterProcessLock(settings.get('worker', 'LOCK_FILE')) @@ -280,7 +195,7 @@ def get_lock(): # Send notification back to the API Once task is read from Queue def notify_api_status(analysis_pk, task_status): - logging.info("Notify API: analysis_id={}, status={}".format( + logger.info("Notify API: analysis_id={}, status={}".format( analysis_pk, task_status )) @@ -291,8 +206,23 @@ def notify_api_status(analysis_pk, task_status): ).delay() +def V1_task_logger(fn): + + def run(self, analysis_pk, *args): + kwargs = { + 'log_filename': os.path.join(TASK_LOG_DIR, f"analysis_{analysis_pk}_{self.request.id}.log") + } + log_level = 'DEBUG' if debug_worker else 'INFO' + with LoggingTaskContext(logging.getLogger(), log_filename=kwargs['log_filename'], level=log_level): + logger.info(f'====== {fn.__name__} '.ljust(90, '=')) + return fn(self, analysis_pk, *args, **kwargs) + + return run + + @app.task(name='run_analysis', bind=True, acks_late=True, throws=(Terminated,)) -def start_analysis_task(self, analysis_pk, input_location, analysis_settings, complex_data_files=None): +@V1_task_logger +def start_analysis_task(self, analysis_pk, input_location, analysis_settings, complex_data_files=None, **kwargs): """Task wrapper for running an analysis. Args: @@ -305,18 +235,18 @@ def start_analysis_task(self, analysis_pk, input_location, analysis_settings, co Returns: (string) The location of the outputs. """ - logging.info("LOCK_FILE: {}".format(settings.get('worker', 'LOCK_FILE'))) - logging.info("LOCK_RETRY_COUNTDOWN_IN_SECS: {}".format( + logger.info("LOCK_FILE: {}".format(settings.get('worker', 'LOCK_FILE'))) + logger.info("LOCK_RETRY_COUNTDOWN_IN_SECS: {}".format( settings.get('worker', 'LOCK_RETRY_COUNTDOWN_IN_SECS'))) with get_lock() as gotten: if not gotten: - logging.info("Failed to get resource lock - retry task") + logger.info("Failed to get resource lock - retry task") raise self.retry( max_retries=None, countdown=settings.getint('worker', 'LOCK_RETRY_COUNTDOWN_IN_SECS')) - logging.info("Acquired resource lock") + logger.info("Acquired resource lock") try: # Check if this task was re-queued from a lost worker @@ -327,20 +257,20 @@ def start_analysis_task(self, analysis_pk, input_location, analysis_settings, co output_location, traceback_location, log_location, return_code = start_analysis( analysis_settings, input_location, - complex_data_files=complex_data_files + complex_data_files=complex_data_files, + **kwargs ) except Terminated: sys.exit('Task aborted') except Exception: - logging.exception("Model execution task failed.") + logger.exception("Model execution task failed.") raise return output_location, traceback_location, log_location, return_code -@oasis_log() -def start_analysis(analysis_settings, input_location, complex_data_files=None): +def start_analysis(analysis_settings, input_location, complex_data_files=None, **kwargs): """Run an analysis. 
Args: @@ -354,23 +284,11 @@ def start_analysis(analysis_settings, input_location, complex_data_files=None): """ # Check that the input archive exists and is valid - logging.info("args: {}".format(str(locals()))) - logging.info(str(get_worker_versions())) + logger.info("args: {}".format(str(locals()))) + logger.info(str(get_worker_versions())) tmpdir_persist = settings.getboolean('worker', 'KEEP_RUN_DIR', fallback=False) tmpdir_base = settings.get('worker', 'BASE_RUN_DIR', fallback=None) - # Setup Job cancellation handler - - def analysis_cancel_handler(signum, frame): - logging.info('TASK CANCELLATION') - if proc is not None: - os.killpg(os.getpgid(proc.pid), 15) - raise Terminated("Cancellation request sent from API") - - proc = None # Popen object for subpross runner - signals['SIGTERM'] = analysis_cancel_handler - - config_path = get_oasislmf_config_path() tmp_dir = TemporaryDir(persist=tmpdir_persist, basedir=tmpdir_base) filestore.media_root = settings.get('worker', 'MEDIA_ROOT') @@ -383,65 +301,49 @@ def analysis_cancel_handler(signum, frame): # Fetch generated inputs analysis_settings_file = filestore.get(analysis_settings, run_dir, required=True) + oasis_files_dir = os.path.join(run_dir, 'input') input_archive = filestore.get(input_location, run_dir, required=True) if not tarfile.is_tarfile(input_archive): raise InvalidInputsException(input_archive) - - oasis_files_dir = os.path.join(run_dir, 'input') filestore.extract(input_archive, oasis_files_dir) - run_args = [ - '--oasis-files-dir', oasis_files_dir, - '--config', config_path, - '--model-run-dir', run_dir, - '--analysis-settings-json', analysis_settings_file, - '--ktools-fifo-relative', - '--verbose' - ] + # oasislmf.json + config_path = get_oasislmf_config_path(settings) + config = get_json(config_path) + + # model settings + model_settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='') + model_settings_file = model_settings_fp if model_settings_fp and os.path.isfile(model_settings_fp) else None + + task_params = { + 'oasis_files_dir': oasis_files_dir, + 'model_run_dir': run_dir, + 'analysis_settings_json': analysis_settings_file, + 'model_settings_json': model_settings_file, + 'ktools_fifo_relative': True, + 'verbose': debug_worker, + } if complex_data_files: - prepare_complex_model_file_inputs(complex_data_files, input_data_dir) - run_args += ['--user-data-dir', input_data_dir] - - # check version and load model_settings for default samples if given - if version.parse(mdk_version) > version.parse("1.26.0"): - model_settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='') - if model_settings_fp and os.path.isfile(model_settings_fp): - run_args += ['--model-settings-json', model_settings_fp] - - # Log MDK run command - args_list = run_args + [''] if (len(run_args) % 2) else run_args - mdk_args = [x for t in list(zip(*[iter(args_list)] * 2)) if (None not in t) and ('--model-run-dir' not in t) for x in t] - logging.info('run_directory: {}'.format(oasis_files_dir)) - # logging.info('args_list: {}'.format(str(run_args))) - logging.info("\nExecuting: generate-losses") - if debug_worker: - logging.info("\nCLI command: \noasislmf model generate-losses {}".format( - " ".join([str(arg) for arg in mdk_args]) - )) - logging.info(run_args) - - # Subprocess Execution - worker_env = os.environ.copy() - - proc = subprocess.Popen( - ['oasislmf', 'model', 'generate-losses'] + run_args, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=worker_env, - preexec_fn=os.setsid, # run the program in a new session, 
assigning a new process group to it and its children. - ) - stdout, stderr = proc.communicate() + prepare_complex_model_file_inputs(complex_data_files, input_data_dir, filestore) + task_params['user_data_dir'] = input_data_dir + # Create and log params + run_params = {**config, **task_params} + params = paths_to_absolute_paths(run_params, config_path) if debug_worker: - logging.info('stdout: {}'.format(stdout.decode())) - - logging.info('stderr: {}'.format(stderr.decode())) - proc.terminate() + log_params(params, kwargs) - # Traceback file (stdout + stderr) - traceback_file = filestore.create_traceback(stdout.decode(), stderr.decode(), run_dir) - traceback_location = filestore.put(traceback_file) + # Run generate losses + try: + OasisManager().generate_oasis_losses(**params) + returncode = 0 + except Exception as e: + logger.info(f'Exception: {e.__class__}: {e}') + returncode = 1 # Ktools log Tar file + traceback_location = filestore.put(kwargs['log_filename']) log_directory = os.path.join(run_dir, "log") log_location = filestore.put(log_directory, suffix=ARCHIVE_FILE_SUFFIX) @@ -449,10 +351,11 @@ def analysis_cancel_handler(signum, frame): output_directory = os.path.join(run_dir, "output") output_location = filestore.put(output_directory, suffix=ARCHIVE_FILE_SUFFIX, arcname='output') - return output_location, traceback_location, log_location, proc.returncode + return output_location, traceback_location, log_location, returncode @app.task(name='generate_input', bind=True, acks_late=True, throws=(Terminated,)) +@V1_task_logger def generate_input(self, analysis_pk, loc_file, @@ -460,7 +363,8 @@ def generate_input(self, info_file=None, scope_file=None, settings_file=None, - complex_data_files=None): + complex_data_files=None, + **kwargs): """Generates the input files for the loss calculation stage. This function is a thin wrapper around "oasislmf model generate-oasis-files". @@ -480,26 +384,14 @@ def generate_input(self, (tuple(str, str)) Paths to the outputs tar file and errors tar file. 
""" - logging.info("args: {}".format(str(locals()))) - logging.info(str(get_worker_versions())) + logger.info(str(get_worker_versions())) # Check if this task was re-queued from a lost worker check_worker_lost(self, analysis_pk) - # Setup Job cancellation handler - def generate_input_cancel_handler(signum, frame): - logging.info('TASK CANCELLATION') - if proc is not None: - os.killpg(os.getpgid(proc.pid), 15) - raise Terminated("Cancellation request sent from API") - - proc = None # Popen object for subpross runner - signals['SIGTERM'] = generate_input_cancel_handler - # Start Oasis file generation notify_api_status(analysis_pk, 'INPUTS_GENERATION_STARTED') filestore.media_root = settings.get('worker', 'MEDIA_ROOT') - config_path = get_oasislmf_config_path() tmpdir_persist = settings.getboolean('worker', 'KEEP_RUN_DIR', fallback=False) tmpdir_base = settings.get('worker', 'BASE_RUN_DIR', fallback=None) @@ -518,60 +410,49 @@ def generate_input_cancel_handler(signum, frame): ri_scope_file = filestore.get(scope_file, oasis_files_dir) lookup_settings_file = filestore.get(settings_file, oasis_files_dir) - run_args = [ - '--oasis-files-dir', oasis_files_dir, - '--config', config_path, - '--oed-location-csv', location_file, - ] - - if accounts_file: - run_args += ['--oed-accounts-csv', accounts_file] - - if ri_info_file: - run_args += ['--oed-info-csv', ri_info_file] - - if ri_scope_file: - run_args += ['--oed-scope-csv', ri_scope_file] - - if lookup_settings_file: - run_args += ['--lookup-complex-config-json', lookup_settings_file] + model_settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='') + model_settings_file = model_settings_fp if model_settings_fp and os.path.isfile(model_settings_fp) else None + + task_params = { + 'oasis_files_dir': oasis_files_dir, + 'oed_location_csv': location_file, + 'oed_accounts_csv': accounts_file, + 'oed_info_csv': ri_info_file, + 'oed_scope_csv': ri_scope_file, + 'lookup_complex_config_json': lookup_settings_file, + 'analysis_settings_json': lookup_settings_file, + 'model_settings_json': model_settings_file, + 'verbose': debug_worker, + } if complex_data_files: prepare_complex_model_file_inputs(complex_data_files, input_data_dir) - run_args += ['--user-data-dir', input_data_dir] - - model_settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback='') - if model_settings_fp and os.path.isfile(model_settings_fp): - run_args += ['--model-settings-json', model_settings_fp] + task_params['user_data_dir'] = input_data_dir + config_path = get_oasislmf_config_path(settings) + config = get_json(config_path) + lookup_params = {**{k: v for k, v in config.items() if not k.startswith('oed_')}, **task_params} + params = paths_to_absolute_paths(lookup_params, config_path) if debug_worker: - run_args += ['--verbose'] - - # Log MDK generate command - args_list = run_args + [''] if (len(run_args) % 2) else run_args - mdk_args = [x for t in list(zip(*[iter(args_list)] * 2)) if None not in t for x in t] - logging.info('run_directory: {}'.format(oasis_files_dir)) - # logging.info('args_list: {}'.format(str(run_args))) - logging.info("\nExecuting: generate-oasis-files") - if debug_worker: - logging.info("\nCLI command: \noasislmf model generate-oasis-files {}".format( - " ".join([str(arg) for arg in mdk_args]) - )) - - # Subprocess Execution - worker_env = os.environ.copy() - proc = subprocess.Popen( - ['oasislmf', 'model', 'generate-oasis-files'] + run_args, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=worker_env, - preexec_fn=os.setsid, # run 
in a new session, assigning a new process group to it and its children. - ) - stdout, stderr = proc.communicate() + log_params(params, kwargs, exclude_keys=[ + 'profile_loc', + 'profile_loc_json', + 'profile_acc', + 'profile_fm_agg', + 'profile_fm_agg_json', + 'fm_aggregation_profile', + 'accounts_profile', + 'oed_hierarchy', + 'exposure_profile', + 'lookup_config', + ]) - # Log output and close - if debug_worker: - logging.info('stdout: {}'.format(stdout.decode())) - logging.info('stderr: {}'.format(stderr.decode())) - proc.terminate() + try: + OasisManager().generate_oasis_files(**params) + returncode = 0 + except Exception as e: + logger.info(f'Exception: {e.__class__}: {e}') + returncode = 1 # Find Generated Files lookup_error_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, '*keys-errors*.csv'))), None) @@ -580,14 +461,13 @@ def generate_input_cancel_handler(signum, frame): summary_levels_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, 'exposure_summary_levels.json'))), None) # Store result files - traceback_file = filestore.create_traceback(stdout.decode(), stderr.decode(), oasis_files_dir) - traceback = filestore.put(traceback_file) + traceback = filestore.put(kwargs['log_filename']) lookup_error = filestore.put(lookup_error_fp) lookup_success = filestore.put(lookup_success_fp) lookup_validation = filestore.put(lookup_validation_fp) summary_levels = filestore.put(summary_levels_fp) output_tar_path = filestore.put(oasis_files_dir) - return output_tar_path, lookup_error, lookup_success, lookup_validation, summary_levels, traceback, proc.returncode + return output_tar_path, lookup_error, lookup_success, lookup_validation, summary_levels, traceback, returncode @app.task(name='on_error') @@ -606,42 +486,3 @@ def on_error(request, ex, traceback, record_task_name, analysis_pk, initiator_pk args=(analysis_pk, initiator_pk, traceback), queue='celery' ).delay() - - -def prepare_complex_model_file_inputs(complex_model_files, run_directory): - """Places the specified complex model files in the run_directory. - - The unique upload filenames are converted back to the original upload names, so that the - names match any input configuration file. - - On Linux, the files are symlinked, whereas on Windows the files are simply copied. - - Args: - complex_model_files (list of complex_model_data_file): List of dicts giving the files - to make available. - run_directory (str): Model inputs directory to place the files in. - - Returns: - None. 
- - """ - for cmf in complex_model_files: - stored_fn = cmf[STORED_FILENAME] - orig_fn = cmf[ORIGINAL_FILENAME] - - if filestore._is_valid_url(stored_fn): - # If reference is a URL, then download the file & rename to 'original_filename' - fpath = filestore.get(stored_fn, run_directory) - shutil.move(fpath, os.path.join(run_directory, orig_fn)) - elif filestore._is_stored(stored_fn): - # If refrence is local filepath check that it exisits and copy/symlink - from_path = filestore.get(stored_fn) - to_path = os.path.join(run_directory, orig_fn) - if os.name == 'nt': - logging.info(f'complex_model_file: copy {from_path} to {to_path}') - shutil.copyfile(from_path, to_path) - else: - logging.info(f'complex_model_file: link {from_path} to {to_path}') - os.symlink(from_path, to_path) - else: - logging.info('WARNING: failed to get complex model file "{}"'.format(stored_fn)) diff --git a/src/model_execution_worker/utils.py b/src/model_execution_worker/utils.py new file mode 100644 index 000000000..b6bbe82ac --- /dev/null +++ b/src/model_execution_worker/utils.py @@ -0,0 +1,233 @@ +__all__ = [ + 'LoggingTaskContext', + 'log_params', + 'paths_to_absolute_paths', + 'TemporaryDir', + 'get_oasislmf_config_path', + 'get_model_settings', + 'get_worker_versions', + 'InvalidInputsException', + 'MissingModelDataException', + 'prepare_complex_model_file_inputs', +] + +import logging +import json +import os +import tempfile +import shutil +import subprocess +from copy import deepcopy + +from pathlib2 import Path +from oasislmf import __version__ as mdk_version +from oasislmf.utils.exceptions import OasisException + +from ..common.data import ORIGINAL_FILENAME, STORED_FILENAME + + +class LoggingTaskContext: + """ Adds a file log handler to the root logger and pushes a copy all logs to + the 'log_filename' + + Docs: https://docs.python.org/3/howto/logging-cookbook.html#using-a-context-manager-for-selective-logging + """ + + def __init__(self, logger, log_filename, level=None, close=True, delete_on_exit=True): + self.logger = logger + self.level = level + self.prev_level = logger.level + self.log_filename = log_filename + self.close = close + self.handler = logging.FileHandler(log_filename) + self.delete_on_exit = delete_on_exit + + def __enter__(self): + if self.level: + self.handler.setLevel(self.level) + self.logger.setLevel(self.level) + if self.handler: + self.logger.addHandler(self.handler) + + def __exit__(self, et, ev, tb): + if self.level: + self.logger.setLevel(self.prev_level) + if self.handler: + self.logger.removeHandler(self.handler) + if self.handler and self.close: + self.handler.close() + if os.path.isfile(self.log_filename) and self.delete_on_exit: + os.remove(self.log_filename) + + +def log_params(params, kwargs, exclude_keys=[]): + if isinstance(params, list): + params = params[0] + print_params = {k: params[k] for k in set(list(params.keys())) - set(exclude_keys)} + logging.info('task params: \nparams={}, \nkwargs={}'.format( + json.dumps(print_params, indent=2), + json.dumps(kwargs, indent=2), + )) + + +def paths_to_absolute_paths(dictionary, config_path=''): + basedir = os.path.dirname(config_path) + if not isinstance(dictionary, dict): + raise ValueError("Input must be a dictionary.") + + params = deepcopy(dictionary) + for key, value in dictionary.items(): + # If the value is a string and exists as a path, convert it to absolute path + if isinstance(value, str): + path_value = os.path.join(basedir, value) + if os.path.exists(path_value): + params[key] = os.path.abspath(path_value) + # if 
value is 'None' remove it + elif value is None: + del params[key] + return params + + +class TemporaryDir(object): + """Context manager for mkdtemp() with option to persist""" + + def __init__(self, persist=False, basedir=None): + self.persist = persist + self.basedir = basedir + + if basedir: + os.makedirs(basedir, exist_ok=True) + + def __enter__(self): + self.name = tempfile.mkdtemp(dir=self.basedir) + return self.name + + def __exit__(self, exc_type, exc_value, traceback): + if not self.persist and os.path.isdir(self.name): + shutil.rmtree(self.name) + + +def get_oasislmf_config_path(settings, model_id=None): + """ Search for the oasislmf configuration file + """ + conf_path = None + model_root = settings.get('worker', 'model_data_directory', fallback='/home/worker/model') + + # 1: Explicit location + conf_path = Path(settings.get('worker', 'oasislmf_config', fallback="")) + if conf_path.is_file(): + return str(conf_path) + + # 2: try 'model specific conf' + if model_id: + conf_path = Path(model_root, '{}-oasislmf.json'.format(model_id)) + if conf_path.is_file(): + return str(conf_path) + + # 3: Try generic model conf + conf_path = Path(model_root, 'oasislmf.json') + if conf_path.is_file(): + return str(conf_path) + + # 4: check compatibility look for older model mount + conf_path = Path('/var/oasis', 'oasislmf.json') + if conf_path.is_file(): + return str(conf_path) + + # 5: warn and return fallback + logging.warning("WARNING: 'oasislmf.json' Configuration file not found") + return str(Path(model_root, 'oasislmf.json')) + + +def get_model_settings(settings): + """ Read the settings file from the path OASIS_MODEL_SETTINGS + returning the contents as a python dict (None if not found) + """ + settings_data = None + settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback=None) + try: + if os.path.isfile(settings_fp): + with open(settings_fp) as f: + settings_data = json.load(f) + except Exception as e: + logging.error("Failed to load Model settings: {}".format(e)) + + return settings_data + + +def get_worker_versions(): + """ Search and return the versions of Oasis components + """ + ktool_ver_str = subprocess.getoutput('fmcalc -v') + plat_ver_file = '/home/worker/VERSION' + + if os.path.isfile(plat_ver_file): + with open(plat_ver_file, 'r') as f: + plat_ver_str = f.read().strip() + else: + plat_ver_str = "" + + return { + "oasislmf": mdk_version, + "ktools": ktool_ver_str, + "platform": plat_ver_str + } + + +class InvalidInputsException(OasisException): + def __init__(self, input_archive): + super(InvalidInputsException, self).__init__('Inputs location not a tarfile: {}'.format(input_archive)) + + +class MissingModelDataException(OasisException): + def __init__(self, model_data_path): + super(MissingModelDataException, self).__init__('Model data not found: {}'.format(model_data_path)) + + +def merge_dirs(src_root, dst_root): + for root, dirs, files in os.walk(src_root): + for f in files: + src = os.path.join(root, f) + rel_dst = os.path.relpath(src, src_root) + abs_dst = os.path.join(dst_root, rel_dst) + Path(abs_dst).parent.mkdir(exist_ok=True, parents=True) + shutil.copy(os.path.join(root, f), abs_dst) + + +def prepare_complex_model_file_inputs(complex_model_files, run_directory): + """Places the specified complex model files in the run_directory. + + The unique upload filenames are converted back to the original upload names, so that the + names match any input configuration file. + + On Linux, the files are symlinked, whereas on Windows the files are simply copied.
+ + Args: + complex_model_files (list of complex_model_data_file): List of dicts giving the files + to make available. + run_directory (str): Model inputs directory to place the files in. + + Returns: + None. + + """ + for cmf in complex_model_files: + stored_fn = cmf[STORED_FILENAME] + orig_fn = cmf[ORIGINAL_FILENAME] + + if filestore._is_valid_url(stored_fn): + # If reference is a URL, then download the file & rename to 'original_filename' + fpath = filestore.get(stored_fn, run_directory) + shutil.move(fpath, os.path.join(run_directory, orig_fn)) + elif filestore._is_stored(stored_fn): + # If refrence is local filepath check that it exisits and copy/symlink + from_path = filestore.get(stored_fn) + to_path = os.path.join(run_directory, orig_fn) + if os.name == 'nt': + logging.info(f'complex_model_file: copy {from_path} to {to_path}') + shutil.copyfile(from_path, to_path) + else: + logging.info(f'complex_model_file: link {from_path} to {to_path}') + os.symlink(from_path, to_path) + else: + logging.info('WARNING: failed to get complex model file "{}"'.format(stored_fn)) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 42be742f9..1e41e2b5c 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -1,6 +1,6 @@ import os -import subprocess import tarfile +import json from unittest import TestCase from contextlib import contextmanager @@ -12,10 +12,10 @@ from mock import patch, Mock, ANY from pathlib2 import Path -from src.conf.iniconf import SettingsPatcher, settings +from src.conf.iniconf import SettingsPatcher from src.model_execution_worker.storage_manager import MissingInputsException from src.model_execution_worker.tasks import start_analysis, InvalidInputsException, \ - start_analysis_task, get_oasislmf_config_path + start_analysis_task # from oasislmf.utils.status import OASIS_TASK_STATUS @@ -79,6 +79,7 @@ def test_custom_model_runner_does_not_exist___generate_losses_is_called_output_f with TemporaryDirectory() as media_root, \ TemporaryDirectory() as model_data_dir, \ TemporaryDirectory() as run_dir, \ + TemporaryDirectory() as log_dir, \ TemporaryDirectory() as work_dir: with SettingsPatcher( MODEL_SUPPLIER_ID='supplier', @@ -91,61 +92,68 @@ def test_custom_model_runner_does_not_exist___generate_losses_is_called_output_f Path(media_root, 'analysis_settings.json').touch() Path(run_dir, 'output').mkdir(parents=True) Path(model_data_dir, 'supplier', 'model', 'version').mkdir(parents=True) + log_file = Path(log_dir, 'log-file.log').touch() + + params = { + "oasis_files_dir": os.path.join(run_dir, 'input'), + "model_run_dir": run_dir, + "ktools_fifo_relative": True, + "verbose": False + } + with open(Path(model_data_dir, 'oasislmf.json'), 'w') as f: + f.write(json.dumps(params)) cmd_instance = Mock() - # cmd_instance.stdout = b'output' - # cmd_instance.stderr = b'errors' - cmd_instance.returncode = 0 - cmd_instance.communicate = Mock(return_value=(b'mock subprocess stdout', b'mock subprocess stderr')) @contextmanager def fake_run_dir(*args, **kwargs): yield run_dir - with patch('subprocess.Popen', Mock(return_value=cmd_instance)) as cmd_mock, \ + with patch('src.model_execution_worker.tasks.OasisManager', Mock(return_value=cmd_instance)) as cmd_mock, \ patch('src.model_execution_worker.tasks.get_worker_versions', Mock(return_value='')), \ patch('src.model_execution_worker.tasks.filestore.compress') as tarfile, \ + patch('src.model_execution_worker.tasks.TASK_LOG_DIR', log_dir), \ patch('src.model_execution_worker.tasks.TemporaryDir', fake_run_dir): + 
cmd_instance.generate_oasis_losses.return_value = "mocked result" # Mock the return value output_location, log_location, error_location, returncode = start_analysis( os.path.join(media_root, 'analysis_settings.json'), os.path.join(media_root, 'location.tar'), + log_filename=log_file, ) - test_env = os.environ.copy() - cmd_mock.assert_called_once_with(['oasislmf', 'model', 'generate-losses', - '--oasis-files-dir', os.path.join(run_dir, 'input'), - '--config', get_oasislmf_config_path(settings.get('worker', 'model_id')), - '--model-run-dir', run_dir, - '--analysis-settings-json', os.path.join(media_root, 'analysis_settings.json'), - '--ktools-fifo-relative', - '--verbose', - ], stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=test_env, preexec_fn=os.setsid) - tarfile.assert_called_once_with(output_location, os.path.join(run_dir, 'output'), 'output') + expected_params = {**params, **{"analysis_settings_json": os.path.join(media_root, 'analysis_settings.json')}} + cmd_instance.generate_oasis_losses.assert_called_once_with(**expected_params) + tarfile.assert_called_once_with(os.path.join(media_root, output_location), os.path.join(run_dir, 'output'), 'output') class StartAnalysisTask(TestCase): @given(pk=integers(), location=text(), analysis_settings_path=text()) def test_lock_is_not_acquireable___retry_esception_is_raised(self, pk, location, analysis_settings_path): - with patch('fasteners.InterProcessLock.acquire', Mock(return_value=False)), \ - patch('src.model_execution_worker.tasks.check_worker_lost', Mock(return_value='')), \ - patch('src.model_execution_worker.tasks.notify_api_status') as api_notify: + with TemporaryDirectory() as log_dir: + with patch('fasteners.InterProcessLock.acquire', Mock(return_value=False)), \ + patch('src.model_execution_worker.tasks.check_worker_lost', Mock(return_value='')), \ + patch('src.model_execution_worker.tasks.TASK_LOG_DIR', log_dir), \ + patch('src.model_execution_worker.tasks.notify_api_status') as api_notify: - with self.assertRaises(Retry): - start_analysis_task(pk, location, analysis_settings_path) + with self.assertRaises(Retry): + start_analysis_task(pk, location, analysis_settings_path) @given(pk=integers(), location=text(), analysis_settings_path=text()) def test_lock_is_acquireable___start_analysis_is_ran(self, pk, location, analysis_settings_path): - with patch('src.model_execution_worker.tasks.start_analysis', Mock(return_value=('', '', '', 0))) as start_analysis_mock, \ - patch('src.model_execution_worker.tasks.check_worker_lost', Mock(return_value='')), \ - patch('src.model_execution_worker.tasks.notify_api_status') as api_notify: - - start_analysis_task.update_state = Mock() - start_analysis_task(pk, location, analysis_settings_path) - - api_notify.assert_called_once_with(pk, 'RUN_STARTED') - start_analysis_task.update_state.assert_called_once_with(state=OASIS_TASK_STATUS["running"]["id"]) - start_analysis_mock.assert_called_once_with( - analysis_settings_path, - location, - complex_data_files=None - ) + with TemporaryDirectory() as log_dir: + with patch('src.model_execution_worker.tasks.start_analysis', Mock(return_value=('', '', '', 0))) as start_analysis_mock, \ + patch('src.model_execution_worker.tasks.check_worker_lost', Mock(return_value='')), \ + patch('src.model_execution_worker.tasks.TASK_LOG_DIR', log_dir), \ + patch('src.model_execution_worker.tasks.notify_api_status') as api_notify: + + start_analysis_task.update_state = Mock() + start_analysis_task(pk, location, analysis_settings_path) + + 
api_notify.assert_called_once_with(pk, 'RUN_STARTED') + start_analysis_task.update_state.assert_called_once_with(state=OASIS_TASK_STATUS["running"]["id"]) + start_analysis_mock.assert_called_once_with( + analysis_settings_path, + location, + complex_data_files=None, + log_filename=f'{log_dir}/analysis_{pk}_None.log' + )
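
The core of this patch is the per-task log capture: V1_task_logger builds a log path under TASK_LOG_DIR, LoggingTaskContext mirrors everything the root logger emits into that file for the duration of the task, and start_analysis/generate_input then upload that file via filestore.put() as the analysis traceback. Below is a minimal, self-contained sketch of the pattern only; FileLogContext, task_logger and run_analysis are illustrative names for this note, not code from the patch.

import functools
import logging
import os
import tempfile


class FileLogContext:
    """Simplified stand-in for LoggingTaskContext: mirror root-logger output to a per-task file."""

    def __init__(self, logger, log_filename, level=logging.INFO):
        self.logger = logger
        self.log_filename = log_filename
        self.level = level
        self.prev_level = logger.level
        self.handler = logging.FileHandler(log_filename)  # opens/creates the per-task log file

    def __enter__(self):
        self.handler.setLevel(self.level)
        self.logger.setLevel(self.level)
        self.logger.addHandler(self.handler)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.logger.setLevel(self.prev_level)
        self.logger.removeHandler(self.handler)
        self.handler.close()


def task_logger(task_log_dir):
    """Decorator factory mirroring the V1_task_logger idea: derive a per-analysis log path,
    run the task body inside the file-logging context, and forward the path via kwargs."""
    def decorator(fn):
        @functools.wraps(fn)
        def run(analysis_pk, *args, **kwargs):
            os.makedirs(task_log_dir, exist_ok=True)
            log_filename = os.path.join(task_log_dir, f"analysis_{analysis_pk}.log")
            with FileLogContext(logging.getLogger(), log_filename, level=logging.DEBUG):
                logging.getLogger(__name__).info(f"====== {fn.__name__} ".ljust(90, "="))
                return fn(analysis_pk, *args, log_filename=log_filename, **kwargs)
        return run
    return decorator


@task_logger(tempfile.gettempdir())  # the patch's real default is /var/log/oasis/tasks
def run_analysis(analysis_pk, log_filename=None):
    logging.getLogger(__name__).info("running analysis %s", analysis_pk)
    return log_filename  # in the worker this file is uploaded as the task's traceback artifact


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(run_analysis(42))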
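
Both refactored tasks now build their parameters by merging the model's oasislmf.json with per-run settings and passing the result through paths_to_absolute_paths, which rewrites any string value that resolves to an existing path (relative to the config file's directory) into an absolute path and drops keys whose value is None. A rough usage sketch, assuming it runs inside the worker environment where the model config is mounted; the task_params values below are hypothetical overrides:

from oasislmf.utils.data import get_json

from src.conf.iniconf import settings
from src.model_execution_worker.utils import get_oasislmf_config_path, paths_to_absolute_paths

config_path = get_oasislmf_config_path(settings)   # e.g. /home/worker/model/oasislmf.json
config = get_json(config_path)                     # paths as written in the file, often relative
task_params = {'model_run_dir': '/tmp/example_run', 'verbose': False}
params = paths_to_absolute_paths({**config, **task_params}, config_path)
# values in config that point at existing files/dirs are now absolute; None-valued keys are gone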
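
The test changes follow from the same refactor: since the worker now calls OasisManager directly instead of shelling out, the tests patch OasisManager inside the tasks module and assert against the merged params rather than a subprocess command line. A sketch of the mocking shape only, assuming the project's test environment is importable; the fixtures and the actual start_analysis call are elided:

from unittest.mock import Mock, patch

manager = Mock()
manager.generate_oasis_losses.return_value = "mocked result"

with patch('src.model_execution_worker.tasks.OasisManager', Mock(return_value=manager)):
    pass  # ... call start_analysis(settings_path, input_location, log_filename=...) here ...

# then assert loss generation ran once with the expected merged params:
# manager.generate_oasis_losses.assert_called_once_with(**expected_params)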