diff --git a/pyproject.toml b/pyproject.toml index 5067110c..59ef225f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "ipykernel", # Support for Jupyter notebooks "pandas < 2.1.0", "pyarrow", + "pyyaml", "tqdm", "numpy < 1.25", "fsspec <= 2023.9.2", # Remove when pyarrow updates to reflect api changes @@ -50,6 +51,7 @@ dev = [ "ipython", # Also used in building notebooks into Sphinx "matplotlib", # Used in sample notebook intro_notebook.ipynb "ray", # Used for dask-on-ray testing. + "types-PyYAML", # type stubs for pyyaml ] [build-system] diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py index a05ea9d4..fb43b35b 100644 --- a/src/hipscat_import/catalog/arguments.py +++ b/src/hipscat_import/catalog/arguments.py @@ -6,12 +6,12 @@ from typing import List from hipscat.catalog.catalog import CatalogInfo -from hipscat.io import FilePointer, file_io +from hipscat.io import FilePointer from hipscat.pixel_math import hipscat_id from hipscat_import.catalog.file_readers import InputReader, get_file_reader from hipscat_import.catalog.resume_plan import ResumePlan -from hipscat_import.runtime_arguments import RuntimeArguments +from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths # pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods @@ -102,14 +102,7 @@ def _check_arguments(self): self.file_reader = get_file_reader(self.input_format) # Basic checks complete - make more checks and create directories where necessary - if self.input_path: - if not file_io.does_file_or_directory_exist(self.input_path): - raise FileNotFoundError("input_path not found on local storage") - self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}") - elif self.input_file_list: - self.input_paths = self.input_file_list - if len(self.input_paths) == 0: - raise FileNotFoundError("No input files 
found") + self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list) self.resume_plan = ResumePlan( resume=self.resume, progress_bar=self.progress_bar, diff --git a/src/hipscat_import/cross_match/__init__.py b/src/hipscat_import/cross_match/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 93f3bca6..74283f18 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -4,10 +4,10 @@ from os import path from typing import List -from hipscat.io import FilePointer, file_io +from hipscat.io import FilePointer from hipscat.io.validation import is_valid_catalog -from hipscat_import.runtime_arguments import RuntimeArguments +from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths # pylint: disable=too-many-instance-attributes # pylint: disable=unsupported-binary-operation @@ -87,14 +87,7 @@ def _check_arguments(self): raise ValueError("Macauff column metadata file must point to valid file path.") # Basic checks complete - make more checks and create directories where necessary - if self.input_path: - if not file_io.does_file_or_directory_exist(self.input_path): - raise FileNotFoundError("input_path not found on local storage") - self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}") - elif self.input_file_list: - self.input_paths = self.input_file_list - if len(self.input_paths) == 0: - raise FileNotFoundError("No input files found") + self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list) self.column_names = self.get_column_names() diff --git a/src/hipscat_import/cross_match/macauff_metadata.py b/src/hipscat_import/cross_match/macauff_metadata.py new file mode 100644 index 00000000..0e20caf0 --- /dev/null +++ 
b/src/hipscat_import/cross_match/macauff_metadata.py
@@ -0,0 +1,111 @@
+"""Utilities for generating parquet metadata from macauff-generated metadata files."""
+
+import xml.etree.ElementTree as ET
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import yaml
+from hipscat.io import file_io
+
+
+def _get_inner_xml_value(parent_el, node_type, default_value):
+    """Fetch the stripped text of the single ``node_type`` child of an XML element.
+
+    Returns ``default_value`` when there is no such child, and raises ValueError
+    when there is more than one.
+    """
+    child_el = parent_el.findall(node_type)
+    if len(child_el) == 0:
+        return default_value
+    if len(child_el) > 1:
+        raise ValueError(f"found too many {node_type} XML elements")
+    # ElementTree gives ``text`` as None for an element with no content.
+    return (child_el[0].text or "").strip()
+
+
+def _construct_field(name, units, metadata_dict):
+    """Helper method to construct a pyarrow field from macauff metadata strings."""
+    if units == "string":
+        pa_type = pa.string()
+    elif units in ("float", "double"):
+        pa_type = pa.float64()
+    elif units in ("integer", "long"):
+        pa_type = pa.int64()
+    else:
+        raise ValueError(f"unhandled units {units}")
+    return pa.field(name, pa_type, metadata=metadata_dict)
+
+
+def from_xml(input_file, output_file):
+    """Read XML file with column metadata for a cross-match file from macauff.
+
+    Expects XML with the format::
+
+        <columns>
+            <column>
+                <name>$COLUMN_NAME</name>
+                <description>$COLUMN_DESCRIPTION</description>
+                <units>$COLUMN_UNIT_DESCRIPTOR</units>
+            </column>
+        </columns>
+
+    Args:
+        input_file (str): file to read for match metadata
+        output_file (str): desired location for output parquet metadata file
+
+    Raises:
+        ValueError: if the XML is mal-formed
+    """
+    fields = []
+    root_el = ET.parse(input_file).getroot()
+    columns = root_el.findall("column")
+
+    for column in columns:
+        name = _get_inner_xml_value(column, "name", "foo")
+        description = _get_inner_xml_value(column, "description", "")
+        units = _get_inner_xml_value(column, "units", "string")
+
+        fields.append(_construct_field(name, units, metadata_dict={"macauff_description": description}))
+
+    schema = pa.schema(fields)
+    pq.write_table(schema.empty_table(), where=output_file)
+
+
+def from_yaml(input_file, output_directory):
+    """Read YAML file with column metadata for the various cross-match files from macauff.
+
+    Expects YAML with the format::
+
+        name: macauff_GaiaDR3xCatWISE2020
+        description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020.
+        tables:
+        - name: macauff_GaiaDR3xCatWISE2020_matches
+          "@id": "#macauff_GaiaDR3xCatWISE2020_matches"
+          description: Counterpart associations between Gaia and WISE
+          columns:
+          - name: gaia_source_id
+            datatype: long
+            description: The Gaia DR3 object ID.
+
+    Args:
+        input_file (str): file to read for match metadata
+        output_directory (str): desired location for output parquet metadata files
+            We will write one file per table in the "tables" element.
+    """
+    with open(input_file, "r", encoding="utf-8") as file_handle:
+        # An empty YAML document loads as None; treat it as having no tables.
+        metadata = yaml.safe_load(file_handle) or {}
+    tables = metadata.get("tables", [])
+    for index, table in enumerate(tables):
+        fields = []
+        table_name = table.get("name", f"metadata_table_{index}")
+        for col_index, column in enumerate(table.get("columns", [])):
+            name = column.get("name", f"column_{col_index}")
+            # The column type is stored under "datatype" in the YAML layout documented above;
+            # "units" is only a fallback, since that was the key this loop used to read.
+            units = column.get("datatype", column.get("units", "string"))
+            fields.append(_construct_field(name, units, metadata_dict=column))
+
+        schema = pa.schema(fields)
+        output_file = file_io.append_paths_to_pointer(output_directory, f"{table_name}.parquet")
+        pq.write_table(schema.empty_table(), where=str(output_file))
diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py
index f4e227e1..cd924888 100644
--- a/src/hipscat_import/runtime_arguments.py
+++ b/src/hipscat_import/runtime_arguments.py
@@ -124,3 +124,30 @@ def provenance_info(self) -> dict:
     def additional_runtime_provenance_info(self):
         """Any additional runtime args to be included in provenance info from subclasses"""
         return {}
