Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit of metadata file conversion #156

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"ipykernel", # Support for Jupyter notebooks
"pandas < 2.1.0",
"pyarrow",
"pyyaml",
"tqdm",
"numpy < 1.25",
"fsspec <= 2023.9.2", # Remove when pyarrow updates to reflect api changes
Expand Down Expand Up @@ -50,6 +51,7 @@ dev = [
"ipython", # Also used in building notebooks into Sphinx
"matplotlib", # Used in sample notebook intro_notebook.ipynb
"ray", # Used for dask-on-ray testing.
"types-PyYAML", # type stubs for pyyaml
maxwest-uw marked this conversation as resolved.
Show resolved Hide resolved
]

[build-system]
Expand Down
13 changes: 3 additions & 10 deletions src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from typing import List

from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer
from hipscat.pixel_math import hipscat_id

from hipscat_import.catalog.file_readers import InputReader, get_file_reader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.runtime_arguments import RuntimeArguments
from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

# pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods

Expand Down Expand Up @@ -102,14 +102,7 @@ def _check_arguments(self):
self.file_reader = get_file_reader(self.input_format)

# Basic checks complete - make more checks and create directories where necessary
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
raise FileNotFoundError("No input files found")
self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
self.resume_plan = ResumePlan(
resume=self.resume,
progress_bar=self.progress_bar,
Expand Down
Empty file.
13 changes: 3 additions & 10 deletions src/hipscat_import/cross_match/macauff_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from os import path
from typing import List

from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer
from hipscat.io.validation import is_valid_catalog

from hipscat_import.runtime_arguments import RuntimeArguments
from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

# pylint: disable=too-many-instance-attributes
# pylint: disable=unsupported-binary-operation
Expand Down Expand Up @@ -87,14 +87,7 @@ def _check_arguments(self):
raise ValueError("Macauff column metadata file must point to valid file path.")

# Basic checks complete - make more checks and create directories where necessary
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
raise FileNotFoundError("No input files found")
self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)

self.column_names = self.get_column_names()

Expand Down
102 changes: 102 additions & 0 deletions src/hipscat_import/cross_match/macauff_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Utilities for generating parquet metdata from macauff-generated metadata files."""

import xml.etree.ElementTree as ET

import pyarrow as pa
import pyarrow.parquet as pq
import yaml
from hipscat.io import file_io


def _get_inner_xml_value(parent_el, node_type, default_value):
child_el = parent_el.findall(node_type)
if len(child_el) == 0:
return default_value
if len(child_el) > 1:
raise ValueError(f"found too many {node_type} XML elements")
return child_el[0].text.strip()


def _construct_field(name, units, metadata_dict):
    """Create a pyarrow field for one column described by macauff metadata.

    Args:
        name (str): column name
        units (str): macauff type descriptor; one of "string", "float",
            "double", "integer", or "long"
        metadata_dict (dict): metadata to attach to the field
    Returns:
        pyarrow field with the arrow type corresponding to ``units``
    Raises:
        ValueError: if the units descriptor is not one of the handled values
    """
    # Dispatch table from macauff unit descriptors to arrow types.
    units_to_type = {
        "string": pa.string(),
        "float": pa.float64(),
        "double": pa.float64(),
        "integer": pa.int64(),
        "long": pa.int64(),
    }
    if units not in units_to_type:
        raise ValueError(f"unhandled units {units}")
    return pa.field(name, units_to_type[units], metadata=metadata_dict)


def from_xml(input_file, output_file):
    """Read XML file with column metadata for a cross-match file from macauff.

    Expects XML with the format::

        <columns>
            <column>
                <name>$COLUMN_NAME</name>
                <description>$COLUMN_DESCRIPTION</description>
                <units>$COLUMN_UNIT_DESCRIPTOR</units>
            </column>
        </columns>

    Args:
        input_file (str): file to read for match metadata
        output_file (str): desired location for output parquet metadata file

    Raises:
        ValueError: if the XML is mal-formed
    """
    root_el = ET.parse(input_file).getroot()

    fields = []
    for column_el in root_el.findall("column"):
        # Each sub-element is optional; fall back to a usable default.
        column_name = _get_inner_xml_value(column_el, "name", "foo")
        column_description = _get_inner_xml_value(column_el, "description", "")
        column_units = _get_inner_xml_value(column_el, "units", "string")
        fields.append(
            _construct_field(
                column_name,
                column_units,
                metadata_dict={"macauff_description": column_description},
            )
        )

    # Write an empty table that carries only the schema (column-level metadata).
    pq.write_table(pa.schema(fields).empty_table(), where=output_file)


def from_yaml(input_file, output_directory):
    """Read YAML file with column metadata for the various cross-match files from macauff.

    Expects YAML with the format::

        name: macauff_GaiaDR3xCatWISE2020
        description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020.
        tables:
        - name: macauff_GaiaDR3xCatWISE2020_matches
          "@id": "#macauff_GaiaDR3xCatWISE2020_matches"
          description: Counterpart associations between Gaia and WISE
          columns:
          - name: gaia_source_id
            datatype: long
            description: The Gaia DR3 object ID.

    Args:
        input_file (str): file to read for match metadata
        output_directory (str): desired location for output parquet metadata files.
            We will write one file per table in the "tables" element.
    """
    with open(input_file, "r", encoding="utf-8") as file_handle:
        metadata = yaml.safe_load(file_handle)

    for table_index, table in enumerate(metadata.get("tables", [])):
        table_name = table.get("name", f"metadata_table_{table_index}")
        # Attach the full column dict (name, description, units, ...) as
        # field-level metadata on each parquet column.
        fields = [
            _construct_field(
                column.get("name", f"column_{column_index}"),
                column.get("units", "string"),
                metadata_dict=column,
            )
            for column_index, column in enumerate(table.get("columns", []))
        ]
        target_file = file_io.append_paths_to_pointer(output_directory, f"{table_name}.parquet")
        pq.write_table(pa.schema(fields).empty_table(), where=str(target_file))
24 changes: 24 additions & 0 deletions src/hipscat_import/runtime_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,27 @@ def provenance_info(self) -> dict:
def additional_runtime_provenance_info(self):
"""Any additional runtime args to be included in provenance info from subclasses"""
return {}


def find_input_paths(input_path="", file_matcher="", input_file_list=None):
    """Helper method to find input paths, given either a prefix and format, or an
    explicit list of paths.

    Args:
        input_path (str): prefix to search for
        file_matcher (str): matcher to use when searching for files
        input_file_list (List[str]): list of input paths
    Returns:
        matching files, if input_path is provided, otherwise, input_file_list
    Raises:
        FileNotFoundError: if the input_path does not exist, or if no input
            files are found via either mechanism.
    """
    if input_path:
        if not file_io.does_file_or_directory_exist(input_path):
            raise FileNotFoundError("input_path not found on local storage")
        input_paths = file_io.find_files_matching_path(input_path, file_matcher)
    else:
        # Fall back to the explicit list. Previously, passing neither argument
        # left `input_paths` unbound and raised UnboundLocalError instead of
        # the intended FileNotFoundError below.
        input_paths = input_file_list or []
    if not input_paths:
        raise FileNotFoundError("No input files found")
    return input_paths
5 changes: 5 additions & 0 deletions tests/hipscat_import/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ def empty_data_dir(test_data_dir):
return os.path.join(test_data_dir, "empty")


@pytest.fixture
def macauff_data_dir(test_data_dir):
    """Path to the "macauff" subdirectory of the shared test data directory."""
    return os.path.join(test_data_dir, "macauff")


@pytest.fixture
def formats_dir(test_data_dir):
return os.path.join(test_data_dir, "test_formats")
Expand Down
101 changes: 101 additions & 0 deletions tests/hipscat_import/cross_match/test_macauff_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
from xml.etree.ElementTree import ParseError

import pytest
from hipscat.io import file_io

from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml


def test_from_xml(macauff_data_dir, tmp_path):
    """Test XML file reading and parquet metadata generation."""
    input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match.xml")
    result_file = os.path.join(tmp_path, "output.parquet")

    from_xml(input_file, result_file)

    # Read the written parquet metadata back and confirm the column count.
    parquet_metadata = file_io.read_parquet_metadata(result_file)
    arrow_schema = parquet_metadata.schema.to_arrow_schema()

    assert len(arrow_schema) == 6


def test_from_xml_malformed(tmp_path):
    """Test some invalid XML file inputs."""
    malformed_file = os.path.join(tmp_path, "input.parquet")
    result_file = os.path.join(tmp_path, "output.parquet")

    def write_input(contents):
        # Overwrite the scratch input file with the given XML text.
        with open(malformed_file, "w", encoding="utf-8") as file_handle:
            file_handle.write(contents)

    ## No "columns" found at all
    write_input("")

    with pytest.raises(ParseError, match="no element found"):
        from_xml(malformed_file, result_file)

    ## Some columns, too many required fields
    write_input(
        """<columns>
