Skip to content

Commit

Permalink
Fix log storage in results tar files (#1103)
Browse files Browse the repository at this point in the history
* Fix input gen logging issue

Track and copy sub-tasks logs to target dir before tar creation

Store V1 logs in same locations

logs -> log

* Fix V2 model with single chunk not following ktools_num_processes

* pep

* Fix log tar arcname between v1 and v2
  • Loading branch information
sambles authored Aug 16, 2024
1 parent bb918e7 commit b86c565
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
17 changes: 14 additions & 3 deletions src/model_execution_worker/distributed_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ def prepare_input_generation_params(
gen_files_params = OasisManager()._params_generate_oasis_files(**lookup_params)
params = paths_to_absolute_paths({**gen_files_params}, config_path)

params['log_storage'] = dict()
params['log_location'] = filestore.put(kwargs.get('log_filename'))
params['log_storage'][slug] = params['log_location']
params['verbose'] = debug_worker
return params

Expand Down Expand Up @@ -541,6 +543,7 @@ def pre_analysis_hook(self,
else:
logger.info('pre_generation_hook: SKIPPING, param "exposure_pre_analysis_module" not set')
params['log_location'] = filestore.put(kwargs.get('log_filename'))
params['log_storage'][slug] = params['log_location']
return params


Expand Down Expand Up @@ -601,6 +604,7 @@ def prepare_keys_file_chunk(
)

params['log_location'] = filestore.put(kwargs.get('log_filename'))
params['log_storage'][slug] = params['log_location']
return params


Expand Down Expand Up @@ -693,6 +697,7 @@ def take_first(paths, output_file):
)

chunk_params['log_location'] = filestore.put(kwargs.get('log_filename'))
chunk_params['log_storage'][slug] = chunk_params['log_location']
return chunk_params


Expand All @@ -717,13 +722,19 @@ def write_input_files(self, params, run_data_uuid=None, analysis_id=None, initia
if params['user_data_dir'] is not None:
shutil.rmtree(params['user_data_dir'], ignore_errors=True)

# collect logs here before archive is created
params['log_location'] = filestore.put(kwargs.get('log_filename'))
params['log_storage'][slug] = params['log_location']
for log_ref in params['log_storage']:
filestore.get(params['log_storage'][log_ref], os.path.join(params['target_dir'], 'log', f'v2-{log_ref}.txt'))

return {
'lookup_error_location': filestore.put(os.path.join(params['target_dir'], 'keys-errors.csv')),
'lookup_success_location': filestore.put(os.path.join(params['target_dir'], 'gul_summary_map.csv')),
'lookup_validation_location': filestore.put(os.path.join(params['target_dir'], 'exposure_summary_report.json')),
'summary_levels_location': filestore.put(os.path.join(params['target_dir'], 'exposure_summary_levels.json')),
'output_location': filestore.put(params['target_dir']),
'log_location': filestore.put(kwargs.get('log_filename')),
'log_location': params['log_location'],
}


Expand Down Expand Up @@ -940,7 +951,7 @@ def generate_losses_chunk(self, params, chunk_idx, num_chunks, analysis_id=None,
if num_chunks == 1:
# Run multiple ktools pipes (based on cpu cores)
current_chunk_id = None
max_chunk_id = -1
max_chunk_id = params.get('ktools_num_processes', -1)
work_dir = 'work'
else:
# Run a single ktools pipe
Expand Down Expand Up @@ -1036,7 +1047,7 @@ def generate_losses_output(self, params, analysis_id=None, slug=None, **kwargs):
return {
**res,
'output_location': filestore.put(output_dir, arcname='output'),
'run_logs': filestore.put(logs_dir, arcname='logs'),
'run_logs': filestore.put(logs_dir),
'log_location': filestore.put(kwargs.get('log_filename')),
'raw_output_locations': {
r: filestore.put(os.path.join(output_dir, r), arcname=f'output_{r}')
Expand Down
5 changes: 4 additions & 1 deletion src/model_execution_worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,11 @@ def generate_input(self,
lookup_validation_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, 'exposure_summary_report.json'))), None)
summary_levels_fp = next(iter(glob.glob(os.path.join(oasis_files_dir, 'exposure_summary_levels.json'))), None)

# Store result files
# Store logs
traceback = filestore.put(kwargs['log_filename'])
filestore.get(traceback, os.path.join(oasis_files_dir, 'log', 'v1-generate-oasis-files.txt'))

# Store result files
lookup_error = filestore.put(lookup_error_fp)
lookup_success = filestore.put(lookup_success_fp)
lookup_validation = filestore.put(lookup_validation_fp)
Expand Down

0 comments on commit b86c565

Please sign in to comment.