+
+
+def find_input_paths(input_path="", file_matcher="", input_file_list=None):
+    """Helper method to find input paths, given either a prefix and format, or an
+    explicit list of paths.
+
+    Args:
+        input_path (str): prefix to search for
+        file_matcher (str): matcher to use when searching for files
+        input_file_list (List[str]): list of input paths
+    Returns:
+        matching files, if input_path is provided, otherwise, input_file_list
+    Raises:
+        FileNotFoundError: if input_path is provided but does not exist, or if
+            no input files are found by either route.
+    """
+    if input_path:
+        if not file_io.does_file_or_directory_exist(input_path):
+            raise FileNotFoundError("input_path not found on local storage")
+        input_paths = file_io.find_files_matching_path(input_path, file_matcher)
+    else:
+        # Covers both a missing (None) and an empty list, so the check below
+        # raises FileNotFoundError instead of tripping over an unbound local.
+        input_paths = input_file_list or []
+    if len(input_paths) == 0:
+        raise FileNotFoundError("No input files found")
+    return input_paths
diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py
index 1d6f2d77..3bc386f8 100644
--- a/tests/hipscat_import/conftest.py
+++ b/tests/hipscat_import/conftest.py
@@ -94,6 +94,11 @@ def empty_data_dir(test_data_dir):
     return os.path.join(test_data_dir, "empty")
 
 
+@pytest.fixture
+def macauff_data_dir(test_data_dir):
+    return os.path.join(test_data_dir, "macauff")
+
+
 @pytest.fixture
 def formats_dir(test_data_dir):
     return os.path.join(test_data_dir, "test_formats")
