-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rcal-920 script to create associations based on skycells (#1437)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
- Loading branch information
1 parent
1ae172e
commit 09e3844
Showing
6 changed files
with
255 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
Added code to take an input list of calibrated WFI exposures and creates associations based on the | ||
skycells that they overlap |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
"""Create an association based on skycells""" | ||
|
||
import argparse | ||
import json | ||
import logging | ||
import sys | ||
|
||
import numpy as np | ||
import roman_datamodels as rdm | ||
|
||
import romancal.patch_match.patch_match as pm | ||
from romancal.associations import asn_from_list | ||
|
||
__all__ = ["skycell_asn"] | ||
|
||
# Configure logging | ||
logger = logging.getLogger(__name__) | ||
logger.addHandler(logging.NullHandler()) | ||
logger.setLevel("INFO") | ||
|
||
|
||
def skycell_asn(self): | ||
"""Create the associaton from the list""" | ||
all_patches = [] | ||
file_list = [] | ||
for file_name in self.parsed.filelist: | ||
cal_file = rdm.open(file_name) | ||
filter_id = cal_file.meta.instrument.optical_element | ||
file_patch_list = pm.find_patch_matches(cal_file.meta.wcs) | ||
logger.info(f"Patch List: {file_name} {file_patch_list[0]}") | ||
file_list.append([file_name, file_patch_list[0]]) | ||
all_patches.append(file_patch_list[0]) | ||
|
||
unique_patches = np.unique(np.concatenate(all_patches)) | ||
for item in unique_patches: | ||
member_list = [] | ||
patch_name = pm.PATCH_TABLE[item]["name"] | ||
for a in file_list: | ||
if np.isin(item, a[1]): | ||
member_list.append(a[0]) | ||
|
||
# grab all the wcs parameters needed for generate_tan_wcs | ||
projcell_info = dict( | ||
[ | ||
("pixel_scale", float(pm.PATCH_TABLE[item]["pixel_scale"])), | ||
("ra_cent", float(pm.PATCH_TABLE[item]["ra_projection_center"])), | ||
("dec_cent", float(pm.PATCH_TABLE[item]["dec_projection_center"])), | ||
("shiftx", float(pm.PATCH_TABLE[item]["x0_projection"])), | ||
("shifty", float(pm.PATCH_TABLE[item]["y0_projection"])), | ||
("nx", int(pm.PATCH_TABLE[item]["nx"])), | ||
("ny", int(pm.PATCH_TABLE[item]["ny"])), | ||
] | ||
) | ||
program_id = member_list[0][1:6] | ||
root_asn_name = self.parsed.output_file_root | ||
product_type = self.parsed.product_type | ||
product_release = self.parsed.release_product | ||
suffix = "i2d" | ||
sep = "_" | ||
asn_file_name = ( | ||
root_asn_name | ||
+ sep | ||
+ patch_name | ||
+ sep | ||
+ product_type | ||
+ sep | ||
+ filter_id | ||
+ sep | ||
+ product_release | ||
+ sep | ||
+ suffix | ||
) | ||
with open(asn_file_name + "_asn.json", "w") as outfile: | ||
prompt_product_asn = asn_from_list.asn_from_list( | ||
member_list, product_name=asn_file_name | ||
) | ||
prompt_product_asn["asn_type"] = "image" | ||
prompt_product_asn["program"] = program_id | ||
prompt_product_asn["target"] = patch_name | ||
# prompt_product_asn["skycell_wcs_info"] = projcell_info | ||
prompt_product_asn["skycell_wcs_info"] = json.dumps(projcell_info) | ||
_, serialized = prompt_product_asn.dump(format="json") | ||
outfile.write(serialized) | ||
|
||
|
||
class Main: | ||
"""Command-line interface for list_to_asn | ||
Parameters | ||
---------- | ||
args: [str, ...], or None | ||
The command line arguments. Can be one of | ||
- `None`: `sys.argv` is then used. | ||
- `[str, ...]`: A list of strings which create the command line | ||
with the similar structure as `sys.argv` | ||
""" | ||
|
||
def __init__(self, args=None): | ||
if args is None: | ||
args = sys.argv[1:] | ||
if isinstance(args, str): | ||
args = args.split(" ") | ||
|
||
parser = argparse.ArgumentParser( | ||
description="Create an association from a list of files", | ||
usage="skycell_asn --product-type visit --release-product prompt *_cal.asdf -o r512", | ||
) | ||
|
||
parser.add_argument( | ||
"-o", | ||
"--output-file-root", | ||
type=str, | ||
required=True, | ||
help="Root string for file to write association to", | ||
) | ||
|
||
parser.add_argument( | ||
"-f", | ||
"--format", | ||
type=str, | ||
default="json", | ||
help='Format of the association files. Default: "%(default)s"', | ||
) | ||
|
||
parser.add_argument( | ||
"--product-type", | ||
type=str, | ||
default="visit", | ||
help="The product type when creating the association", | ||
) | ||
|
||
parser.add_argument( | ||
"--release-product", | ||
type=str, | ||
default="prompt", | ||
help="The release product when creating the association", | ||
) | ||
|
||
parser.add_argument( | ||
"-r", | ||
"--rule", | ||
type=str, | ||
default="DMS_ELPP_Base", | ||
help=( | ||
"The rule to base the association structure on." | ||
' Default: "%(default)s"' | ||
), | ||
) | ||
parser.add_argument( | ||
"-i", | ||
"--id", | ||
type=str, | ||
default="o999", | ||
help='The association candidate id to use. Default: "%(default)s"', | ||
dest="acid", | ||
) | ||
|
||
parser.add_argument( | ||
"filelist", | ||
type=str, | ||
nargs="+", | ||
help="File list to include in the association", | ||
) | ||
|
||
self.parsed = parser.parse_args(args=args) | ||
logger.info("Command-line arguments: %s", self.parsed) | ||
|
||
skycell_asn(self) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
Main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
"""Tests for skycell_asn""" | ||
|
||
import pytest | ||
|
||
# from romancal.associations import Association, AssociationRegistry, load_asn | ||
from romancal.associations.skycell_asn import Main | ||
|
||
|
||
def test_cmdline_fails(): | ||
"""Exercise the command line interface""" | ||
|
||
# No arguments | ||
with pytest.raises(SystemExit): | ||
Main([]) | ||
|
||
# Only the association file argument | ||
with pytest.raises(SystemExit): | ||
Main(["-o", "test_asn.json"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
""" Roman tests for generating associatinos based on skycells""" | ||
|
||
import os | ||
|
||
import pytest | ||
|
||
from romancal.associations import skycell_asn | ||
|
||
|
||
@pytest.mark.bigdata | ||
def test_skycell_asn_generation(rtdata): | ||
"""Test for the generation of associations based on skycells""" | ||
|
||
# This test should generate seven json files | ||
args = [ | ||
"r0000101001001001001_01101_0002_WFI01_cal.asdf", | ||
"r0000101001001001001_01101_0002_WFI10_cal.asdf", | ||
"-o", | ||
"r512", | ||
] | ||
rtdata.get_data("WFI/image/r0000101001001001001_01101_0002_WFI01_cal.asdf") | ||
rtdata.get_data("WFI/image/r0000101001001001001_01101_0002_WFI10_cal.asdf") | ||
|
||
skycell_asn.Main(args) | ||
|
||
# skycell associations that should be generated | ||
output_files = [ | ||
"r512_r274dp63x31y80_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x31y81_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x31y82_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x32y80_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x32y81_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x33y80_visit_F158_prompt_i2d_asn.json", | ||
"r512_r274dp63x33y81_visit_F158_prompt_i2d_asn.json", | ||
] | ||
# Test that the json files exist | ||
for file in output_files: | ||
skycell_asn.logger.info(f"Check that the json file exists{file}") | ||
assert os.path.isfile(file) |