
Commit f628d3e

incomplete label to daily files (#353) aka fast pipeline with predicted SPICE kernels

* add incomplete label to daily files

* incomplete fits files:  update_ephemeris_headers

* try fix LFS for kernels

* try fix2 fls

* ignore mk.abs files in LFS

* Update stixcore/processing/publish.py

Co-authored-by: Shane Maloney <[email protected]>

* Update stixcore/processing/publish.py

Co-authored-by: Shane Maloney <[email protected]>

* Update stixcore/processing/publish.py

Co-authored-by: Shane Maloney <[email protected]>

* to address review

* add detection and deletion for c/u twins

* fix tests

---------

Co-authored-by: Shane Maloney <[email protected]>
nicHoch and samaloney authored Sep 6, 2023
1 parent: 6203981, commit: f628d3e
Showing 24 changed files with 668 additions and 142 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -1 +1,2 @@
stixcore/data/test/*/** filter=lfs diff=lfs merge=binary -text
*.mk.abs filter= diff= merge= text
47 changes: 30 additions & 17 deletions stixcore/ephemeris/manager.py
@@ -53,7 +53,8 @@ class SpiceKernelType(Enum):
suitable for a given mission period."""

MK_PRED = 'mk', "solo_ANC_soc-pred-mk_*.tm"
"""TODO"""
"""Predicted Meta-kernel files (a.k.a "furnsh" files) that provide lists of kernels
suitable for a given mission period "in the future"."""

PCK = 'pck', "*.*"
"""Kernels that define planetary constants."""
@@ -96,7 +97,7 @@ def _get_latest(self, kerneltype, *, top_n=1):

top_n = min(len(files), top_n)

return files[0:top_n] if top_n > 1 else files[0]
return files[0:top_n] if top_n > 1 else [files[0]]

def get_latest_sclk(self, *, top_n=1):
return self._get_latest(SpiceKernelType.SCLK, top_n=top_n)
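Note that _get_latest now returns a list even for top_n=1 (previously a bare path was
returned in that case), so every get_latest_* helper shares one return type. A minimal
sketch of the new calling convention; the kernel file name is only illustrative:

# Sketch: callers now always receive a list of Path objects and index it,
# even when asking for a single kernel (top_n=1).
latest = spicekernelmanager.get_latest_sclk()
newest_sclk = latest[0]   # e.g. Path('.../solo_ANC_soc-sclk_20200904_V01.tsc')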
@@ -107,6 +108,14 @@ def get_latest_lsk(self, *, top_n=1):
def get_latest_mk(self, *, top_n=1):
return self._get_latest(SpiceKernelType.MK, top_n=top_n)

def get_latest_mk_pred(self, *, top_n=1):
return self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)

def get_latest_mk_and_pred(self, *, top_n=1):
mks = self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)
mks.extend(self._get_latest(SpiceKernelType.MK, top_n=top_n))
return mks

def get_latest(self, kerneltype=SpiceKernelType.MK, *, top_n=1):
"""Finds the latest version of the spice kernel.
@@ -131,26 +140,29 @@ def __init__(self, meta_kernel_pathes):
Parameters
----------
meta_kernel_path : `str` or `pathlib.Path`
meta_kernel_path : list of `str` or `pathlib.Path`
Path to the meta kernel
"""
self.meta_kernel_path = list()
# unload all old kernels
spiceypy.kclear()

for meta_kernel_path in np.atleast_1d(meta_kernel_pathes):
try:
self.meta_kernel_path = Path(meta_kernel_path)
if not self.meta_kernel_path.exists():
raise ValueError(f'Meta kernel not found: {self.meta_kernel_path}')
meta_kernel_path = Path(meta_kernel_path)
if not meta_kernel_path.exists():
raise ValueError(f'Meta kernel not found: {meta_kernel_path}')

# look for a twin file *.abs where the path definition is absolute
# if not existing create it on the fly and store it in same location for later reuse

abs_file = self.meta_kernel_path.parent / (self.meta_kernel_path.name + ".abs")
abs_file = meta_kernel_path.parent / (meta_kernel_path.name + ".abs")

if not abs_file.exists():
with self.meta_kernel_path.open('r') as mk:
with meta_kernel_path.open('r') as mk:
original_mk = mk.read()
kernel_dir = str(self.meta_kernel_path.parent.parent.resolve())
kernel_dir = str(meta_kernel_path.parent.parent.resolve())
kernel_dir = kernel_dir.replace('\\', '\\\\')
# spice meta kernel seems to have a max variable length of 80 characters
# https://naif.jpl.nasa.gov/pub/naif/toolkit_docs/C/req/kernel.html#Additional%20Meta-kernel%20Specifications # noqa
Expand All @@ -161,15 +173,15 @@ def __init__(self, meta_kernel_pathes):

with abs_file.open('w') as f:
f.write(new_mk)

logger.info(f"LOADING NEW META KERNEL: {self.meta_kernel_path}")
spiceypy.kclear()
# load the meta kernel
spiceypy.furnsh(str(abs_file))
return
logger.info(f"LOADING NEW META KERNEL: {meta_kernel_path}")
self.meta_kernel_path.append(meta_kernel_path)
except Exception as e:
logger.warning(f"Failed LOADING NEW META KERNEL: {self.meta_kernel_path}\n{e}")
raise ValueError(f"Failed to load any NEW META KERNEL: {meta_kernel_pathes}")
logger.warning(f"Failed LOADING NEW META KERNEL: {meta_kernel_path}\n{e}")

if len(self.meta_kernel_path) == 0:
raise ValueError(f"Failed to load any NEW META KERNEL: {meta_kernel_pathes}")

@staticmethod
def _wrap_value_field(field):
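The constructor now loops over all given meta-kernel paths, builds (or reuses) an
absolute-path *.abs twin for each, and only fails if none of them could be loaded. The
exact PATH_VALUES substitution is hidden in the collapsed part of this hunk, so the
sketch below is a simplified assumption, not the repository code; rewrite_relative_paths
is a hypothetical stand-in:

# Sketch of the *.abs twin idea (the real substitution also has to respect
# the 80-character limit on SPICE kernel variables, see _wrap_value_field):
abs_file = meta_kernel_path.parent / (meta_kernel_path.name + ".abs")
if not abs_file.exists():
    original_mk = meta_kernel_path.read_text()
    kernel_dir = str(meta_kernel_path.parent.parent.resolve()).replace('\\', '\\\\')
    abs_file.write_text(rewrite_relative_paths(original_mk, kernel_dir))  # hypothetical helper
spiceypy.furnsh(str(abs_file))   # furnish the absolute-path twin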
@@ -433,7 +445,8 @@ def get_fits_headers(self, *, start_time, average_time):
except (SpiceBADPARTNUMBER, SpiceINVALIDSCLKSTRING):
et = spiceypy.utc2et(average_time.isot)

headers = (('SPICE_MK', self.meta_kernel_path.name, 'SPICE meta kernel file'),)
headers = (('SPICE_MK', ', '.join([mk.name for mk in self.meta_kernel_path]),
'SPICE meta kernel file'),)

header_results = defaultdict(lambda: '')
# HeliographicStonyhurst
@@ -559,4 +572,4 @@ def get_fits_headers(self, *, start_time, average_time):
if 'pytest' in sys.modules:
# only set the global in test scenario
_spm = SpiceKernelManager(Path(CONFIG.get("Paths", "spice_kernels")))
Spice.instance = Spice(_spm.get_latest_mk())
Spice.instance = Spice(_spm.get_latest_mk_and_pred())
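With several meta kernels furnished, get_fits_headers records all of them in the single
SPICE_MK keyword. Illustrative result, built from the kernel names used in the tests:

mk_value = ', '.join(mk.name for mk in Spice.instance.meta_kernel_path)
# e.g. 'solo_ANC_soc-pred-mk_V106_20201116_001.tm, solo_ANC_soc-flown-mk_V105_20200515_001.tm'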
9 changes: 6 additions & 3 deletions stixcore/ephemeris/tests/test_manager.py
@@ -27,13 +27,16 @@ def test_manager_create(spicekernelmanager):


def test_manager_get_latest(spicekernelmanager):
assert (spicekernelmanager.get_latest(SpiceKernelType.MK).name ==
assert (spicekernelmanager.get_latest(SpiceKernelType.MK)[0].name ==
"solo_ANC_soc-flown-mk_V105_20200515_001.tm")
assert (spicekernelmanager.get_latest(SpiceKernelType.SCLK).name ==
assert (spicekernelmanager.get_latest(SpiceKernelType.SCLK)[0].name ==
"solo_ANC_soc-sclk_20200904_V01.tsc")
assert (spicekernelmanager.get_latest(SpiceKernelType.LSK).name ==
assert (spicekernelmanager.get_latest(SpiceKernelType.LSK)[0].name ==
"naif0012.tls")

assert (spicekernelmanager.get_latest(SpiceKernelType.MK_PRED)[0].name ==
"solo_ANC_soc-pred-mk_V106_20201116_001.tm")

with pytest.raises(ValueError) as e:
spicekernelmanager.get_latest(SpiceKernelType.FK)
assert str(e.value).startswith('No current kernel found')
4 changes: 2 additions & 2 deletions stixcore/idb/manager.py
@@ -423,9 +423,9 @@ def convert_version_label(version_label):
`str`
a label like '1.2.3'
"""
if(isinstance(version_label, str)):
if isinstance(version_label, str):
return version_label
if(isinstance(version_label, (list, tuple))):
if isinstance(version_label, (list, tuple)):
return IDB_VERSION_DELIM.join(map(str, version_label))

def _get_filename_for_version(self, version_label):
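For reference, the normalisation this touched-up helper performs (the joined form
assumes IDB_VERSION_DELIM is '.'):

convert_version_label('1.2.3')     # -> '1.2.3'
convert_version_label((1, 2, 3))   # -> '1.2.3'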
37 changes: 28 additions & 9 deletions stixcore/io/fits/processors.py
@@ -15,6 +15,7 @@
from stixcore.soop.manager import SOOPManager, SoopObservationType
from stixcore.time.datetime import SEC_IN_DAY
from stixcore.util.logging import get_logger
from stixcore.util.util import get_complete_file_name_and_path

__all__ = ['SEC_IN_DAY', 'FitsProcessor', 'FitsLBProcessor', 'FitsL0Processor',
'FitsL1Processor', 'FitsL2Processor']
@@ -64,7 +65,7 @@ class FitsProcessor:
# TODO abstract some general processing pattern methods

@classmethod
def generate_filename(cls, product, *, version, date_range, status=''):
def generate_filename(cls, product, *, version, date_range, status='', header=True):
"""
Generate fits file name with SOLO conventions.
Parameters
@@ -80,8 +81,6 @@ def generate_filename(cls, product, *, version, date_range, status=''):
`str`
The filename
"""
if status:
status = f'_{status}'

user_req = ''
if 'request_id' in product.control.colnames:
@@ -91,6 +90,10 @@ def generate_filename(cls, product, *, version, date_range, status=''):
if 'tc_packet_seq_control' in product.control.colnames and user_req != '':
tc_control = f'-{product.control["tc_packet_seq_control"][0]:05d}'

if (status == '' and (not header)
and (product.level != 'LB' and product.fits_daily_file is True)):
status = 'U'

return f'solo_{product.level}_stix-{product.type}-{product.name.replace("_", "-")}' \
f'_{date_range}_V{version:02d}{status}{user_req}{tc_control}.fits'
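The net effect: when write_fits asks for the on-disk name (header=False) of a daily,
non-LB product and no explicit status was passed, the version field gets the 'U'
(incomplete) flag appended, while the name recorded in the FITS headers (header=True)
stays without it. An illustrative pair; the product name is made up:

# on disk, fast/incomplete daily file:   solo_L1_stix-ql-lightcurve_20200509_V01U.fits
# recorded in the FITS headers:          solo_L1_stix-ql-lightcurve_20200509_V01.fits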

@@ -470,7 +473,8 @@ def write_fits(self, product, path=None, *, version=1):
"""
created_files = []
for prod in product.split_to_files():
filename = self.generate_filename(product=prod, version=version)
filename = self.generate_filename(product=prod, version=version, header=False)

# start_day = np.floor((prod.obs_beg.as_float()
# // (1 * u.day).to('s')).value * SEC_IN_DAY).astype(int)
parts = [prod.level, prod.service_type, prod.service_subtype]
@@ -480,12 +484,19 @@ def write_fits(self, product, path=None, *, version=1):
path.mkdir(parents=True, exist_ok=True)

fitspath = path / filename
fitspath_complete = get_complete_file_name_and_path(fitspath)
if fitspath.exists():
logger.info('Fits file %s exists appending data', fitspath.name)
existing = Product(fitspath)
logger.debug('Existing %s, Current %s', existing, prod)
prod = prod + existing
logger.debug('Combined %s', prod)
elif fitspath_complete.exists():
logger.info('Complete Fits file %s exists appending data', fitspath.name)
existing = Product(fitspath_complete)
logger.debug('Existing %s, Current %s', existing, prod)
prod = prod + existing
logger.debug('Combined %s', prod)

control = prod.control
data = prod.data
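Besides appending to an existing incomplete file, write_fits now also appends to an
already renamed "complete" twin if one exists. get_complete_file_name_and_path comes
from stixcore.util.util and is not part of this diff; the sketch below is an assumption
about its behaviour, namely that it drops the trailing 'U' flag from the version field:

# Assumed behaviour, not the repository implementation:
def get_complete_file_name(name):
    # 'solo_..._V01U.fits' -> 'solo_..._V01.fits'
    return name[:-6] + '.fits' if name.endswith('U.fits') else name

def get_complete_file_name_and_path(path):
    return path.with_name(get_complete_file_name(path.name))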
@@ -587,7 +598,7 @@ def add_optional_energy_table(product, hdul):
hdul.append((energy_hdu))

@staticmethod
def generate_filename(product, version=None, status=''):
def generate_filename(product, *, version=None, status='', header=True):
"""
Generate fits file name with SOLO conventions.
@@ -613,7 +624,7 @@ def generate_filename(product, version=None, status=''):
end_obs = product.scet_timerange.end.coarse
date_range = f'{start_obs:010d}-{end_obs:010d}'
return FitsProcessor.generate_filename(product, version=version,
date_range=date_range, status=status)
date_range=date_range, status=status, header=header)

@classmethod
def generate_primary_header(cls, filename, product, *, version=1):
Expand Down Expand Up @@ -668,15 +679,15 @@ def __init__(self, archive_path):
self.archive_path = archive_path

@classmethod
def generate_filename(cls, product, *, version=1, status=''):
def generate_filename(cls, product, *, version=1, status='', header=True):

date_range = f'{product.utc_timerange.start.strftime("%Y%m%dT%H%M%S")}-' +\
f'{product.utc_timerange.end.strftime("%Y%m%dT%H%M%S")}'
if product.type != 'sci' or product.name == 'burst-aspect':
date_range = product.utc_timerange.center.strftime("%Y%m%d")

return FitsProcessor.generate_filename(product, version=version, date_range=date_range,
status=status)
status=status, header=header)

def generate_primary_header(self, filename, product, *, version=1):
# if product.level != 'L1':
@@ -754,7 +765,7 @@ def write_fits(self, product, *, version=1):
"""
created_files = []
for prod in product.split_to_files():
filename = self.generate_filename(product=prod, version=version)
filename = self.generate_filename(product=prod, version=version, header=False)
# start_day = np.floor((prod.obs_beg.as_float()
# // (1 * u.day).to('s')).value * SEC_IN_DAY).astype(int)

@@ -768,12 +779,20 @@ def write_fits(self, product, *, version=1):
path.mkdir(parents=True, exist_ok=True)

fitspath = path / filename
fitspath_complete = get_complete_file_name_and_path(fitspath)

if fitspath.exists():
logger.info('Fits file %s exists appending data', fitspath.name)
existing = Product(fitspath)
logger.debug('Existing %s, Current %s', existing, prod)
prod = prod + existing
logger.debug('Combined %s', prod)
elif fitspath_complete.exists():
logger.info('Complete Fits file %s exists appending data', fitspath.name)
existing = Product(fitspath_complete)
logger.debug('Existing %s, Current %s', existing, prod)
prod = prod + existing
logger.debug('Combined %s', prod)

control = prod.control
data = prod.data
10 changes: 7 additions & 3 deletions stixcore/processing/L0toL1.py
@@ -13,6 +13,7 @@
from stixcore.products.level0.scienceL0 import NotCombineException
from stixcore.soop.manager import SOOPManager
from stixcore.util.logging import get_logger
from stixcore.util.util import get_complete_file_name

logger = get_logger(__name__)

@@ -91,9 +92,12 @@ def process_type(files, *, processor, soopmanager, spice_kernel_path, config):
tmp = Product._check_registered_widget(level='L1', service_type=l0.service_type,
service_subtype=l0.service_subtype,
ssid=l0.ssid, data=None, control=None)
l1 = tmp.from_level0(l0, parent=file.name)
files = processor.write_fits(l1)
all_files.extend(files)

# see https://github.com/i4Ds/STIXCore/issues/350
complete_file_name = get_complete_file_name(file.name)
l1 = tmp.from_level0(l0, parent=complete_file_name)

all_files.extend(processor.write_fits(l1))
except NoMatchError:
logger.warning('No match for product %s', l0)
except NotCombineException as nc:
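The parent keyword of the generated L1 files now always refers to the complete L0 file
name, even while the L0 input on disk still carries the incomplete 'U' flag (see issue
#350), presumably so the provenance chain stays valid once the L0 file is published
under its complete name. Roughly:

# file.name on disk:     'solo_L0_..._V01U.fits'   (incomplete daily file)
# parent written to L1:  'solo_L0_..._V01.fits'    (its future complete name)
complete_file_name = get_complete_file_name(file.name)
l1 = tmp.from_level0(l0, parent=complete_file_name)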
10 changes: 7 additions & 3 deletions stixcore/processing/L1toL2.py
@@ -14,6 +14,7 @@
from stixcore.products.level0.scienceL0 import NotCombineException
from stixcore.soop.manager import SOOPManager
from stixcore.util.logging import get_logger
from stixcore.util.util import get_complete_file_name_and_path

logger = get_logger(__name__)

@@ -84,9 +85,12 @@ def process_type(files, *, processor, soopmanager, spice_kernel_path, config):
tmp = Product._check_registered_widget(level='L2', service_type=l1.service_type,
service_subtype=l1.service_subtype,
ssid=l1.ssid, data=None, control=None)
for l2 in tmp.from_level1(l1, parent=file, idlprocessor=idlprocessor):
files = processor.write_fits(l2)
all_files.extend(files)

# see https://github.com/i4Ds/STIXCore/issues/350
complete_file_name = get_complete_file_name_and_path(file)
for l2 in tmp.from_level1(l1, parent=complete_file_name, idlprocessor=idlprocessor):
new_files = processor.write_fits(l2)
all_files.extend(new_files)
# if a batch of X files have summed up run the IDL processing
if idlprocessor.opentasks >= max_idlbatch:
all_files.extend(idlprocessor.process())
14 changes: 9 additions & 5 deletions stixcore/processing/pipeline.py
@@ -198,10 +198,11 @@ def log_result(self, gen_files):

def process_tm(path, **args):
with PipelineErrorReport(path) as error_report:
# set the latest spice file for each run
if args['spm'].get_latest_mk() != Spice.instance.meta_kernel_path:
Spice.instance = Spice(args['spm'].get_latest_mk())
logger.info("new spice kernel detected and loaded")
# set the latest spice kernel files for each run
if ((args['spm'].get_latest_mk()[0] not in Spice.instance.meta_kernel_path) or
(args['spm'].get_latest_mk_pred()[0] not in Spice.instance.meta_kernel_path)):
Spice.instance = Spice(args['spm'].get_latest_mk_and_pred())
logger.info("new spice kernels detected and loaded")

lb_files = process_tmtc_to_levelbinary([SOCPacketFile(path)])
logger.info(f"generated LB files: \n{pformat(lb_files)}")
@@ -360,7 +361,7 @@ def main():
tmpath = Path(CONFIG.get('Paths', 'tm_archive'))
soop_path = Path(CONFIG.get('Paths', 'soop_files'))
spm = SpiceKernelManager(Path(CONFIG.get("Paths", "spice_kernels")))
Spice.instance = Spice(spm.get_latest_mk())
Spice.instance = Spice(spm.get_latest_mk_and_pred())

logging_handler = LoggingEventHandler(logger=logger)

@@ -382,8 +383,11 @@
logger.info("Skipping search for unprocessed tm files")

observer.schedule(soop_handler, soop_manager.data_root, recursive=False)
logger.info(f"Start observing {soop_manager.data_root} for SOOPs")
observer.schedule(logging_handler, tmpath, recursive=True)
logger.info(f"Start observing {tmpath} for logging")
observer.schedule(tm_handler, tmpath, recursive=True)
logger.info(f"Start observing {tmpath} for incoming TMs")

observer.start()
try:
2 changes: 1 addition & 1 deletion stixcore/processing/pipeline_cli.py
@@ -170,7 +170,7 @@ def main():
spicemeta = Path(args.spice_file)
else:
_spm = SpiceKernelManager(Path(CONFIG.get('Paths', 'spice_kernels')))
spicemeta = _spm.get_latest_mk(top_n=30)
spicemeta = _spm.get_latest_mk(top_n=10)

Spice.instance = Spice(spicemeta)

(Diffs for the remaining changed files of the 24 are not shown here.)