diff --git a/tests/hipscat_import/cross_match/test_macauff_metadata.py b/tests/hipscat_import/cross_match/test_macauff_metadata.py
new file mode 100644
index 00000000..5db39084
--- /dev/null
+++ b/tests/hipscat_import/cross_match/test_macauff_metadata.py
@@ -0,0 +1,101 @@
+import os
+from xml.etree.ElementTree import ParseError
+
+import pytest
+from hipscat.io import file_io
+
+from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml
+
+
+def test_from_xml(macauff_data_dir, tmp_path):
+    """Test XML file reading and parquet metadata generation."""
+    xml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match.xml")
+    output_file = os.path.join(tmp_path, "output.parquet")
+
+    from_xml(xml_input_file, output_file)
+
+    single_metadata = file_io.read_parquet_metadata(output_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 6
+
+
+def test_from_xml_malformed(tmp_path):
+    """Test some invalid XML file inputs."""
+    input_file = os.path.join(tmp_path, "input.parquet")
+    output_file = os.path.join(tmp_path, "output.parquet")
+
+    ## No "columns" found
at all + with open(input_file, "w", encoding="utf-8") as file_handle: + file_handle.write("") + + with pytest.raises(ParseError, match="no element found"): + from_xml(input_file, output_file) + + ## Some columns, too many required fields + with open(input_file, "w", encoding="utf-8") as file_handle: + file_handle.write( + """ + + Gaia_designation + The Gaia DR3 object ID. + long + + """ + ) + + with pytest.raises(ValueError, match="too many name XML elements"): + from_xml(input_file, output_file) + + ## Unhandled types + with open(input_file, "w", encoding="utf-8") as file_handle: + file_handle.write( + """ + + Gaia_designation + The Gaia DR3 object ID. + blob + + """ + ) + + with pytest.raises(ValueError, match="unhandled units blob"): + from_xml(input_file, output_file) + + ## Some empty fields are ok! + with open(input_file, "w", encoding="utf-8") as file_handle: + file_handle.write( + """ + + + long + + """ + ) + + from_xml(input_file, output_file) + + +def test_from_yaml(macauff_data_dir, tmp_path): + """Test YAML file reading and parquet metadata generation.""" + yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") + + from_yaml(yaml_input_file, tmp_path) + + output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") + single_metadata = file_io.read_parquet_metadata(output_file) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == 7 + + output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.parquet") + single_metadata = file_io.read_parquet_metadata(output_file) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == 4 + + output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.parquet") + single_metadata = file_io.read_parquet_metadata(output_file) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == 4 diff --git 
a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml new file mode 100644 index 00000000..985d8973 --- /dev/null +++ b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml @@ -0,0 +1,68 @@ + + + + Gaia_designation + + + The Gaia DR3 object ID. + + + long + + + + + Gaia_RA + + + Right Ascension of the Gaia DR3 source. + + + float + + + + + Gaia_Dec + + + The Gaia DR3 declination. + + + float + + + + + CatWISE_Name + + + The object identifier from the CatWISE 2020 catalogue. + + + string + + + + + CatWISE_RA + + + Right Ascension of the object as quoted by the CatWISE 2020 catalogue. + + + float + + + + + CatWISE_Dec + + + CatWISE 2020 Declination. + + + float + + + \ No newline at end of file diff --git a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml new file mode 100644 index 00000000..5cba93f6 --- /dev/null +++ b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml @@ -0,0 +1,81 @@ +--- +name: macauff_GaiaDR3xCatWISE2020 +description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020. +tables: +- name: macauff_GaiaDR3xCatWISE2020_matches + "@id": "#macauff_GaiaDR3xCatWISE2020_matches" + description: Counterpart associations between Gaia and WISE, as well as derived values from the cross-match process such as match probability and contamination flux. + columns: + - name: gaia_source_id + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id" + datatype: long + description: The Gaia DR3 object ID. + - name: gaia_ra + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_ra" + datatype: double + description: Right Ascension of the Gaia DR3 source. + - name: gaia_dec + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_dec" + datatype: double + description: The Gaia DR3 declination. 
+ - name: catwise_name + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_name" + datatype: string + description: The object identifier from the CatWISE 2020 catalogue. + - name: catwise_ra + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_ra" + datatype: double + description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue. + - name: catwise_dec + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_dec" + datatype: double + description: CatWISE 2020 Declination. + - name: match_p + "@id": "#macauff_GaiaDR3xCatWISE2020_matches.match_p" + datatype: double + description: Overall probability that the Gaia DR3 and CatWISE sources are detections of the same object, as given by equation 26 of Wilson & Naylor (2018a). + primaryKey: "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id" + +- name: macauff_GaiaDR3xCatWISE2020_gaia_nonmatches + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches" + description: Objects in Gaia DR3 with no counterpart in the CatWISE2020 catalogue, with derived columns such as match probability and simulated flux contamination. + columns: + - name: gaia_source_id + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id" + datatype: long + description: The Gaia DR3 object ID. + - name: gaia_ra + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_ra" + datatype: double + description: Right Ascension of the Gaia DR3 source. + - name: gaia_dec + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_dec" + datatype: double + description: The Gaia DR3 declination. + - name: match_p + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.match_p" + datatype: double + description: Overall probability that the Gaia DR3 source does not have a corresponding CatWISE detection, as given by equation 26 of Wilson & Naylor (2018a). 
+ primaryKey: "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id" + +- name: macauff_GaiaDR3xCatWISE2020_catwise_nonmatches + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches" + description: Objects in CatWISE2020 with no counterpart in the Gaia DR3 catalogue, with derived columns such as match probability and simulated flux contamination. + columns: + - name: catwise_name + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" + datatype: string + description: The object identifier from the CatWISE 2020 catalogue. + - name: catwise_ra + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_ra" + datatype: double + description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue. + - name: catwise_dec + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_dec" + datatype: double + description: CatWISE 2020 Declination. + - name: match_p + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.match_p" + datatype: double + description: Overall probability that the CatWISE source does not have a corresponding Gaia DR3 detection, as given by equation 26 of Wilson & Naylor (2018a). + primaryKey: "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" \ No newline at end of file