<column>
<name>Gaia_designation</name>
<name>The Gaia DR3 object ID.</name>
<units>long</units>
</column>
</columns>"""
    )

    with pytest.raises(ValueError, match="too many name XML elements"):
        from_xml(malformed_file, result_file)

    ## Unhandled types
    write_input(
        """<columns>
<column>
<name>Gaia_designation</name>
<description>The Gaia DR3 object ID.</description>
<units>blob</units>
</column>
</columns>"""
    )

    with pytest.raises(ValueError, match="unhandled units blob"):
        from_xml(malformed_file, result_file)

    ## Some empty fields are ok!
    write_input(
        """<columns>
<column>
<name> </name>
<units>long </units>
</column>
</columns>"""
    )

    from_xml(malformed_file, result_file)


def test_from_yaml(macauff_data_dir, tmp_path):
    """Test YAML file reading and parquet metadata generation."""
    yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")

    from_yaml(yaml_input_file, tmp_path)

    # One parquet metadata file is expected per table in the YAML, each with
    # its own expected column count.
    expected_files = [
        ("macauff_GaiaDR3xCatWISE2020_matches.parquet", 7),
        ("macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.parquet", 4),
        ("macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.parquet", 4),
    ]
    for file_name, expected_column_count in expected_files:
        output_file = os.path.join(tmp_path, file_name)
        single_metadata = file_io.read_parquet_metadata(output_file)
        schema = single_metadata.schema.to_arrow_schema()
        assert len(schema) == expected_column_count
68 changes: 68 additions & 0 deletions tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<columns>
<column>
<name>
Gaia_designation
</name>
<description>
The Gaia DR3 object ID.
</description>
<units>
long
</units>
</column>
<column>
<name>
Gaia_RA
</name>
<description>
Right Ascension of the Gaia DR3 source.
</description>
<units>
float
</units>
</column>
<column>
<name>
Gaia_Dec
</name>
<description>
The Gaia DR3 declination.
</description>
<units>
float
</units>
</column>
<column>
<name>
CatWISE_Name
</name>
<description>
The object identifier from the CatWISE 2020 catalogue.
</description>
<units>
string
</units>
</column>
<column>
<name>
CatWISE_RA
</name>
<description>
Right Ascension of the object as quoted by the CatWISE 2020 catalogue.
</description>
<units>
float
</units>
</column>
<column>
<name>
CatWISE_Dec
</name>
<description>
CatWISE 2020 Declination.
</description>
<units>
float
</units>
</column>
</columns>
Loading