From fd7c838616b7518db13ea035cfdd79dcd71638f1 Mon Sep 17 00:00:00 2001
From: "york@stsci.edu"
Date: Mon, 11 Sep 2023 15:23:09 -0400
Subject: [PATCH 1/4] Adding more logging

---
 jwql/shared_tasks/run_pipeline.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/jwql/shared_tasks/run_pipeline.py b/jwql/shared_tasks/run_pipeline.py
index 0531f0337..e73c5bf1e 100755
--- a/jwql/shared_tasks/run_pipeline.py
+++ b/jwql/shared_tasks/run_pipeline.py
@@ -51,18 +51,27 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
         status_f.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
         status_f.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
         status_f.write(f"\t outputs is {outputs}\n")
+    sys.stderr.write("Running run_pipe\n")
+    sys.stderr.write("\t input_file_basename is {} ({})\n".format(input_file_basename, type(input_file_basename)))
+    sys.stderr.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
+    sys.stderr.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
+    sys.stderr.write(f"\t outputs is {outputs}\n")

     try:
+        sys.stderr.write("Copying file {} to working directory.\n".format(input_file))
         copy_files([input_file], work_directory)
+        sys.stderr.write("Setting permissions on {}\n".format(uncal_file))
         set_permissions(uncal_file)

         steps = get_pipeline_steps(instrument)
+        sys.stderr.write("Pipeline steps initialized to {}\n".format(steps))

         # If the input file is a file other than uncal.fits, then we may only need to run a
         # subset of steps. Check the completed steps in the input file. Find the latest step
         # that has been completed, and skip that plus all prior steps
         if 'uncal' not in input_file:
             completed_steps = completed_pipeline_steps(input_file)
+            sys.stderr.write("Steps {} already completed.\n".format(completed_steps))

             # Reverse the boolean value, so that now steps answers the question: "Do we need
             # to run this step?""
@@ -80,18 +89,21 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co

         for step in steps:
             if not steps[step]:
+                sys.stderr.write("Setting last_run to {}.\n".format(step))
                 last_run = deepcopy(step)

         for step in steps:
             if step == last_run:
                 break
             if step != last_run:
+                sys.stderr.write("Setting {} to skip while looking for last_run.\n".format(step))
                 steps[step] = False

         # Set any steps the user specifically asks to skip
         for step, step_dict in step_args.items():
             if 'skip' in step_dict:
                 if step_dict['skip']:
+                    sys.stderr.write("Setting step {} to skip by user request.\n".format(step))
                     steps[step] = False

         # Run each specified step

From 1834232cfa409f4339b333b368fd9ea2ddb308ce Mon Sep 17 00:00:00 2001
From: Bryan Hilbert
Date: Wed, 13 Sep 2023 15:12:15 -0400
Subject: [PATCH 2/4] Add step_args to calwebb_detector1_save_jump

---
 jwql/shared_tasks/shared_tasks.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/jwql/shared_tasks/shared_tasks.py b/jwql/shared_tasks/shared_tasks.py
index be232935d..1e8e95549 100644
--- a/jwql/shared_tasks/shared_tasks.py
+++ b/jwql/shared_tasks/shared_tasks.py
@@ -408,7 +408,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,


 @celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump')
-def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True):
+def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True, step_args={}):
     """Call ``calwebb_detector1`` on the provided file, running all steps up to the
     ``ramp_fit`` step, and save the result. Optionally run the ``ramp_fit`` step and
     save the resulting slope file as well.
@@ -430,6 +430,13 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
         If ``True``, the file of optional outputs from the ramp fitting step of the
         pipeline is saved.

+    step_args : dict
+        A dictionary containing custom arguments to supply to individual pipeline steps.
+        When a step is run, the dictionary will be checked for a key matching the step
+        name (as defined in jwql.utils.utils.get_pipeline_steps() for the provided
+        instrument). The value matching the step key should, itself, be a dictionary that
+        can be spliced in to step.call() via dereferencing (**dict)
+
     Returns
     -------
     jump_output : str
@@ -469,7 +476,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save

     cores = 'all'
     status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, input_file,
-                            short_name, result_file, cores)
+                            short_name, result_file, cores, step_args)

     if status[-1].strip() == "SUCCEEDED":
         logging.info("Subprocess reports successful finish.")
@@ -484,7 +491,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
         if core_fail:
             cores = "half"
             status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
-                                    input_file, short_name, result_file, cores)
+                                    input_file, short_name, result_file, cores, step_args)
             if status[-1].strip() == "SUCCEEDED":
                 logging.info("Subprocess reports successful finish.")
                 managed = True
@@ -498,7 +505,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
             if core_fail:
                 cores = "none"
                 status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
-                                        input_file, short_name, result_file, cores)
+                                        input_file, short_name, result_file, cores, step_args)
                 if status[-1].strip() == "SUCCEEDED":
                     logging.info("Subprocess reports successful finish.")
                     managed = True

From 2f0e8258c8edb0952c1b95c6a80bcf5470b4d3b6 Mon Sep 17 00:00:00 2001
From: Bryan Hilbert
Date: Fri, 22 Sep 2023 12:37:11 -0400
Subject: [PATCH 3/4] Better logging. Fix param def. Allow empty dark file list

---
 .../common_monitors/bad_pixel_monitor.py | 7 ++++---
 jwql/shared_tasks/run_pipeline.py         | 8 ++++++--
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
index 546fc6d80..cc4b7fb68 100755
--- a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
+++ b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
@@ -984,9 +984,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
             else:
                 index += 1

-        min_dark_time = min(dark_obstimes)
-        max_dark_time = max(dark_obstimes)
-        mid_dark_time = instrument_properties.mean_time(dark_obstimes)
+        if len(dark_slope_files) > 0:
+            min_dark_time = min(dark_obstimes)
+            max_dark_time = max(dark_obstimes)
+            mid_dark_time = instrument_properties.mean_time(dark_obstimes)

         # Check whether there are still enough files left to meet the threshold
         if illuminated_slope_files is None:
diff --git a/jwql/shared_tasks/run_pipeline.py b/jwql/shared_tasks/run_pipeline.py
index 0531f0337..82ea0689e 100755
--- a/jwql/shared_tasks/run_pipeline.py
+++ b/jwql/shared_tasks/run_pipeline.py
@@ -10,6 +10,7 @@
 import shutil
 import sys
 import time
+import traceback

 from jwst import datamodels
 from jwst.dq_init import DQInitStep
@@ -161,7 +162,8 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
         with open(status_file, "a+") as status_f:
             status_f.write("EXCEPTION\n")
             status_f.write("{}\n".format(e))
-            status_f.write("FAILED")
+            status_f.write("FAILED\n")
+            status_f.write(traceback.format_exc())
         sys.exit(1)

     with open(status_file, "a+") as status_f:
@@ -217,6 +219,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
             params['refpix'] = dict(odd_even_rows=False)

         # Default CR rejection threshold is too low
+        params['jump'] = {}
         params['jump']['rejection_threshold'] = 15

         # Set up to save jump step output
@@ -284,7 +287,8 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
         with open(status_file, "a+") as status_f:
             status_f.write("EXCEPTION\n")
             status_f.write("{}\n".format(e))
-            status_f.write("FAILED")
+            status_f.write("FAILED\n")
+            status_f.write(traceback.format_exc())
         sys.exit(1)

     with open(status_file, "a+") as status_f:

From 55b12f399cc7504e05879157e2c3bfae7b85866c Mon Sep 17 00:00:00 2001
From: Bryan Hilbert
Date: Mon, 25 Sep 2023 13:49:40 -0400
Subject: [PATCH 4/4] pep8 fixes

---
 jwql/shared_tasks/run_pipeline.py |  2 +-
 jwql/shared_tasks/shared_tasks.py | 19 ++++++++++---------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/jwql/shared_tasks/run_pipeline.py b/jwql/shared_tasks/run_pipeline.py
index 82ea0689e..991e08d47 100755
--- a/jwql/shared_tasks/run_pipeline.py
+++ b/jwql/shared_tasks/run_pipeline.py
@@ -348,7 +348,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
     outputs = args.outputs
     step_args = args.step_args

-    status_file = os.path.join(working_path, short_name+"_status.txt")
+    status_file = os.path.join(working_path, short_name + "_status.txt")
     with open(status_file, 'w') as out_file:
         out_file.write("Starting Process\n")
         out_file.write("\tpipeline is {} ({})\n".format(pipe_type, type(pipe_type)))
diff --git a/jwql/shared_tasks/shared_tasks.py b/jwql/shared_tasks/shared_tasks.py
index 1e8e95549..f076e2035 100644
--- a/jwql/shared_tasks/shared_tasks.py
+++ b/jwql/shared_tasks/shared_tasks.py
@@ -1,4 +1,4 @@
- #! /usr/bin/env python
+#! /usr/bin/env python

 """This module contains code for the celery application, which is used for any demanding
 work which should be restricted in terms of how many iterations are run simultaneously, or
@@ -204,7 +204,7 @@ def log_subprocess_output(pipe):
     If a subprocess STDOUT has been set to subprocess.PIPE, this function will log each line
     to the logging output.
     """
-    for line in iter(pipe.readline, b''): # b'\n'-separated lines
+    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
         logging.info("\t{}".format(line.decode('UTF-8').strip()))


@@ -224,6 +224,7 @@ def after_setup_celery_logger(logger, **kwargs):
 def collect_after_task(**kwargs):
     gc.collect()

+
 def convert_step_args_to_string(args_dict):
     """Convert the nested dictionary containing pipeline step parameter keyword/value pairs
     to a string so that it can be passed via command line
@@ -239,17 +240,17 @@ def convert_step_args_to_string(args_dict):
     args_str : str
         String representation of ``args_dict``
     """
-    args_str="'{"
+    args_str = "'{"

     for i, step in enumerate(args_dict):
         args_str += f'"{step}":'
         args_str += '{'

         for j, (param, val) in enumerate(args_dict[step].items()):
             args_str += f'"{param}":"{val}"'
-            if j < len(args_dict[step])-1:
+            if j < len(args_dict[step]) - 1:
                 args_str += ', '
         args_str += "}"
-        if i < len(args_dict)-1:
+        if i < len(args_dict) - 1:
             args_str += ','
     args_str += "}'"
     return args_str
@@ -331,7 +332,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
     current_dir = os.path.dirname(__file__)
     cmd_name = os.path.join(current_dir, "run_pipeline.py")
     outputs = ",".join(ext_or_exts)
-    result_file = os.path.join(cal_dir, short_name+"_status.txt")
+    result_file = os.path.join(cal_dir, short_name + "_status.txt")
     if "all" in ext_or_exts:
         logging.info("All outputs requested")
         if instrument.lower() != 'miri':
@@ -399,7 +400,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
             set_permissions(os.path.join(output_dir, file))

     logging.info("Removing local files.")
-    files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
+    files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
     for file_name in files_to_remove:
         logging.info("\tRemoving {}".format(file_name))
         os.remove(file_name)
@@ -472,7 +473,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save

     output_dir = os.path.join(config["transfer_dir"], "outgoing")
     cmd_name = os.path.join(os.path.dirname(__file__), "run_pipeline.py")
-    result_file = os.path.join(cal_dir, short_name+"_status.txt")
+    result_file = os.path.join(cal_dir, short_name + "_status.txt")
     cores = 'all'

     status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, input_file,
@@ -531,7 +532,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
                 files["fitopt_output"] = os.path.join(output_dir, file)

     logging.info("Removing local files.")
-    files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
+    files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
     for file_name in files_to_remove:
         logging.info("\tRemoving {}".format(file_name))
         os.remove(file_name)
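
Note on the step_args format: the docstring added in PATCH 2/4 describes step_args as a
nested dictionary keyed by pipeline step name, where each value is a dictionary that is
splatted into the step call, and PATCH 4/4 touches convert_step_args_to_string(), which
serializes that dictionary for the run_pipeline.py command line. A minimal sketch of that
shape follows; the step parameter values are hypothetical and not taken from the patches,
and running the import assumes a configured jwql installation.

    # Hypothetical values; only the nested-dict shape comes from the step_args docstring.
    step_args = {
        "jump": {"rejection_threshold": 11, "maximum_cores": "half"},
        "ramp_fit": {"maximum_cores": "half"},
    }

    # Each inner dict is intended to be splatted into the matching step call,
    # e.g. JumpStep.call(uncal_file, **step_args["jump"]) for a hypothetical uncal_file.

    # Serialize for the run_pipeline.py command line; note that every value is
    # rendered as a string inside the quoted, JSON-like result.
    from jwql.shared_tasks.shared_tasks import convert_step_args_to_string
    print(convert_step_args_to_string(step_args))
    # '{"jump":{"rejection_threshold":"11", "maximum_cores":"half"},"ramp_fit":{"maximum_cores":"half"}}'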