Skip to content

Commit

Permalink
Merge branch 'develop' into remove-readnoise-bokeh
Browse files Browse the repository at this point in the history
  • Loading branch information
mfixstsci authored Oct 20, 2023
2 parents 89b6863 + 9e6c419 commit 7e0836b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
7 changes: 4 additions & 3 deletions jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
else:
index += 1

min_dark_time = min(dark_obstimes)
max_dark_time = max(dark_obstimes)
mid_dark_time = instrument_properties.mean_time(dark_obstimes)
if len(dark_slope_files) > 0:
min_dark_time = min(dark_obstimes)
max_dark_time = max(dark_obstimes)
mid_dark_time = instrument_properties.mean_time(dark_obstimes)

# Check whether there are still enough files left to meet the threshold
if illuminated_slope_files is None:
Expand Down
22 changes: 19 additions & 3 deletions jwql/shared_tasks/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import shutil
import sys
import time
import traceback

from jwst import datamodels
from jwst.dq_init import DQInitStep
Expand Down Expand Up @@ -51,18 +52,27 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
status_f.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
status_f.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
status_f.write(f"\t outputs is {outputs}\n")
sys.stderr.write("Running run_pipe\n")
sys.stderr.write("\t input_file_basename is {} ({})\n".format(input_file_basename, type(input_file_basename)))
sys.stderr.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
sys.stderr.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
sys.stderr.write(f"\t outputs is {outputs}\n")

try:
sys.stderr.write("Copying file {} to working directory.\n".format(input_file))
copy_files([input_file], work_directory)
sys.stderr.write("Setting permissions on {}\n".format(uncal_file))
set_permissions(uncal_file)

steps = get_pipeline_steps(instrument)
sys.stderr.write("Pipeline steps initialized to {}\n".format(steps))

# If the input file is a file other than uncal.fits, then we may only need to run a
# subset of steps. Check the completed steps in the input file. Find the latest step
# that has been completed, and skip that plus all prior steps
if 'uncal' not in input_file:
completed_steps = completed_pipeline_steps(input_file)
sys.stderr.write("Steps {} already completed.\n".format(completed_steps))

# Reverse the boolean value, so that now steps answers the question: "Do we need
# to run this step?""
Expand All @@ -80,18 +90,21 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co

for step in steps:
if not steps[step]:
sys.stderr.write("Setting last_run to {}.\n".format(step))
last_run = deepcopy(step)

for step in steps:
if step == last_run:
break
if step != last_run:
sys.stderr.write("Setting {} to skip while looking for last_run.\n".format(step))
steps[step] = False

# Set any steps the user specifically asks to skip
for step, step_dict in step_args.items():
if 'skip' in step_dict:
if step_dict['skip']:
sys.stderr.write("Setting step {} to skip by user request.\n".format(step))
steps[step] = False

# Run each specified step
Expand Down Expand Up @@ -161,7 +174,8 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
with open(status_file, "a+") as status_f:
status_f.write("EXCEPTION\n")
status_f.write("{}\n".format(e))
status_f.write("FAILED")
status_f.write("FAILED\n")
status_f.write(traceback.format_exc())
sys.exit(1)

with open(status_file, "a+") as status_f:
Expand Down Expand Up @@ -217,6 +231,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
params['refpix'] = dict(odd_even_rows=False)

# Default CR rejection threshold is too low
params['jump'] = {}
params['jump']['rejection_threshold'] = 15

# Set up to save jump step output
Expand Down Expand Up @@ -284,7 +299,8 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
with open(status_file, "a+") as status_f:
status_f.write("EXCEPTION\n")
status_f.write("{}\n".format(e))
status_f.write("FAILED")
status_f.write("FAILED\n")
status_f.write(traceback.format_exc())
sys.exit(1)

with open(status_file, "a+") as status_f:
Expand Down Expand Up @@ -344,7 +360,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
outputs = args.outputs
step_args = args.step_args

status_file = os.path.join(working_path, short_name+"_status.txt")
status_file = os.path.join(working_path, short_name + "_status.txt")
with open(status_file, 'w') as out_file:
out_file.write("Starting Process\n")
out_file.write("\tpipeline is {} ({})\n".format(pipe_type, type(pipe_type)))
Expand Down
34 changes: 21 additions & 13 deletions jwql/shared_tasks/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#! /usr/bin/env python
#! /usr/bin/env python

