diff --git a/.gitignore b/.gitignore index 9bf4efa91..6c20b4aeb 100644 --- a/.gitignore +++ b/.gitignore @@ -66,4 +66,9 @@ pip-wheel-metadata/ # Mac OSX .DS_Store .vscode + +# Other .pytest_cache +.jukit +venv +.venv diff --git a/specutils/io/default_loaders/sdss.py b/specutils/io/default_loaders/sdss.py index fbfdcc001..261034141 100644 --- a/specutils/io/default_loaders/sdss.py +++ b/specutils/io/default_loaders/sdss.py @@ -47,10 +47,10 @@ def _sdss_wcs_to_log_wcs(old_wcs): This function does the conversion from the SDSS WCS to FITS WCS. """ w0 = old_wcs.wcs.crval[0] - w1 = old_wcs.wcs.cd[0,0] - crval = 10 ** w0 + w1 = old_wcs.wcs.cd[0, 0] + crval = 10**w0 cdelt = crval * w1 * np.log(10) - cunit = Unit('Angstrom') + cunit = Unit("Angstrom") ctype = "WAVE-LOG" w = WCS(naxis=1) @@ -70,11 +70,11 @@ def spec_identify(origin, *args, **kwargs): """ # Test if fits has extension of type BinTable and check for spec-specific keys with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: - return (hdulist[0].header.get('TELESCOP') == 'SDSS 2.5-M' and - hdulist[0].header.get('FIBERID', 0) > 0 and - len(hdulist) > 1 and - (isinstance(hdulist[1], fits.BinTableHDU) and - hdulist[1].header.get('TTYPE3').lower() == 'ivar')) + return (hdulist[0].header.get("TELESCOP") == "SDSS 2.5-M" + and hdulist[0].header.get("FIBERID", 0) > 0 + and len(hdulist) > 1 + and (isinstance(hdulist[1], fits.BinTableHDU) + and hdulist[1].header.get("TTYPE3").lower() == "ivar")) def spSpec_identify(origin, *args, **kwargs): @@ -85,10 +85,10 @@ def spSpec_identify(origin, *args, **kwargs): # Test telescope keyword and check if primary HDU contains data # consistent with spSpec format with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: - return (hdulist[0].header.get('TELESCOP') == 'SDSS 2.5-M' and - hdulist[0].header.get('FIBERID', 0) > 0 and - isinstance(hdulist[0].data, np.ndarray) and - hdulist[0].data.shape[0] == 5) + return (hdulist[0].header.get("TELESCOP") == "SDSS 2.5-M" + and hdulist[0].header.get("FIBERID", 0) > 0 + and isinstance(hdulist[0].data, np.ndarray) + and hdulist[0].data.shape[0] == 5) def spPlate_identify(origin, *args, **kwargs): @@ -99,14 +99,16 @@ def spPlate_identify(origin, *args, **kwargs): # Test telescope keyword and check if primary HDU contains data # consistent with spSpec format with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: - return (hdulist[0].header.get('TELESCOP') == 'SDSS 2.5-M' and - hdulist[0].header.get('FIBERID', 0) <= 0 and - isinstance(hdulist[0].data, np.ndarray) and - hdulist[0].data.shape[0] > 5) + return (hdulist[0].header.get("TELESCOP") == "SDSS 2.5-M" + and hdulist[0].header.get("FIBERID", 0) <= 0 + and isinstance(hdulist[0].data, np.ndarray) + and hdulist[0].data.shape[0] > 5) @data_loader( - label="SDSS-III/IV spec", identifier=spec_identify, extensions=['fits'], + label="SDSS-III/IV spec", + identifier=spec_identify, + extensions=["fits"], priority=10, ) def spec_loader(file_obj, **kwargs): @@ -127,30 +129,37 @@ def spec_loader(file_obj, **kwargs): """ with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: header = hdulist[0].header - meta = {'header': header} + meta = {"header": header} - bunit = header.get('BUNIT', '1e-17 erg / (Angstrom cm2 s)') - if 'Ang' in bunit and 'strom' not in bunit: - bunit = bunit.replace('Ang', 'Angstrom') + bunit = header.get("BUNIT", "1e-17 erg / (Angstrom cm2 s)") + if "Ang" in bunit and "strom" not in bunit: + bunit = bunit.replace("Ang", "Angstrom") flux_unit = Unit(bunit) # spectrum is in HDU 1 - flux = hdulist[1].data['flux'] * flux_unit + flux = hdulist[1].data["flux"] * flux_unit - uncertainty = InverseVariance(hdulist[1].data['ivar'] / flux_unit**2) + uncertainty = InverseVariance(hdulist[1].data["ivar"] / flux_unit**2) - dispersion = 10**hdulist[1].data['loglam'] - dispersion_unit = Unit('Angstrom') + dispersion = 10**hdulist[1].data["loglam"] + dispersion_unit = Unit("Angstrom") - mask = hdulist[1].data['and_mask'] != 0 + mask = hdulist[1].data["and_mask"] != 0 - return Spectrum1D(flux=flux, spectral_axis=dispersion * dispersion_unit, - uncertainty=uncertainty, meta=meta, mask=mask) + return Spectrum1D( + flux=flux, + spectral_axis=dispersion * dispersion_unit, + uncertainty=uncertainty, + meta=meta, + mask=mask, + ) @data_loader( - label="SDSS-I/II spSpec", identifier=spSpec_identify, - extensions=['fit', 'fits'], priority=10, + label="SDSS-I/II spSpec", + identifier=spSpec_identify, + extensions=["fit", "fits"], + priority=10, ) def spSpec_loader(file_obj, **kwargs): """ @@ -176,32 +185,37 @@ def spSpec_loader(file_obj, **kwargs): with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: header = hdulist[0].header # name = header.get('NAME') - meta = {'header': header} + meta = {"header": header} wcs = WCS(header).dropaxis(1) - bunit = header.get('BUNIT', '1e-17 erg / (Angstrom cm2 s)') + bunit = header.get("BUNIT", "1e-17 erg / (Angstrom cm2 s)") # fix mutilated flux unit - bunit = bunit.replace('/cm/s/Ang', '/ (Angstrom cm2 s)') - if 'Ang' in bunit and 'strom' not in bunit: - bunit = bunit.replace('Ang', 'Angstrom') + bunit = bunit.replace("/cm/s/Ang", "/ (Angstrom cm2 s)") + if "Ang" in bunit and "strom" not in bunit: + bunit = bunit.replace("Ang", "Angstrom") flux_unit = Unit(bunit) flux = hdulist[0].data[0, :] * flux_unit uncertainty = StdDevUncertainty(hdulist[0].data[2, :] * flux_unit) # Fix the WCS if it is claimed to be linear - if header.get('DC-Flag', 1) == 1: + if header.get("DC-Flag", 1) == 1: fixed_wcs = _sdss_wcs_to_log_wcs(wcs) else: fixed_wcs = wcs mask = hdulist[0].data[3, :] != 0 - return Spectrum1D(flux=flux, wcs=fixed_wcs, - uncertainty=uncertainty, meta=meta, mask=mask) + return Spectrum1D(flux=flux, + wcs=fixed_wcs, + uncertainty=uncertainty, + meta=meta, + mask=mask) -@data_loader(label="SDSS spPlate", identifier=spPlate_identify, extensions=['fits']) +@data_loader(label="SDSS spPlate", + identifier=spPlate_identify, + extensions=["fits"]) def spPlate_loader(file_obj, limit=None, **kwargs): """ Loader for SDSS spPlate files, reading flux spectra from all fibres into single array. @@ -223,26 +237,30 @@ def spPlate_loader(file_obj, limit=None, **kwargs): """ with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: header = hdulist[0].header - meta = {'header': header} + meta = {"header": header} wcs = WCS(header).dropaxis(1) if limit is None: - limit = header['NAXIS2'] + limit = header["NAXIS2"] - bunit = header.get('BUNIT', '1e-17 erg / (Angstrom cm2 s)') - if 'Ang' in bunit and 'strom' not in bunit: - bunit = bunit.replace('Ang', 'Angstrom') + bunit = header.get("BUNIT", "1e-17 erg / (Angstrom cm2 s)") + if "Ang" in bunit and "strom" not in bunit: + bunit = bunit.replace("Ang", "Angstrom") flux_unit = Unit(bunit) flux = hdulist[0].data[0:limit, :] * flux_unit - uncertainty = InverseVariance(hdulist[1].data[0:limit, :] / flux_unit**2) + uncertainty = InverseVariance(hdulist[1].data[0:limit, :] / + flux_unit**2) # Fix the WCS if it is claimed to be linear - if header.get('DC-Flag', 1) == 1: + if header.get("DC-Flag", 1) == 1: fixed_wcs = _sdss_wcs_to_log_wcs(wcs) else: fixed_wcs = wcs mask = hdulist[2].data[0:limit, :] != 0 - meta['plugmap'] = Table.read(hdulist[5])[0:limit] + meta["plugmap"] = Table.read(hdulist[5])[0:limit] - return Spectrum1D(flux=flux, wcs=fixed_wcs, - uncertainty=uncertainty, meta=meta, mask=mask) + return Spectrum1D(flux=flux, + wcs=fixed_wcs, + uncertainty=uncertainty, + meta=meta, + mask=mask) diff --git a/specutils/io/default_loaders/sdss_v.py b/specutils/io/default_loaders/sdss_v.py new file mode 100644 index 000000000..514432978 --- /dev/null +++ b/specutils/io/default_loaders/sdss_v.py @@ -0,0 +1,605 @@ +"""Register reader functions for various spectral formats.""" +from typing import Optional + +import numpy as np +from astropy.units import Unit, Quantity, Angstrom +from astropy.nddata import StdDevUncertainty, InverseVariance +from astropy.io.fits import HDUList, BinTableHDU, ImageHDU + +from ...spectra import Spectrum1D, SpectrumList +from ..registers import data_loader +from ..parsing_utils import read_fileobj_or_hdulist + +__all__ = [ + "load_sdss_apVisit_1D", + "load_sdss_apVisit_list", + "load_sdss_apStar_1D", + "load_sdss_apStar_list", + "load_sdss_spec_1D", + "load_sdss_spec_list", + "load_sdss_mwm_1d", + "load_sdss_mwm_list", +] + + +def apVisit_identify(origin, *args, **kwargs): + """ + Check whether given file is FITS. This is used for Astropy I/O + Registry. + + Updated for SDSS-V outputs. + """ + with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: + # Test if fits has extension of type BinTable and check for + # apVisit-specific keys + return (hdulist[0].header.get("SURVEY").lower() == "sdss-v" + and len(hdulist) > 4 + and hdulist[1].header.get("BUNIT", "none").startswith("Flux") + and hdulist[2].header.get("BUNIT", "none").startswith("Flux") + and hdulist[4].header.get("BUNIT", "none").startswith("Wave")) + + +def apStar_identify(origin, *args, **kwargs): + """ + Check whether given file is FITS. This is used for Astropy I/O + Registry. + + Updated for SDSS-V outputs. + """ + with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: + # Test if fits has extension of type BinTable and check for + # apogee-specific keys + keys for individual apVisits + return ("APRED" in hdulist[0].header.keys() and len(hdulist) > 9 + and "NVISITS" in hdulist[0].header.keys() + and isinstance(hdulist[1], ImageHDU) + and hdulist[1].header.get("BUNIT", "none").startswith("Flux") + and hdulist[2].header.get("BUNIT", "none").startswith("Err")) + + +def spec_sdss5_identify(origin, *args, **kwargs): + """ + Check whether given input is FITS and has SDSS-V spec type + BINTABLE in first extension. This is used for Astropy I/O Registry. + + Derived from SDSS-III/IV method in sdss.py. Single change (FIBERID key). + """ + # Test if fits has extension of type BinTable and check for spec-specific keys + with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: + return ( + (hdulist[0].header.get("TELESCOP").lower() == "sdss 2.5-m") # and + # hdulist[0].header.get("OBSERVAT").lower() in ["apo", "lco"] + and (hdulist[1].header.get("TTYPE1").lower() == "flux") and + (hdulist[1].header.get("TTYPE2").lower() == "loglam") and + (len(hdulist) > 1) and (isinstance(hdulist[1], BinTableHDU)) and + (hdulist[1].header.get("TTYPE3").lower() == "ivar")) + + +def mwm_identify(origin, *args, **kwargs): + """ + Check whether given input is FITS and has SDSS-V mwm type + BINTABLE in all 4 extensions. This is used for Astropy I/O Registry. + """ + with read_fileobj_or_hdulist(*args, **kwargs) as hdulist: + return (("V_ASTRA" in hdulist[0].header.keys()) and len(hdulist) > 0 + and ("SDSS_ID" in hdulist[0].header.keys()) + and (isinstance(hdulist[i], BinTableHDU) for i in range(1, 5))) + + +def _wcs_log_linear(naxis, cdelt, crval): + """ + Convert WCS from log to linear. + """ + return 10**(np.arange(naxis) * cdelt + crval) + + +def _fetch_flux_unit(hdu): + """ + Fetch the flux unit by accessing the BUNIT key of the given HDU. + + Parameters + ---------- + hdu : HDUImage + Flux or e_flux HDUImage object. + + Returns + ------- + flux_unit : Unit + Flux unit as Astropy Unit object. + + """ + flux_unit = (hdu.header.get("BUNIT").replace("(", "").replace( + ")", "").split(" ")[-2:]) + flux_unit = "".join(flux_unit) + if "Ang" in flux_unit and "strom" not in flux_unit: + flux_unit = flux_unit.replace("Ang", "Angstrom") + + flux_unit = flux_unit.replace("s/cm^2/Angstrom", "(s cm2 Angstrom)") + + return Unit(flux_unit) + + +# APOGEE files +@data_loader( + "SDSS-V apStar", + identifier=apStar_identify, + dtype=Spectrum1D, + priority=10, + extensions=["fits"], +) +def load_sdss_apStar_1D(file_obj, idx: int = 0, **kwargs): + """ + Load an apStar file as a Spectrum1D. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList. + idx : int + The specified visit to load. Defaults to coadd (index 0). + + Returns + ------- + Spectrum1D + The spectrum contained in the file + + """ + # intialize slicer + + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # Obtain spectral axis from header (HDU[1]) + wavelength = _wcs_log_linear( + hdulist[1].header.get("NAXIS1"), + hdulist[1].header.get("CDELT1"), + hdulist[1].header.get("CRVAL1"), + ) + spectral_axis = Quantity(wavelength, unit=Angstrom) + + # Obtain flux and e_flux from data (HDU[1] & HDU[2]) + flux_unit = _fetch_flux_unit(hdulist[1]) + flux = Quantity(hdulist[1].data[idx], unit=flux_unit) + e_flux = StdDevUncertainty(hdulist[2].data[idx]) + + # reduce flux array if 1D in 2D np array + # NOTE: bypasses jdaviz specviz NotImplementedError, but could be the expected output for specutils + if flux.shape[0] == 1: + flux = flux[0] + e_flux = e_flux[0] + + # Add header + bitmask + meta = dict() + meta["header"] = hdulist[0].header + # SDSS masks are arrays of bit values storing multiple bool conds + mask = hdulist[3].data[idx] + # NOTE: specutils considers 0/False as valid values, simlar to numpy convention + mask = mask != 0 + + return Spectrum1D( + spectral_axis=spectral_axis, + flux=flux, + uncertainty=e_flux, + mask=mask, + meta=meta, + ) + + +@data_loader( + "SDSS-V apStar multi", + identifier=apStar_identify, + dtype=SpectrumList, + priority=10, + extensions=["fits"], +) +def load_sdss_apStar_list(file_obj, **kwargs): + """ + Load an apStar file as a SpectrumList. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList. + + Returns + ------- + SpectrumList + The spectra contained in the file + """ + with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist: + nvisits = hdulist[0].header.get("NVISITS") + if nvisits <= 1: + raise ValueError( + "Only 1 visit in this file. Use Spectrum1D.read() instead.") + return SpectrumList([ + load_sdss_apStar_1D(file_obj, idx=i, **kwargs) + for i in range(nvisits) + ]) + + +@data_loader( + "SDSS-V apVisit", + identifier=apVisit_identify, + dtype=Spectrum1D, + priority=10, + extensions=["fits"], +) +def load_sdss_apVisit_1D(file_obj, **kwargs): + """ + Load an apVisit file as a Spectrum1D. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList. + + Returns + ------- + Spectrum1D + The spectrum contained in the file + """ + flux_unit = Unit("1e-17 erg / (Angstrom cm2 s)") + + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # Fetch and flatten spectrum parameters + spectral_axis = Quantity(hdulist[4].data.flatten(), unit=Angstrom) + flux_unit = _fetch_flux_unit(hdulist[1]) + flux = Quantity(hdulist[1].data.flatten(), unit=flux_unit) + e_flux = StdDevUncertainty(hdulist[2].data.flatten()) + + # Get metadata and attach bitmask and MJD + meta = dict() + meta["header"] = hdulist[0].header + mask = hdulist[3].data.flatten() + # NOTE: specutils considers 0/False as valid values, simlar to numpy convention + mask = mask != 0 + + return Spectrum1D(spectral_axis=spectral_axis, + flux=flux, + mask=mask, + uncertainty=e_flux, + meta=meta) + + +@data_loader( + "SDSS-V apVisit multi", + identifier=apVisit_identify, + dtype=SpectrumList, + priority=10, + extensions=["fits"], +) +def load_sdss_apVisit_list(file_obj, **kwargs): + """ + Load an apVisit file as a SpectrumList. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList. + + Returns + ------- + SpectrumList + The spectra from each chip contained in the file + """ + spectra = SpectrumList() + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # Get metadata + flux_unit = _fetch_flux_unit(hdulist[1]) + common_meta = dict() + common_meta["header"] = hdulist[0].header + + for chip in range(hdulist[1].data.shape[0]): + # Fetch spectral axis and flux, and E_flux + spectral_axis = Quantity(hdulist[4].data[chip], unit=Angstrom) + flux = Quantity(hdulist[1].data[chip], unit=flux_unit) + e_flux = StdDevUncertainty(hdulist[2].data[chip]) + + # Copy metadata for each, adding chip bitmask and MJD to each + meta = common_meta.copy() + mask = hdulist[3].data[chip] + # NOTE: specutils considers 0/False as valid values, simlar to numpy convention + mask = mask != 0 + + spectra.append( + Spectrum1D( + spectral_axis=spectral_axis, + flux=flux, + mask=mask, + uncertainty=e_flux, + meta=meta, + )) + + return spectra + + +# BOSS REDUX products (specLite, specFull, custom coadd files, etc) + +@data_loader( + "SDSS-V spec", + identifier=spec_sdss5_identify, + dtype=Spectrum1D, + priority=5, + extensions=["fits"], +) +def load_sdss_spec_1D(file_obj, *args, hdu: Optional[int] = None, **kwargs): + """ + Load a given BOSS spec file as a Spectrum1D object. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList.. + hdu : int + The specified HDU to load a given spectra from. + + Returns + ------- + Spectrum1D + The spectrum contained in the file at the specified HDU. + """ + if hdu is None: + # TODO: how should we handle this -- multiple things in file, but the user cannot choose. + hdu = 1 # defaulting to coadd + # raise ValueError("HDU not specified! Please specify a HDU to load.") + elif hdu in [2, 3, 4]: + raise ValueError("Invalid HDU! HDU{} is not spectra.".format(hdu)) + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # directly load the coadd at HDU1 + return _load_BOSS_HDU(hdulist, hdu, **kwargs) + + +@data_loader( + "SDSS-V spec multi", + identifier=spec_sdss5_identify, + dtype=SpectrumList, + priority=5, + extensions=["fits"], +) +def load_sdss_spec_list(file_obj, **kwargs): + """ + Load a given BOSS spec file as a SpectrumList object. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList.. + + Returns + ------- + SpectrumList + The spectra contained in the file. + """ + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + spectra = list() + for hdu in range(1, len(hdulist)): + if hdulist[hdu].name in ["SPALL", "ZALL", "ZLINE"]: + continue + spectra.append(_load_BOSS_HDU(hdulist, hdu, **kwargs)) + return SpectrumList(spectra) + + +def _load_BOSS_HDU(hdulist: HDUList, hdu: int, **kwargs): + """ + HDU processor for BOSS spectra redux HDU's + + Parameters + ---------- + hdulist: HDUList + HDUList generated from imported file. + hdu : int + Specified HDU to load. + + Returns + ------- + Spectrum1D + The spectrum contained in the file + + """ + # Fetch flux, e_flux, spectral axis, and units + # NOTE: flux unit info is not in BOSS spec files. + flux_unit = Unit("1e-17 erg / (Angstrom cm2 s)") # NOTE: hardcoded unit + spectral_axis = Quantity(10**hdulist[hdu].data["LOGLAM"], unit=Angstrom) + + flux = Quantity(hdulist[hdu].data["FLUX"], unit=flux_unit) + # no e_flux, so we use inverse of variance + ivar = InverseVariance(hdulist[hdu].data["IVAR"]) + + # Fetch mask -- stored in two ways + try: + mask = hdulist[hdu].data["MASK"] + except KeyError: + mask = hdulist[hdu].data["OR_MASK"] + + # NOTE: specutils considers 0/False as valid values, simlar to numpy convention + mask = mask != 0 + + # Fetch metadata + # NOTE: specFull file does not include S/N value, but this gets calculated + # for mwmVisit/mwmStar files when they are created. + meta = dict() + meta["header"] = hdulist[0].header + meta["name"] = hdulist[hdu].name + + return Spectrum1D(spectral_axis=spectral_axis, + flux=flux, + uncertainty=ivar, + mask=mask, + meta=meta) + + +# MWM LOADERS +@data_loader( + "SDSS-V mwm", + identifier=mwm_identify, + dtype=Spectrum1D, + priority=20, + extensions=["fits"], +) +def load_sdss_mwm_1d(file_obj, hdu: Optional[int] = None, **kwargs): + """ + Load an unspecified spec file as a Spectrum1D. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList.. + hdu : int + Specified HDU to load. + + Returns + ------- + Spectrum1D + The spectrum contained in the file + """ + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # Check if file is empty first + datasums = [] + for i in range(1, len(hdulist)): + datasums.append(int(hdulist[i].header.get("DATASUM"))) + if (np.array(datasums) == 0).all(): + raise ValueError("Specified file is empty.") + + # TODO: how should we handle this -- multiple things in file, but the user cannot choose. + if hdu is None: + for i in range(len(hdulist)): + if hdulist[i].header.get("DATASUM") != "0": + hdu = i + break + + return _load_mwmVisit_or_mwmStar_hdu(hdulist, hdu, **kwargs) + + +@data_loader( + "SDSS-V mwm multi", + identifier=mwm_identify, + dtype=SpectrumList, + priority=20, + extensions=["fits"], +) +def load_sdss_mwm_list(file_obj, **kwargs): + """ + Load an mwmStar spec file as a SpectrumList. + + Parameters + ---------- + file_obj : str, file-like, or HDUList + FITS file name, file object, or HDUList. + + Returns + ------- + SpectrumList + The spectra contained in the file, where: + Spectrum1D + A given spectra of nD flux + None + If there are no spectra for that spectrograph/observatory + """ + spectra = SpectrumList() + with read_fileobj_or_hdulist(file_obj, memmap=False, **kwargs) as hdulist: + # Check if file is empty first + datasums = [] + for hdu in range(1, len(hdulist)): + datasums.append(int(hdulist[hdu].header.get("DATASUM"))) + if (np.array(datasums) == 0).all(): + raise ValueError("Specified file is empty.") + + # Now load file + for hdu in range(1, len(hdulist)): + if hdulist[hdu].header.get("DATASUM") == "0": + # Skip zero data HDU's + # TODO: validate if we want this printed warning or not. + # it might get annoying & fill logs with useless alerts. + print("WARNING: HDU{} ({}) is empty.".format( + hdu, hdulist[hdu].name)) + continue + spectra.append(_load_mwmVisit_or_mwmStar_hdu(hdulist, hdu)) + return spectra + + +def _load_mwmVisit_or_mwmStar_hdu(hdulist: HDUList, hdu: int, **kwargs): + """ + HDU loader subfunction for MWM files + + Parameters + ---------- + hdulist: HDUList + HDUList generated from imported file. + hdu: int + Specified HDU to load. + + Returns + ------- + Spectrum1D + The spectrum with nD flux contained in the HDU. + + """ + if hdulist[hdu].header.get("DATASUM") == "0": + raise IndexError( + "Attemped to load an empty HDU specified at HDU{}".format(hdu)) + + # Fetch wavelength + # encoded as WCS for visit, and 'wavelength' for star + try: + wavelength = np.array(hdulist[hdu].data["wavelength"])[0] + except KeyError: + wavelength = _wcs_log_linear( + hdulist[hdu].header.get("NPIXELS"), + hdulist[hdu].header.get("CDELT"), + hdulist[hdu].header.get("CRVAL"), + ) + finally: + if wavelength is None: + raise ValueError( + "Couldn't find wavelength data in HDU{}.".format(hdu)) + spectral_axis = Quantity(wavelength, unit=Angstrom) + + # Fetch flux, e_flux + # NOTE:: flux info is not written into mwm files + flux_unit = Unit("1e-17 erg / (Angstrom cm2 s)") # NOTE: hardcoded unit + flux = Quantity(hdulist[hdu].data["flux"], unit=flux_unit) + e_flux = InverseVariance(array=hdulist[hdu].data["ivar"]) + + # Collect bitmask + mask = hdulist[hdu].data["pixel_flags"] + # NOTE: specutils considers 0/False as valid values, simlar to numpy convention + mask = mask != 0 + + # collapse shape if 1D spectra in 2D array + # NOTE: this fixes a jdaviz handling bug for 2D of shape 1, + # it could be that it's expected to be parsed this way. + if flux.shape[0] == 1: + flux = flux[0] + e_flux = e_flux[0] + mask = mask[0] + + # Create metadata + meta = dict() + meta["header"] = hdulist[0].header + + # Add SNR to metadata + meta["snr"] = np.array(hdulist[hdu].data["snr"]) + + # Add identifiers (obj, telescope, mjd, datatype) + # TODO: need to see what metadata we're interested in for the MWM files. + meta["telescope"] = hdulist[hdu].data["telescope"] + meta["instrument"] = hdulist[hdu].header.get("INSTRMNT") + try: + meta["obj"] = hdulist[hdu].data["obj"] + except KeyError: + pass + try: + meta["date"] = hdulist[hdu].data["date_obs"] + meta["mjd"] = hdulist[hdu].data["mjd"] + meta["datatype"] = "mwmVisit" + except KeyError: + meta["mjd"] = (str(hdulist[hdu].data["min_mjd"][0]) + "->" + + str(hdulist[hdu].data["max_mjd"][0])) + meta["datatype"] = "mwmStar" + finally: + meta["name"] = hdulist[hdu].name + + return Spectrum1D( + spectral_axis=spectral_axis, + flux=flux, + uncertainty=e_flux, + mask=mask, + meta=meta, + ) diff --git a/specutils/io/default_loaders/tests/test_sdss_v.py b/specutils/io/default_loaders/tests/test_sdss_v.py new file mode 100644 index 000000000..f70490482 --- /dev/null +++ b/specutils/io/default_loaders/tests/test_sdss_v.py @@ -0,0 +1,724 @@ +import numpy as np +import pytest + +from astropy.io import fits +from astropy.units import Unit, Angstrom + +from specutils import Spectrum1D, SpectrumList + + +def generate_apogee_hdu(observatory="APO", with_wl=True, datasum="0"): + wl = (10**(4.179 + 6e-6 * np.arange(8575))).reshape((1, -1)) + flux = np.zeros_like(wl) + ivar = np.zeros_like(wl) + pixel_flags = np.zeros_like(wl) + continuum = np.zeros_like(wl) + nmf_rectified_model_flux = np.zeros_like(wl) + + columns = [ + fits.Column(name="spectrum_pk_id", array=[159783564], format="K"), + fits.Column(name="release", array=[b"sdss5"], format="5A"), + fits.Column(name="filetype", array=[b"apStar"], format="6A"), + fits.Column(name="v_astra", array=[b"0.5.0"], format="5A"), + fits.Column(name="healpix", array=[3], format="J"), + fits.Column(name="sdss_id", array=[42], format="K"), + fits.Column(name="apred", array=[b"1.2"], format="3A"), + fits.Column(name="obj", array=[b"2M19534321+6705175"], format="18A"), + fits.Column(name="telescope", array=[b"apo25m"], format="6A"), + fits.Column(name="min_mjd", array=[59804], format="J"), + fits.Column(name="max_mjd", array=[59866], format="J"), + fits.Column(name="n_entries", array=[-1], format="J"), + fits.Column(name="n_visits", array=[5], format="J"), + fits.Column(name="n_good_visits", array=[5], format="J"), + fits.Column(name="n_good_rvs", array=[5], format="J"), + fits.Column(name="snr", array=[46.56802], format="E"), + fits.Column(name="mean_fiber", array=[256.0], format="E"), + fits.Column(name="std_fiber", array=[0.0], format="E"), + fits.Column(name="spectrum_flags", array=[1048576], format="J"), + fits.Column(name="v_rad", array=[-56.7284381], format="E"), + fits.Column(name="e_v_rad", array=[5.35407624], format="E"), + fits.Column(name="std_v_rad", array=[10.79173857], format="E"), + fits.Column(name="median_e_v_rad", array=[16.19418386], format="E"), + fits.Column(name="doppler_teff", array=[7169.0107], format="E"), + fits.Column(name="doppler_e_teff", array=[9.405238], format="E"), + fits.Column(name="doppler_logg", array=[2.981389], format="E"), + fits.Column(name="doppler_e_logg", array=[0.01916536], format="E"), + fits.Column(name="doppler_fe_h", array=[-1.20532212], format="E"), + fits.Column(name="doppler_e_fe_h", array=[0.0093738], format="E"), + fits.Column(name="doppler_rchi2", array=[1.1424173], format="E"), + fits.Column(name="doppler_flags", array=[0], format="J"), + fits.Column(name="xcorr_v_rad", array=[np.nan], format="E"), + fits.Column(name="xcorr_v_rel", array=[np.nan], format="E"), + fits.Column(name="xcorr_e_v_rel", array=[np.nan], format="E"), + fits.Column(name="ccfwhm", array=[np.nan], format="E"), + fits.Column(name="autofwhm", array=[np.nan], format="E"), + fits.Column(name="n_components", array=[1], format="J"), + ] + if with_wl: + columns.append( + fits.Column(name="wavelength", + array=wl, + format="8575E", + dim="(8575)")) + columns += [ + fits.Column(name="flux", array=flux, format="8575E", dim="(8575)"), + fits.Column(name="ivar", array=ivar, format="8575E", dim="(8575)"), + fits.Column(name="pixel_flags", + array=pixel_flags, + format="8575E", + dim="(8575)"), + fits.Column(name="continuum", + array=continuum, + format="8575E", + dim="(8575)"), + fits.Column( + name="nmf_rectified_model_flux", + array=nmf_rectified_model_flux, + format="8575E", + dim="(8575)", + ), + fits.Column(name="nmf_rchi2", array=[2.3391197], format="E"), + fits.Column(name="nmf_flags", array=[0], format="J"), + ] + header = fits.Header(cards=[ + ("EXTNAME", f"APOGEE/{observatory}", ""), + ("OBSRVTRY", observatory, None), + ("INSTRMNT", "APOGEE", None), + ("CRVAL", 4.179, None), + ("CDELT", 6e-6, None), + ("CTYPE", "LOG-LINEAR", None), + ("CUNIT", "Angstrom (Vacuum)"), + ("CRPIX", 1, None), + ("DC-FAG", 1, None), + ("NPIXELS", 8575, None), + ("DATASUM", datasum, "data unit checksum updated 2023-11-13T03:21:47"), + ]) + + return fits.BinTableHDU.from_columns(columns, header=header) + + +def generate_boss_hdu(observatory="APO", with_wl=True, datasum="0"): + wl = (10**(3.5523 + 1e-4 * np.arange(4648))).reshape((1, -1)) + flux = ivar = continuum = pixel_flags = nmf_rectified_model_flux = np.zeros_like( + wl) + columns = [ + fits.Column(name="spectrum_pk_id", array=[0], format="K"), + fits.Column(name="release", array=["sdss5"], format="5A"), + fits.Column(name="filetype", array=["specFull"], format="7A"), + fits.Column(name="v_astra", array=["0.5.0"], format="5A"), + fits.Column(name="healpix", array=[34], format="J"), + fits.Column(name="sdss_id", array=[42], format="K"), + fits.Column(name="run2d", array=["6_1_2"], format="6A"), + fits.Column(name="telescope", array=["apo25m"], format="6A"), + fits.Column(name="min_mjd", array=[54], format="J"), + fits.Column(name="max_mjd", array=[488], format="J"), + fits.Column(name="n_visits", array=[1], format="J"), + fits.Column(name="n_good_visits", array=[1], format="J"), + fits.Column(name="n_good_rvs", array=[1], format="J"), + fits.Column(name="v_rad", array=[0], format="E"), + fits.Column(name="e_v_rad", array=[1], format="E"), + fits.Column(name="std_v_rad", array=[1], format="E"), + fits.Column(name="median_e_v_rad", array=[3], format="E"), + fits.Column(name="xcsao_teff", array=[5000], format="E"), + fits.Column(name="xcsao_e_teff", array=[10], format="E"), + fits.Column(name="xcsao_logg", array=[4], format="E"), + fits.Column(name="xcsao_e_logg", array=[3], format="E"), + fits.Column(name="xcsao_fe_h", array=[0], format="E"), + fits.Column(name="xcsao_e_fe_h", array=[5], format="E"), + fits.Column(name="xcsao_meanrxc", array=[0], format="E"), + fits.Column(name="snr", array=[50], format="E"), + fits.Column(name="gri_gaia_transform_flags", array=[1], format="J"), + fits.Column(name="zwarning_flags", array=[0], format="J"), + ] + if with_wl: + columns.append( + fits.Column(name="wavelength", + array=wl, + format="4648E", + dim="(4648)")) + columns += [ + fits.Column(name="flux", array=flux, format="4648E", dim="(4648)"), + fits.Column(name="ivar", array=ivar, format="4648E", dim="(4648)"), + fits.Column(name="pixel_flags", + array=pixel_flags, + format="4648E", + dim="(4648)"), + fits.Column(name="continuum", + array=continuum, + format="4648E", + dim="(4648)"), + fits.Column( + name="nmf_rectified_model_flux", + array=nmf_rectified_model_flux, + format="4648E", + dim="(4648)", + ), + fits.Column(name="nmf_rchi2", array=[5], format="E"), + fits.Column(name="nmf_flags", array=[0], format="J"), + ] + header = fits.Header(cards=[ + ("EXTNAME", f"BOSS/{observatory}", ""), + ("OBSRVTRY", observatory, None), + ("INSTRMNT", "BOSS", None), + ("CRVAL", 3.5523, None), + ("CDELT", 1e-4, None), + ("CTYPE", "LOG-LINEAR", None), + ("CUNIT", "Angstrom (Vacuum)"), + ("CRPIX", 1, None), + ("DC-FAG", 1, None), + ("NPIXELS", 4648, None), + ("DATASUM", datasum, "data unit checksum updated 2023-11-13T03:21:47"), + ]) + + return fits.BinTableHDU.from_columns(columns, header=header) + + +def fake_primary_hdu(): + return fits.PrimaryHDU(header=fits.Header(cards=[ + ("SIMPLE", True, "conforms to FITS standard"), + ("BITPIX", 8, "array data type"), + ("NAXIS", 0, "number of array dimensions"), + ("EXTEND", True, ""), + ("", "", ""), + ("", "Metadata", ""), + ("", "", ""), + ("V_ASTRA", "0.5.0", "Astra version"), + ( + "CREATED", + "23-11-13 10:21:47", + "File creation time (UTC %y-%m-%d %H:%M:%S)", + ), + ("", "", ""), + ("", "Identifiers", ""), + ("", "", ""), + ("SDSS_ID", 34, "SDSS-5 unique identifier"), + ("APOGEEID", "", "SDSS-4 DR17 APOGEE identifier"), + ("GAIA2_ID", 23423, "Gaia DR2 source identifier"), + ("GAIA3_ID", 23423, "Gaia DR3 source identifier"), + ("TIC_ID", 453, "TESS Input Catalog (v8) identifier"), + ("HEALPIX", 7769, "HEALPix (128 side)"), + ("", "", ""), + ("", "Targeting Provenance", ""), + ("", "", ""), + ("CARTON_0", "", "Highest priority carton name"), + ("LEAD", "tic_v8", "Lead catalog used for cross-match"), + ("VER_ID", 25, "SDSS catalog version for targeting"), + ( + "CAT_ID", + 27021597842679140, + "Catalog identifier used to target the source", + ), + ("CAT_ID21", 4283338543, "Catalog identifier (v21; v0.0)"), + ("CAT_ID25", 27021597842679140, "Catalog identifier (v25; v0.5)"), + ("CAT_ID31", 63050395058384891, "Catalog identifier (v31; v1.0)"), + ("N_ASSOC", 1, "SDSS_IDs associated with this CATALOGID"), + ("N_NEIGH", 0, 'Sources within 3" and G_MAG < G_MAG_source + 5'), + ("", "", ""), + ("", "Astrometry", ""), + ("", "", ""), + ("RA", 198.4346, "Right ascension [deg]"), + ("DEC", 67.110405, "Declination [deg]"), + ("L", 39.57304281336641, "Galactic longitude [deg]"), + ("B", 69.09476252445475, "Galactic latitude [deg]"), + ("PLX", 6.2856047, "Parallax [mas]"), + ("E_PLX", 1.04624034, "Error on parallax [mas]"), + ("PMRA", -3.6440573, "Proper motion in RA [mas/yr]"), + ("E_PMRA", 6.05078333, "Error on proper motion in RA [mas/yr]"), + ("PMDE", -8.818821, "Proper motion in DEC [mas/yr]"), + ("E_PMDE", 1.05103116, "Error on proper motion in DEC [mas/yr]"), + ("V_RAD", "NaN", "Gaia radial velocity [km/s]"), + ("E_V_RAD", "NaN", "Error on Gaia radial velocity [km/s]"), + ("", "", ""), + ("", "Gaia Photometry", ""), + ("", "", ""), + ("G_MAG", 5.212288, "Gaia DR3 mean G band magnitude [mag]"), + ("BP_MAG", 1.417452, "Gaia DR3 mean BP band magnitude [mag]"), + ("RP_MAG", 13.138654, "Gaia DR3 mean RP band magnitude [mag]"), + ("", "", ""), + ("", "2MASS Photometry", ""), + ("", "", ""), + ("J_MAG", 3.709, "2MASS J band magnitude [mag]"), + ("E_J_MAG", 0.026, "Error on 2MASS J band magnitude [mag]"), + ("H_MAG", 2.893, "2MASS H band magnitude [mag]"), + ("E_H_MAG", 0.032, "Error on 2MASS H band magnitude [mag]"), + ("K_MAG", 1.776, "2MASS K band magnitude [mag]"), + ("E_K_MAG", 0.026, "Error on 2MASS K band magnitude [mag]"), + ("PH_QUAL", "", "2MASS photometric quality flag"), + ("BL_FLG", "", "Number of components fit per band (JHK)"), + ("CC_FLG", "", "Contamination and confusion flag"), + ( + "COMMENT", + "See https://www.ipac.caltech.edu/2mass/releases/allsky/doc/sec2_2a.html", + "", + ), + ("", "", ""), + ("", "unWISE Photometry", ""), + ("", "", ""), + ("W1_MAG", 1.615836388268159, "W1 magnitude"), + ("E_W1_MAG", 3.001952228308748062, "Error on W1 magnitude"), + ("W1_FLUX", 954.997, "W1 flux [Vega nMgy]"), + ("W1_DFLUX", 1.1017, "Error on W1 flux [Vega nMgy]"), + ("W1_FRAC", 0.976102, "Fraction of W1 flux from this object"), + ("W2_MAG", 12.20587252800215, "W2 magnitude [Vega]"), + ("E_W2_MAG", 0.04519932356304733, "Error on W2 magnitude"), + ("W2_FLUX", 868.906, "W2 flux [Vega nMgy]"), + ("W2_DFLUX", 3.172016, "Error on W2 flux [Vega nMgy]"), + ("W2_FRAC", 0.9741495, "Fraction of W2 flux from this object"), + ("W1UFLAGS", 0, "unWISE flags for W1"), + ("W2UFLAGS", 0, "unWISE flags for W2"), + ("W1AFLAGS", 0, "Additional flags for W1"), + ("W2AFLAGS", 0, "Additional flags for W2"), + ("COMMENT", "See https://catalog.unwise.me/catalogs.html", ""), + ("", "", ""), + ("", "GLIMPSE Photometry", ""), + ("", "", ""), + ("MAG4_5", "", "IRAC band 4.5 micron magnitude [mag]"), + ("D4_5M", "", "Error on IRAC band 4.5 micron magnitude [mag]"), + ("RMS_F4_5", "", "RMS deviations from final flux [mJy]"), + ("SQF_4_5", 0, "Source quality flag for IRAC band 4.5 micron"), + ("MF4_5", 0, "Flux calculation method flag"), + ("CSF", 0, "Close source flag"), + ( + "COMMENT", + "See https://irsa.ipac.caltech.edu/data/SPITZER/GLIMPSE/gator_docs/", + "", + ), + ("", "", ""), + ("", "Observations Summary", ""), + ("", "", ""), + ("N_BOSS", 1, "Number of BOSS visits"), + ("B_MINMJD", 59804, "Minimum MJD of BOSS visits"), + ("N_MAXMJD", 59866, "Maximum MJD of BOSS visits"), + ("N_APOGEE", 1, "Number of APOGEE visits"), + ("A_MINMJD", 59804, "Minimum MJD of APOGEE visits"), + ("A_MAXMJD", 59866, "Maximum MJD of APOGEE visits"), + ("", "", ""), + ("", "Data Integrity", ""), + ("", "", ""), + ( + "CHECKSUM", + "UKTRaHQOWHQOaHQO", + "HDU checksum updated 2023-11-13T03:21:47", + ), + ("DATASUM", "0", "data unit checksum updated 2023-11-13T03:21:47"), + ("", "", ""), + ("", "HDU Descriptions", ""), + ("", "", ""), + ("COMMENT", "HDU 0: Summary information only", ""), + ("COMMENT", "HDU 1: BOSS spectra from Apache Point Observatory", ""), + ("COMMENT", "HDU 2: BOSS spectra from Las Campanas Observatory", ""), + ("COMMENT", "HDU 3: APOGEE spectra from Apache Point Observatory", ""), + ("COMMENT", "HDU 4: APOGEE spectra from Las Campanas Observatory", ""), + ])) + + +def mwm_HDUList(hduflags, with_wl): + hdulist = [fake_primary_hdu()] + for i, flag in enumerate(hduflags): + obs = ["APO", "LCO"] + if i <= 1: + hdulist.append( + generate_boss_hdu(obs[i % 2], + with_wl=with_wl, + datasum=str(flag))) + else: + hdulist.append( + generate_apogee_hdu(obs[i % 2], + with_wl=with_wl, + datasum=str(flag))) + + print(hdulist) + return fits.HDUList(hdulist) + + +def apStar_HDUList(n_spectra): + """Mock an apStar HDUList of n_spectra spectra.""" + np.random.seed(20) + + # init primary hdu header + hdr = fits.Header() + hdr["FOOBAR"] = "barfoo" + hdr["V_APRED"] = "q" + hdr["APRED"] = 1.3 + hdr["SNR"] = 40 + hdr["NVISITS"] = n_spectra + + # Init hdulist + hdulist = fits.HDUList() + hdulist.append(fits.PrimaryHDU(header=hdr)) + + # Init the key HDU's (flux, error, bitmask) + # names + units = [ + "Flux (10^-17 erg/s/cm^2/Ang)", "Err (10^-17 erg/s/cm^2/Ang)", "Mask" + ] + (["test"] * 7) + for i in range(10): + hdu = fits.ImageHDU(data=np.random.random((n_spectra, 10))) + hdu.header["NAXIS"] = 2 + hdu.header["NAXIS1"] = 10 + hdu.header["NAXIS2"] = n_spectra + hdu.header["CDELT1"] = 6e-06 + hdu.header["CRVAL1"] = 4.179 + hdu.header["BUNIT"] = units[i] + hdu.name = f"apstar{i}" + hdulist.append(hdu) + + return hdulist + + +def apVisit_HDUList(): + """Mock an apVisit HDUList""" + np.random.seed(20) + + # init primary hdu header + hdr = fits.Header() + hdr["FOOBAR"] = "barfoo" + hdr["SURVEY"] = "SDSS-V" + hdr["MJD5"] = 99999 + hdr["DATE-OBS"] = "1970-01-01" + + # Init hdulist + hdulist = fits.HDUList() + hdulist.append(fits.PrimaryHDU(header=hdr)) + + # Init the key HDU's (flux, error, bitmask, spectral) + for i in range(4): + if i == 3: + hdu = fits.ImageHDU(data=np.array([ + np.arange(1, 11, 1), + np.arange(11, 21, 1), + np.arange(21, 31, 1) + ])) + hdu.header["BUNIT"] = "Wavelength (Ang)" + + else: + hdu = fits.ImageHDU(data=np.random.random((3, 10))) + hdu.header["BUNIT"] = "Flux (10^-17 erg/s/cm^2/Ang)" + hdulist.append(hdu) + + return hdulist + + +def spec_HDUList(n_spectra): + """Mock an BOSS spec HDUList of n_spectra spectra + 1 coadd.""" + np.random.seed(20) + + # init primary hdu header + hdr = fits.Header() + hdr["OBSERVAT"] = "APO" + hdr["TELESCOP"] = "SDSS 2.5-M" + hdr["FOOBAR"] = "barfoo" + hdr["MJD5"] = 99999 + hdr["DATE-OBS"] = "1970-01-01" + + # Init hdulist + hdulist = fits.HDUList() + hdulist.append(fits.PrimaryHDU(header=hdr)) + + # Init the key HDU's (flux, error, bitmask, spectral) + names = ["COADD", "SPALL", "ZALL", "ZLINE"] + for i in range(4): + hdu = fits.BinTableHDU.from_columns([ + fits.Column(name="FLUX", format="E", array=np.random.random(10)), + fits.Column(name="LOGLAM", + format="E", + array=np.random.random(10).sort()), + fits.Column(name="IVAR", format="E", array=np.random.random(10)), + fits.Column(name="AND_MASK", + format="E", + array=np.random.random(10)), + fits.Column(name="OR_MASK", format="E", + array=np.random.random(10)), + ]) + hdu.name = names[i] + hdulist.append(hdu) + for i in range(n_spectra): + hdu = fits.BinTableHDU.from_columns([ + fits.Column(name="LOGLAM", + format="E", + array=np.random.random(10).sort()), + fits.Column(name="FLUX", format="E", array=np.random.random(10)), + fits.Column(name="IVAR", format="E", array=np.random.random(10)), + fits.Column(name="AND_MASK", + format="E", + array=np.random.random(10)), + fits.Column(name="OR_MASK", format="E", + array=np.random.random(10)), + ]) + hdu.name = f"spectrum{i}" + hdulist.append(hdu) + + return hdulist + + +# TEST MWM loaders +@pytest.mark.parametrize( + "file_obj, hdu, with_wl, hduflags", + [ + ("mwm-temp", None, False, [0, 0, 1, 0]), + ("mwm-temp", 3, False, [0, 0, 1, 0]), + ("mwm-temp", None, True, [0, 1, 1, 0]), + ("mwm-temp", 2, True, [0, 1, 1, 0]), + ], +) +def test_mwm_1d(file_obj, hdu, with_wl, hduflags): + """Test mwm Spectrum1D loader""" + tmpfile = str(file_obj) + ".fits" + mwm_HDUList(hduflags, with_wl).writeto(tmpfile, overwrite=True) + + if hdu is None: + data = Spectrum1D.read(tmpfile) + else: + data = Spectrum1D.read(tmpfile, hdu=hdu) + assert isinstance(data, Spectrum1D) + assert isinstance(data.meta["header"], fits.Header) + if data.meta["instrument"].lower() == "apogee": + length = 8575 + elif data.meta["instrument"].lower() == "boss": + length = 4648 + else: + raise ValueError( + "INSTRMNT tag in test HDU header is not set properly.") + assert len(data.spectral_axis.value) == length + assert len(data.flux.value) == length + assert data.spectral_axis.unit == Angstrom + assert data.flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + + +@pytest.mark.parametrize( + "file_obj, with_wl, hduflags", + [ + ("mwm-temp", False, [1, 1, 1, 1]), + ("mwm-temp", True, [0, 1, 0, 1]), + ], +) +def test_mwm_list(file_obj, with_wl, hduflags): + """Test mwm SpectrumList loader""" + tmpfile = str(file_obj) + ".fits" + mwm_HDUList(hduflags, with_wl).writeto(tmpfile, overwrite=True) + + data = SpectrumList.read(tmpfile, format="SDSS-V mwm multi") + assert isinstance(data, SpectrumList) + for i in range(len(data)): + assert isinstance(data[i], Spectrum1D) + assert isinstance(data[i].meta["header"], fits.Header) + if data[i].meta["instrument"].lower() == "apogee": + length = 8575 + elif data[i].meta["instrument"].lower() == "boss": + length = 4648 + else: + raise ValueError( + "INSTRMNT tag in test HDU header is not set properly.") + assert len(data[i].spectral_axis.value) == length + assert len(data[i].flux.value) == length + assert data[i].spectral_axis.unit == Angstrom + assert data[i].flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + + +@pytest.mark.parametrize( + "file_obj, hdu, hduflags", + [ + ("mwm-temp", 1, [0, 1, 0, 1]), + ("mwm-temp", 2, [1, 0, 1, 1]), + ("mwm-temp", 3, [0, 0, 0, 1]), + ("mwm-temp", 4, [0, 1, 1, 0]), + ], +) +def test_mwm_1d_fail_spec(file_obj, hdu, hduflags): + """Test mwm Spectrum1D loader fail on bad spec""" + tmpfile = str(file_obj) + ".fits" + mwm_HDUList(hduflags, True).writeto(tmpfile, overwrite=True) + with pytest.raises(IndexError): + Spectrum1D.read(tmpfile, hdu=hdu) + + +@pytest.mark.parametrize( + "file_obj, with_wl", + [ + ("mwm-temp", False), + ("mwm-temp", True), + ], +) +def test_mwm_1d_fail(file_obj, with_wl): + """Test mwm Spectrum1D loader fail on empty""" + tmpfile = str(file_obj) + ".fits" + mwm_HDUList([0, 0, 0, 0], with_wl).writeto(tmpfile, overwrite=True) + + with pytest.raises(ValueError): + Spectrum1D.read(tmpfile) + + +@pytest.mark.parametrize( + "file_obj, with_wl", + [ + ("mwm-temp", False), + ("mwm-temp", True), + ], +) +def test_mwm_list_fail(file_obj, with_wl): + """Test mwm SpectrumList loader fail on empty""" + tmpfile = str(file_obj) + ".fits" + mwm_HDUList([0, 0, 0, 0], with_wl).writeto(tmpfile, overwrite=True) + + with pytest.raises(ValueError): + SpectrumList.read(tmpfile, format="SDSS-V mwm multi") + + +@pytest.mark.parametrize( + "file_obj,n_spectra", + [ + ("spec-temp", 1), + ("spec-temp", 5), + ], +) +def test_spec_1d(file_obj, n_spectra): + """Test BOSS spec loader""" + tmpfile = str(file_obj) + ".fits" + spec_HDUList(n_spectra).writeto(tmpfile, overwrite=True) + + idxs = [1] + list(np.arange(5, 5 + n_spectra, 1)) + + for i in idxs: + data = Spectrum1D.read(tmpfile, hdu=i) + assert isinstance(data, Spectrum1D) + assert len(data.flux.value) == 10 + assert data.flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + + assert len(data.spectral_axis.value) == 10 + assert data.spectral_axis.unit == Angstrom + assert len(data.mask) == 10 + + assert data[i].meta["header"].get("foobar") == "barfoo" + + +@pytest.mark.parametrize( + "file_obj,n_spectra", + [ + ("spec-temp", 0), + ("spec-temp", 5), + ], +) +def test_spec_list(file_obj, n_spectra): + """Test BOSS SpectrumList loader""" + tmpfile = str(file_obj) + ".fits" + spec_HDUList(n_spectra).writeto(tmpfile, overwrite=True) + + data = SpectrumList.read(tmpfile, format="SDSS-V spec multi") + assert isinstance(data, SpectrumList) + assert len(data) == n_spectra + 1 + for i in range(n_spectra): + assert len(data[i].flux.value) == 10 + assert data[i].flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + assert len(data[i].spectral_axis.value) == 10 + assert data[i].spectral_axis.unit == Angstrom + assert len(data[i].mask) == 10 + assert data[i].meta["header"].get("foobar") == "barfoo" + + +@pytest.mark.parametrize( + "file_obj,hdu", + [ + ("spec-temp", 2), + ("spec-temp", 3), + ("spec-temp", 4), + ], + ids=["Fail on HDU2", "Fail on HDU3", "Fail on HDU4"], +) +def test_spec_1d_fail_hdu(file_obj, hdu): + """Test if fail on reading HDU2, HDU3, or HDU4 (non-spectra)""" + tmpfile = str(file_obj) + ".fits" + spec_HDUList(5).writeto(tmpfile, overwrite=True) + + with pytest.raises(ValueError): + Spectrum1D.read(tmpfile, hdu=hdu) + + +@pytest.mark.parametrize( + "file_obj,idx", + [ + ("apStar-temp", 1), + ("apStar-temp", 4), + ], +) +def test_apStar_1D(file_obj, idx): + tmpfile = str(file_obj) + ".fits" + apStar_HDUList(6).writeto(tmpfile, overwrite=True) + + data = Spectrum1D.read(tmpfile, idx=idx) + assert isinstance(data, Spectrum1D) + assert len(data.flux.value) == 10 + assert data.flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + + assert len(data.spectral_axis.value) == 10 + assert data.spectral_axis.unit == Angstrom + + assert data.meta["header"].get("foobar") == "barfoo" + + +@pytest.mark.parametrize( + "file_obj,n_spectra", + [ + ("apStar-temp", 3), + ("apStar-temp", 6), + ], +) +def test_apStar_list(file_obj, n_spectra): + tmpfile = str(file_obj) + ".fits" + apStar_HDUList(n_spectra).writeto(tmpfile, overwrite=True) + + data = SpectrumList.read(tmpfile, format="SDSS-V apStar multi") + assert isinstance(data, SpectrumList) + assert len(data) == n_spectra + for i in range(len(data)): + assert isinstance(data[i], Spectrum1D) + assert len(data[i].flux.value) == 10 + assert data[i].flux.unit == Unit("1e-17 erg / (s cm2 Angstrom)") + assert len(data[i].spectral_axis.value) == 10 + assert data[i].meta["header"].get("foobar") == "barfoo" + + +@pytest.mark.parametrize( + "file_obj", + [ + ("apStar-temp"), + ], +) +def test_apStar_fail_list(file_obj): + """Test if fail on reading 1D apStar as list""" + tmpfile = str(file_obj) + ".fits" + apStar_HDUList(1).writeto(tmpfile, overwrite=True) + + with pytest.raises(ValueError): + SpectrumList.read(tmpfile, format="SDSS-V apStar multi") + + +@pytest.mark.parametrize( + "file_obj", + [ + ("apVisit-temp"), + ], +) +def test_apVisit_1D(file_obj): + tmpfile = str(file_obj) + ".fits" + apVisit_HDUList().writeto(tmpfile, overwrite=True) + + data = Spectrum1D.read(tmpfile) + assert isinstance(data, Spectrum1D) + assert np.array_equal(data.spectral_axis.value, np.arange(1, 31, 1)) + assert len(data.flux.value) == 30 + assert data.meta["header"].get("foobar") == "barfoo" + + +@pytest.mark.parametrize( + "file_obj", + [ + ("apVisit-temp"), + ], +) +def test_apVisit_list(file_obj): + tmpfile = str(file_obj) + ".fits" + apVisit_HDUList().writeto(tmpfile, overwrite=True) + + data = SpectrumList.read(tmpfile, format="SDSS-V apVisit multi") + assert isinstance(data, SpectrumList) + assert len(data) == 3