From b86c565b6775237044fa5e91edf625e0e819b5fd Mon Sep 17 00:00:00 2001
From: sambles
Date: Fri, 16 Aug 2024 12:59:43 +0100
Subject: [PATCH] Fix log storage in results tar files (#1103)

* Fix input gen logging issue

  Track and copy sub-task logs to the target dir before tar creation.
  Store V1 logs in the same locations (logs -> log).

* Fix V2 model with single chunk not following ktools_num_processes

* pep

* Fix log tar arcname between v1 and v2

---
 src/model_execution_worker/distributed_tasks.py | 17 ++++++++++++++---
 src/model_execution_worker/tasks.py             |  5 ++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/model_execution_worker/distributed_tasks.py b/src/model_execution_worker/distributed_tasks.py
index d65a419fe..08f4387ed 100644
--- a/src/model_execution_worker/distributed_tasks.py
+++ b/src/model_execution_worker/distributed_tasks.py
@@ -483,7 +483,9 @@ def prepare_input_generation_params(
         gen_files_params = OasisManager()._params_generate_oasis_files(**lookup_params)
         params = paths_to_absolute_paths({**gen_files_params}, config_path)
 
+    params['log_storage'] = dict()
     params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    params['log_storage'][slug] = params['log_location']
     params['verbose'] = debug_worker
     return params
 
@@ -541,6 +543,7 @@ def pre_analysis_hook(self,
     else:
         logger.info('pre_generation_hook: SKIPPING, param "exposure_pre_analysis_module" not set')
 
     params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    params['log_storage'][slug] = params['log_location']
     return params
 
@@ -601,6 +604,7 @@ def prepare_keys_file_chunk(
     )
 
     params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    params['log_storage'][slug] = params['log_location']
 
     return params
 
@@ -693,6 +697,7 @@ def take_first(paths, output_file):
     )
 
     chunk_params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    chunk_params['log_storage'][slug] = chunk_params['log_location']
 
     return chunk_params
 
@@ -717,13 +722,19 @@ def write_input_files(self, params, run_data_uuid=None, analysis_id=None, initia
 
     if params['user_data_dir'] is not None:
         shutil.rmtree(params['user_data_dir'], ignore_errors=True)
 
+    # collect logs here before archive is created
+    params['log_location'] = filestore.put(kwargs.get('log_filename'))
+    params['log_storage'][slug] = params['log_location']
+    for log_ref in params['log_storage']:
+        filestore.get(params['log_storage'][log_ref], os.path.join(params['target_dir'], 'log', f'v2-{log_ref}.txt'))
+
     return {
         'lookup_error_location': filestore.put(os.path.join(params['target_dir'], 'keys-errors.csv')),
         'lookup_success_location': filestore.put(os.path.join(params['target_dir'], 'gul_summary_map.csv')),
         'lookup_validation_location': filestore.put(os.path.join(params['target_dir'], 'exposure_summary_report.json')),
         'summary_levels_location': filestore.put(os.path.join(params['target_dir'], 'exposure_summary_levels.json')),
         'output_location': filestore.put(params['target_dir']),
-        'log_location': filestore.put(kwargs.get('log_filename')),
+        'log_location': params['log_location'],
     }
 
@@ -940,7 +951,7 @@ def generate_losses_chunk(self, params, chunk_idx, num_chunks, analysis_id=None,
     if num_chunks == 1:
         # Run multiple ktools pipes (based on cpu cores)
         current_chunk_id = None
-        max_chunk_id = -1
+        max_chunk_id = params.get('ktools_num_processes', -1)
         work_dir = 'work'
     else:
         # Run a single ktools pipe
@@ -1036,7 +1047,7 @@ def generate_losses_output(self, params, analysis_id=None, slug=None, **kwargs):
     return {
         **res,
         'output_location': filestore.put(output_dir, arcname='output'),
-        'run_logs': filestore.put(logs_dir, arcname='logs'),
+        'run_logs': filestore.put(logs_dir),
         'log_location': filestore.put(kwargs.get('log_filename')),
         'raw_output_locations': {
             r: filestore.put(os.path.join(output_dir, r), arcname=f'output_{r}')

diff --git a/src/model_execution_worker/tasks.py b/src/model_execution_worker/tasks.py
index 0b0cbbb2a..522b76c46 100755
--- a/src/model_execution_worker/tasks.py
+++ b/src/model_execution_worker/tasks.py
@@ -507,8 +507,11 @@ def generate_input(self,
     lookup_validation_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, 'exposure_summary_report.json'))), None)
     summary_levels_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, 'exposure_summary_levels.json'))), None)
 
-    # Store result files
+    # Store logs
     traceback = filestore.put(kwargs['log_filename'])
+    filestore.get(traceback, os.path.join(oasis_files_dir, 'log', 'v1-generate-oasis-files.txt'))
+
+    # Store result files
     lookup_error = filestore.put(lookup_error_fp)
     lookup_success = filestore.put(lookup_success_fp)
     lookup_validation = filestore.put(lookup_validation_fp)
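
Note on the pattern above: each distributed sub-task uploads its own log to the
filestore and records the reference in a shared log_storage dict keyed by the
task's slug; the final write_input_files task then copies every stored log into
<target_dir>/log/ before that directory is tarred, so per-task logs travel
inside the results archive. A minimal, self-contained sketch of that flow
follows; the FileStore class, slugs, and file names are illustrative stand-ins,
not the worker's real storage backend:

    import os
    import shutil
    import tarfile
    import tempfile

    class FileStore:
        """Illustrative stand-in: put() stores a file and returns a reference,
        get() copies a stored file to a destination path."""
        def __init__(self):
            self._root = tempfile.mkdtemp()

        def put(self, filepath):
            ref = os.path.basename(filepath)
            shutil.copy(filepath, os.path.join(self._root, ref))
            return ref

        def get(self, ref, dest):
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            shutil.copy(os.path.join(self._root, ref), dest)

    filestore = FileStore()
    params = {'log_storage': {}}
    workdir = tempfile.mkdtemp()

    # Each sub-task stores its log under its slug, as the hunks above do.
    for slug in ('prepare_input_generation_params', 'prepare_keys_file_chunk-1'):
        log_file = os.path.join(workdir, f'{slug}.log')
        with open(log_file, 'w') as f:
            f.write(f'log output for {slug}\n')
        params['log_storage'][slug] = filestore.put(log_file)

    # The final task gathers every stored log into <target_dir>/log/ before
    # the target directory is archived, so the logs end up in the tar file.
    target_dir = tempfile.mkdtemp()
    for log_ref in params['log_storage']:
        filestore.get(params['log_storage'][log_ref],
                      os.path.join(target_dir, 'log', f'v2-{log_ref}.txt'))

    with tarfile.open(os.path.join(workdir, 'output.tar.gz'), 'w:gz') as tar:
        tar.add(target_dir, arcname='output')

Copying the logs into the target directory before the archive step, rather than
uploading them separately, is what lets a single filestore.put(target_dir) carry
both results and logs in one tar.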