Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Macauff pipeline boilerplate #152

Merged
merged 12 commits into from
Oct 30, 2023
127 changes: 127 additions & 0 deletions src/hipscat_import/cross_match/macauff_arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from __future__ import annotations

from dataclasses import dataclass, field
from os import path
from typing import List

from hipscat.io import FilePointer, file_io
from hipscat.io.validation import is_valid_catalog

from hipscat_import.runtime_arguments import RuntimeArguments

# pylint: disable=too-many-instance-attributes
# pylint: disable=unsupported-binary-operation


@dataclass
class MacauffArguments(RuntimeArguments):
"""Data class for holding cross-match association arguments"""
## Input - Cross-match data
input_path: FilePointer | None = None
"""path to search for the input data"""
input_format: str = ""
"""specifier of the input data format. this will be used to find an appropriate
InputReader type, and may be used to find input files, via a match like
``<input_path>/*<input_format>`` """
input_file_list: List[FilePointer] = field(default_factory=list)
"""can be used instead of `input_format` to import only specified files"""
input_paths: List[FilePointer] = field(default_factory=list)
"""resolved list of all files that will be used in the importer"""
add_hipscat_index: bool = True
"""add the hipscat spatial index field alongside the data"""

## Input - Left catalog
left_catalog_dir: str = ""
left_id_column: str = ""
left_ra_column: str = ""
left_dec_column: str = ""

## Input - Right catalog
right_catalog_dir: str = ""
right_id_column: str = ""
right_ra_column: str = ""
right_dec_column: str = ""

## `macauff` specific attributes
metadata_file_path: str = ""
match_probability_columns: List[str] = field(default_factory=list)
column_names: List[str] = field(default_factory=list)

def __post_init__(self):
self._check_arguments()

def _check_arguments(self):
super()._check_arguments()

if not self.input_path and not self.input_file_list:
raise ValueError("input_path nor input_file_list not provided")
if not self.input_format:
raise ValueError("input_format is required")

if not self.left_catalog_dir:
raise ValueError("left_catalog_dir is required")
if not self.left_id_column:
raise ValueError("left_id_column is required")
if not self.left_ra_column:
raise ValueError("left_ra_column is required")
if not self.left_dec_column:
raise ValueError("left_dec_column is required")
if not is_valid_catalog(self.left_catalog_dir):
raise ValueError("left_catalog_dir not a valid catalog")

if not self.right_catalog_dir:
raise ValueError("right_catalog_dir is required")
if not self.right_id_column:
raise ValueError("right_id_column is required")
if not self.right_ra_column:
raise ValueError("right_ra_column is required")
if not self.right_dec_column:
raise ValueError("right_dec_column is required")
if not is_valid_catalog(self.right_catalog_dir):
raise ValueError("right_catalog_dir not a valid catalog")

if not self.metadata_file_path:
raise ValueError("metadata_file_path required for macauff crossmatch")
if not path.isfile(self.metadata_file_path):
raise ValueError("Macauff column metadata file must point to valid file path.")

# Basic checks complete - make more checks and create directories where necessary
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
raise FileNotFoundError("No input files found")

self.column_names = self.get_column_names()

def get_column_names(self):
"""Grab the macauff column names."""
# TODO: Actually read in the metadata file once we get the example file from Tom.

return [
'Gaia_designation',
'Gaia_RA',
'Gaia_Dec',
'BP',
'G',
'RP',
'CatWISE_Name',
'CatWISE_RA',
'CatWISE_Dec',
'W1',
'W2',
'match_p',
'Separation',
'eta',
'xi',
'Gaia_avg_cont',
'CatWISE_avg_cont',
'Gaia_cont_f1',
'Gaia_cont_f10',
'CatWISE_cont_f1',
'CatWISE_cont_f10',
'CatWISE_fit_sig',
]
12 changes: 12 additions & 0 deletions src/hipscat_import/cross_match/run_macauff_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from hipscat_import.cross_match.macauff_arguments import MacauffArguments

# pylint: disable=unused-argument

def run(args, client):
"""run macauff cross-match import pipeline"""
if not args:
raise TypeError("args is required and should be type MacauffArguments")
if not isinstance(args, MacauffArguments):
raise TypeError("args must be type MacauffArguments")

raise NotImplementedError("macauff pipeline not implemented yet.")
4 changes: 4 additions & 0 deletions src/hipscat_import/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from dask.distributed import Client

import hipscat_import.catalog.run_import as catalog_runner
import hipscat_import.cross_match.run_macauff_import as macauff_runner
import hipscat_import.index.run_index as index_runner
import hipscat_import.margin_cache.margin_cache as margin_runner
import hipscat_import.soap.run_soap as soap_runner
import hipscat_import.verification.run_verification as verification_runner
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.cross_match.macauff_arguments import MacauffArguments
from hipscat_import.index.arguments import IndexArguments
from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments
from hipscat_import.runtime_arguments import RuntimeArguments
Expand Down Expand Up @@ -49,6 +51,8 @@ def pipeline_with_client(args: RuntimeArguments, client: Client):
soap_runner.run(args, client)
elif isinstance(args, VerificationArguments):
verification_runner.run(args)
elif isinstance(args, MacauffArguments):
macauff_runner.run(args, client)
else:
raise ValueError("unknown args type")
except Exception as exception: # pylint: disable=broad-exception-caught
Expand Down
4 changes: 4 additions & 0 deletions tests/hipscat_import/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ def formats_pandasindex(test_data_dir):
def formats_multiindex(test_data_dir):
return os.path.join(test_data_dir, "test_formats", "multiindex.parquet")

@pytest.fixture
def formats_yaml(test_data_dir):
return os.path.join(test_data_dir, "test_formats", "macauff_metadata.yaml")


@pytest.fixture
def small_sky_parts_dir(test_data_dir):
Expand Down
Loading