"""This module contains code for the celery application, which is used for any demanding
work which should be restricted in terms of how many iterations are run simultaneously, or
Expand Down Expand Up @@ -204,7 +204,7 @@ def log_subprocess_output(pipe):
If a subprocess STDOUT has been set to subprocess.PIPE, this function will log each
line to the logging output.
"""
for line in iter(pipe.readline, b''): # b'\n'-separated lines
for line in iter(pipe.readline, b''): # b'\n'-separated lines
logging.info("\t{}".format(line.decode('UTF-8').strip()))


Expand All @@ -224,6 +224,7 @@ def after_setup_celery_logger(logger, **kwargs):
def collect_after_task(**kwargs):
gc.collect()


def convert_step_args_to_string(args_dict):
"""Convert the nested dictionary containing pipeline step parameter keyword/value pairs
to a string so that it can be passed via command line
Expand All @@ -239,17 +240,17 @@ def convert_step_args_to_string(args_dict):
args_str : str
String representation of ``args_dict``
"""
args_str="'{"
args_str = "'{"

for i, step in enumerate(args_dict):
args_str += f'"{step}":'
args_str += '{'
for j, (param, val) in enumerate(args_dict[step].items()):
args_str += f'"{param}":"{val}"'
if j < len(args_dict[step])-1:
if j < len(args_dict[step]) - 1:
args_str += ', '
args_str += "}"
if i < len(args_dict)-1:
if i < len(args_dict) - 1:
args_str += ','
args_str += "}'"
return args_str
Expand Down Expand Up @@ -331,7 +332,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
current_dir = os.path.dirname(__file__)
cmd_name = os.path.join(current_dir, "run_pipeline.py")
outputs = ",".join(ext_or_exts)
result_file = os.path.join(cal_dir, short_name+"_status.txt")
result_file = os.path.join(cal_dir, short_name + "_status.txt")
if "all" in ext_or_exts:
logging.info("All outputs requested")
if instrument.lower() != 'miri':
Expand Down Expand Up @@ -399,7 +400,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
set_permissions(os.path.join(output_dir, file))

logging.info("Removing local files.")
files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
for file_name in files_to_remove:
logging.info("\tRemoving {}".format(file_name))
os.remove(file_name)
Expand All @@ -408,7 +409,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,


@celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump')
def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True):
def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True, step_args={}):
"""Call ``calwebb_detector1`` on the provided file, running all
steps up to the ``ramp_fit`` step, and save the result. Optionally
run the ``ramp_fit`` step and save the resulting slope file as well.
Expand All @@ -430,6 +431,13 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
If ``True``, the file of optional outputs from the ramp fitting
step of the pipeline is saved.
step_args : dict
A dictionary containing custom arguments to supply to individual pipeline steps.
When a step is run, the dictionary will be checked for a key matching the step
name (as defined in jwql.utils.utils.get_pipeline_steps() for the provided
instrument). The value matching the step key should, itself, be a dictionary that
can be spliced in to step.call() via dereferencing (**dict)
Returns
-------
jump_output : str
Expand Down Expand Up @@ -465,11 +473,11 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
output_dir = os.path.join(config["transfer_dir"], "outgoing")

cmd_name = os.path.join(os.path.dirname(__file__), "run_pipeline.py")
result_file = os.path.join(cal_dir, short_name+"_status.txt")
result_file = os.path.join(cal_dir, short_name + "_status.txt")

cores = 'all'
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, input_file,
short_name, result_file, cores)
short_name, result_file, cores, step_args)

if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
Expand All @@ -484,7 +492,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
if core_fail:
cores = "half"
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
input_file, short_name, result_file, cores)
input_file, short_name, result_file, cores, step_args)
if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
managed = True
Expand All @@ -498,7 +506,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
if core_fail:
cores = "none"
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
input_file, short_name, result_file, cores)
input_file, short_name, result_file, cores, step_args)
if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
managed = True
Expand All @@ -524,7 +532,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
files["fitopt_output"] = os.path.join(output_dir, file)

logging.info("Removing local files.")
files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
for file_name in files_to_remove:
logging.info("\tRemoving {}".format(file_name))
os.remove(file_name)
Expand Down

0 comments on commit 7e0836b

Please sign in to comment.