Initial commit of metadata file conversion (#156)
* Initial commit of metadata file conversion

* Move shared logic for locating input files.

* Explicitly depend on pyyaml
delucchi-cmu committed Nov 2, 2023
1 parent 7038f7d commit 0629cb4
Showing 10 changed files with 389 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -23,6 +23,7 @@ dependencies = [
"ipykernel", # Support for Jupyter notebooks
"pandas < 2.1.0",
"pyarrow",
"pyyaml",
"tqdm",
"numpy < 1.25",
"fsspec <= 2023.9.2", # Remove when pyarrow updates to reflect api changes
@@ -50,6 +51,7 @@ dev = [
"ipython", # Also used in building notebooks into Sphinx
"matplotlib", # Used in sample notebook intro_notebook.ipynb
"ray", # Used for dask-on-ray testing.
"types-PyYAML", # type stubs for pyyaml
]

[build-system]
13 changes: 3 additions & 10 deletions src/hipscat_import/catalog/arguments.py
@@ -6,12 +6,12 @@
from typing import List

from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer
from hipscat.pixel_math import hipscat_id

from hipscat_import.catalog.file_readers import InputReader, get_file_reader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.runtime_arguments import RuntimeArguments
from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

# pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods

@@ -102,14 +102,7 @@ def _check_arguments(self):
            self.file_reader = get_file_reader(self.input_format)

        # Basic checks complete - make more checks and create directories where necessary
        if self.input_path:
            if not file_io.does_file_or_directory_exist(self.input_path):
                raise FileNotFoundError("input_path not found on local storage")
            self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
        elif self.input_file_list:
            self.input_paths = self.input_file_list
        if len(self.input_paths) == 0:
            raise FileNotFoundError("No input files found")
        self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
        self.resume_plan = ResumePlan(
            resume=self.resume,
            progress_bar=self.progress_bar,
Empty file.
13 changes: 3 additions & 10 deletions src/hipscat_import/cross_match/macauff_arguments.py
@@ -4,10 +4,10 @@
from os import path
from typing import List

from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer
from hipscat.io.validation import is_valid_catalog

from hipscat_import.runtime_arguments import RuntimeArguments
from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

# pylint: disable=too-many-instance-attributes
# pylint: disable=unsupported-binary-operation
@@ -87,14 +87,7 @@ def _check_arguments(self):
raise ValueError("Macauff column metadata file must point to valid file path.")

# Basic checks complete - make more checks and create directories where necessary
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
raise FileNotFoundError("No input files found")
self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)

self.column_names = self.get_column_names()

102 changes: 102 additions & 0 deletions src/hipscat_import/cross_match/macauff_metadata.py
@@ -0,0 +1,102 @@
"""Utilities for generating parquet metdata from macauff-generated metadata files."""

import xml.etree.ElementTree as ET

import pyarrow as pa
import pyarrow.parquet as pq
import yaml
from hipscat.io import file_io


def _get_inner_xml_value(parent_el, node_type, default_value):
    child_el = parent_el.findall(node_type)
    if len(child_el) == 0:
        return default_value
    if len(child_el) > 1:
        raise ValueError(f"found too many {node_type} XML elements")
    return child_el[0].text.strip()


def _construct_field(name, units, metadata_dict):
"""Helper method to construct a pyarrow field from macauff metadata strings."""
if units == "string":
pa_type = pa.string()
elif units in ("float", "double"):
pa_type = pa.float64()
elif units in ("integer", "long"):
pa_type = pa.int64()
else:
raise ValueError(f"unhandled units {units}")
return pa.field(name, pa_type, metadata=metadata_dict)


def from_xml(input_file, output_file):
"""Read XML file with column metadata for a cross-match file from macauff.
Expects XML with the format::
<columns>
<column>
<name>$COLUMN_NAME</name>
<description>$COLUMN_DESCRIPTION</description>
<units>$COLUMN_UNIT_DESCRIPTOR</units>
</column>
</columns>
Args:
input file (str): file to read for match metadata
output_file (str): desired location for output parquet metadata file
Raises
ValueError: if the XML is mal-formed
"""
    fields = []
    root_el = ET.parse(input_file).getroot()
    columns = root_el.findall("column")

    for column in columns:
        name = _get_inner_xml_value(column, "name", "foo")
        description = _get_inner_xml_value(column, "description", "")
        units = _get_inner_xml_value(column, "units", "string")

        fields.append(_construct_field(name, units, metadata_dict={"macauff_description": description}))

    schema = pa.schema(fields)
    pq.write_table(schema.empty_table(), where=output_file)


def from_yaml(input_file, output_directory):
"""Read YAML file with column metadata for the various cross-match files from macauff.
Expects YAML with the format::
name: macauff_GaiaDR3xCatWISE2020
description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020.
tables:
- name: macauff_GaiaDR3xCatWISE2020_matches
"@id": "#macauff_GaiaDR3xCatWISE2020_matches"
description: Counterpart associations between Gaia and WISE
columns:
- name: gaia_source_id
datatype: long
description: The Gaia DR3 object ID.
Args:
input file (str): file to read for match metadata
output_dir (str): desired location for output parquet metadata files
We will write one file per table in the "tables" element.
"""
    with open(input_file, "r", encoding="utf-8") as file_handle:
        metadata = yaml.safe_load(file_handle)
        tables = metadata.get("tables", [])
        for index, table in enumerate(tables):
            fields = []
            table_name = table.get("name", f"metadata_table_{index}")
            for col_index, column in enumerate(table.get("columns", [])):
                name = column.get("name", f"column_{col_index}")
                units = column.get("units", "string")
                fields.append(_construct_field(name, units, metadata_dict=column))

            schema = pa.schema(fields)
            output_file = file_io.append_paths_to_pointer(output_directory, f"{table_name}.parquet")
            pq.write_table(schema.empty_table(), where=str(output_file))
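
For orientation, a minimal usage sketch of the two converters added above; the file names here are hypothetical examples, not files included in this commit:

from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml

# Hypothetical input files and output locations - substitute your own macauff metadata.
from_xml("macauff_match_columns.xml", "/tmp/matches_metadata.parquet")

# Writes one parquet metadata file per entry in the YAML "tables" element.
from_yaml("macauff_all_tables.yaml", "/tmp/metadata_out")
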
24 changes: 24 additions & 0 deletions src/hipscat_import/runtime_arguments.py
@@ -124,3 +124,27 @@ def provenance_info(self) -> dict:
def additional_runtime_provenance_info(self):
"""Any additional runtime args to be included in provenance info from subclasses"""
return {}


def find_input_paths(input_path="", file_matcher="", input_file_list=None):
"""Helper method to find input paths, given either a prefix and format, or an
explicit list of paths.
Args:
input_path (str): prefix to search for
file_matcher (str): matcher to use when searching for files
input_file_list (List[str]): list of input paths
Returns:
matching files, if input_path is provided, otherwise, input_file_list
Raises:
FileNotFoundError if no files are found at the input_path and the provided list is empty.
"""
if input_path:
if not file_io.does_file_or_directory_exist(input_path):
raise FileNotFoundError("input_path not found on local storage")
input_paths = file_io.find_files_matching_path(input_path, file_matcher)
elif input_file_list:
input_paths = input_file_list
if len(input_paths) == 0:
raise FileNotFoundError("No input files found")
return input_paths
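
As a quick illustration of the shared helper, the two supported call modes might look like this (the paths are hypothetical):

from hipscat_import.runtime_arguments import find_input_paths

# Mode 1: search a directory prefix for files matching a glob.
csv_paths = find_input_paths(input_path="/data/input_catalog", file_matcher="*.csv")

# Mode 2: pass an explicit list of files through unchanged.
listed_paths = find_input_paths(input_file_list=["/data/part_0.csv", "/data/part_1.csv"])
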
5 changes: 5 additions & 0 deletions tests/hipscat_import/conftest.py
@@ -94,6 +94,11 @@ def empty_data_dir(test_data_dir):
return os.path.join(test_data_dir, "empty")


@pytest.fixture
def macauff_data_dir(test_data_dir):
return os.path.join(test_data_dir, "macauff")


@pytest.fixture
def formats_dir(test_data_dir):
return os.path.join(test_data_dir, "test_formats")
101 changes: 101 additions & 0 deletions tests/hipscat_import/cross_match/test_macauff_metadata.py
@@ -0,0 +1,101 @@
import os
from xml.etree.ElementTree import ParseError

import pytest
from hipscat.io import file_io

from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml


def test_from_xml(macauff_data_dir, tmp_path):
"""Test XML file reading and parquet metadata generation."""
xml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match.xml")
output_file = os.path.join(tmp_path, "output.parquet")

from_xml(xml_input_file, output_file)

single_metadata = file_io.read_parquet_metadata(output_file)
schema = single_metadata.schema.to_arrow_schema()

assert len(schema) == 6


def test_from_xml_malformed(tmp_path):
"""Test some invalid XML file inputs."""
input_file = os.path.join(tmp_path, "input.parquet")
output_file = os.path.join(tmp_path, "output.parquet")

## No "columns" found at all
with open(input_file, "w", encoding="utf-8") as file_handle:
file_handle.write("")

with pytest.raises(ParseError, match="no element found"):
from_xml(input_file, output_file)

## Some columns, too many required fields
with open(input_file, "w", encoding="utf-8") as file_handle:
file_handle.write(
"""<columns>
<column>
<name>Gaia_designation</name>
<name>The Gaia DR3 object ID.</name>
<units>long</units>
</column>
</columns>"""
)

with pytest.raises(ValueError, match="too many name XML elements"):
from_xml(input_file, output_file)

## Unhandled types
with open(input_file, "w", encoding="utf-8") as file_handle:
file_handle.write(
"""<columns>
<column>
<name>Gaia_designation</name>
<description>The Gaia DR3 object ID.</description>
<units>blob</units>
</column>
</columns>"""
)

with pytest.raises(ValueError, match="unhandled units blob"):
from_xml(input_file, output_file)

## Some empty fields are ok!
with open(input_file, "w", encoding="utf-8") as file_handle:
file_handle.write(
"""<columns>
<column>
<name> </name>
<units>long </units>
</column>
</columns>"""
)

from_xml(input_file, output_file)


def test_from_yaml(macauff_data_dir, tmp_path):
"""Test YAML file reading and parquet metadata generation."""
yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")

from_yaml(yaml_input_file, tmp_path)

output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
single_metadata = file_io.read_parquet_metadata(output_file)
schema = single_metadata.schema.to_arrow_schema()

assert len(schema) == 7

output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.parquet")
single_metadata = file_io.read_parquet_metadata(output_file)
schema = single_metadata.schema.to_arrow_schema()

assert len(schema) == 4

output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.parquet")
single_metadata = file_io.read_parquet_metadata(output_file)
schema = single_metadata.schema.to_arrow_schema()

assert len(schema) == 4
68 changes: 68 additions & 0 deletions tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml
@@ -0,0 +1,68 @@
<columns>
<column>
<name>
Gaia_designation
</name>
<description>
The Gaia DR3 object ID.
</description>
<units>
long
</units>
</column>
<column>
<name>
Gaia_RA
</name>
<description>
Right Ascension of the Gaia DR3 source.
</description>
<units>
float
</units>
</column>
<column>
<name>
Gaia_Dec
</name>
<description>
The Gaia DR3 declination.
</description>
<units>
float
</units>
</column>
<column>
<name>
CatWISE_Name
</name>
<description>
The object identifier from the CatWISE 2020 catalogue.
</description>
<units>
string
</units>
</column>
<column>
<name>
CatWISE_RA
</name>
<description>
Right Ascension of the object as quoted by the CatWISE 2020 catalogue.
</description>
<units>
float
</units>
</column>
<column>
<name>
CatWISE_Dec
</name>
<description>
CatWISE 2020 Declination.
</description>
<units>
float
</units>
</column>
</columns>