diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py
index e73d6a4f..c554f32b 100644
--- a/src/hipscat_import/catalog/resume_plan.py
+++ b/src/hipscat_import/catalog/resume_plan.py
@@ -32,8 +32,6 @@ class ResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    ORIGINAL_INPUT_PATHS = "input_paths.txt"
-
     HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
     HISTOGRAMS_DIR = "histograms"
 
@@ -63,15 +61,7 @@ def gather_plan(self):
             step_progress.update(1)
 
             ## Validate that we're operating on the same file set as the previous instance.
-            unique_file_paths = set(self.input_paths)
-            self.input_paths = list(unique_file_paths)
-            self.input_paths.sort()
-            original_input_paths = self.get_original_paths()
-            if not original_input_paths:
-                self.save_original_paths()
-            else:
-                if original_input_paths != unique_file_paths:
-                    raise ValueError("Different file set from resumed pipeline execution.")
+            self.input_paths = self.check_original_input_paths(self.input_paths)
             step_progress.update(1)
 
             ## Gather keys for execution.
@@ -97,25 +87,6 @@ def gather_plan(self):
             )
             step_progress.update(1)
 
-    def get_original_paths(self):
-        """Get all input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        try:
-            with open(file_path, "r", encoding="utf-8") as file_handle:
-                contents = file_handle.readlines()
-                contents = [path.strip() for path in contents]
-                original_input_paths = set(contents)
-                return original_input_paths
-        except FileNotFoundError:
-            return []
-
-    def save_original_paths(self):
-        """Save input file paths from the first pipeline attempt."""
-        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
-        with open(file_path, "w", encoding="utf-8") as file_handle:
-            for path in self.input_paths:
-                file_handle.write(f"{path}\n")
-
     def get_remaining_map_keys(self):
         """Gather remaining keys, dropping successful mapping tasks from histogram names.
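The hunk above replaces the catalog pipeline's private `get_original_paths`/`save_original_paths` pair with a single shared helper, `check_original_input_paths`, which is added to the `PipelineResumePlan` base class later in this diff. A minimal sketch of the record-then-verify semantics, using plain `pathlib` in place of the hipscat `file_io` wrappers (names and layout here are illustrative, not the library API):

```python
from pathlib import Path


def check_original_input_paths(tmp_path, input_paths):
    """Record the input file set on the first attempt; on resume, refuse
    to continue if the current file set differs from the recorded one."""
    record = Path(tmp_path) / "input_paths.txt"
    unique_paths = set(input_paths)

    if record.exists():
        original = {line.strip() for line in record.read_text(encoding="utf-8").splitlines()}
        if original != unique_paths:
            raise ValueError("Different file set from resumed pipeline execution.")
    else:
        record.write_text("".join(f"{path}\n" for path in input_paths), encoding="utf-8")

    # Callers always receive a deduplicated, deterministic ordering.
    return sorted(unique_paths)
```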
diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py
index 74283f18..26079c01 100644
--- a/src/hipscat_import/cross_match/macauff_arguments.py
+++ b/src/hipscat_import/cross_match/macauff_arguments.py
@@ -4,9 +4,12 @@
 from os import path
 from typing import List
 
+from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo
+from hipscat.catalog.catalog_type import CatalogType
 from hipscat.io import FilePointer
 from hipscat.io.validation import is_valid_catalog
 
+from hipscat_import.catalog.file_readers import InputReader, get_file_reader
 from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths
 
 # pylint: disable=too-many-instance-attributes
@@ -28,8 +31,6 @@ class MacauffArguments(RuntimeArguments):
     """can be used instead of `input_format` to import only specified files"""
     input_paths: List[FilePointer] = field(default_factory=list)
     """resolved list of all files that will be used in the importer"""
-    add_hipscat_index: bool = True
-    """add the hipscat spatial index field alongside the data"""
 
     ## Input - Left catalog
     left_catalog_dir: str = ""
@@ -45,8 +46,11 @@ class MacauffArguments(RuntimeArguments):
 
     ## `macauff` specific attributes
     metadata_file_path: str = ""
-    match_probability_columns: List[str] = field(default_factory=list)
-    column_names: List[str] = field(default_factory=list)
+    resume: bool = True
+    """if there are existing intermediate resume files, should we
+    read those and continue to create a new catalog where we left off"""
+
+    file_reader: InputReader | None = None
 
     def __post_init__(self):
         self._check_arguments()
@@ -89,33 +93,33 @@ def _check_arguments(self):
         # Basic checks complete - make more checks and create directories where necessary
         self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
 
-        self.column_names = self.get_column_names()
-
-    def get_column_names(self):
-        """Grab the macauff column names."""
-        # TODO: Actually read in the metadata file once we get the example file from Tom.
-
-        return [
-            "Gaia_designation",
-            "Gaia_RA",
-            "Gaia_Dec",
-            "BP",
-            "G",
-            "RP",
-            "CatWISE_Name",
-            "CatWISE_RA",
-            "CatWISE_Dec",
-            "W1",
-            "W2",
-            "match_p",
-            "Separation",
-            "eta",
-            "xi",
-            "Gaia_avg_cont",
-            "CatWISE_avg_cont",
-            "Gaia_cont_f1",
-            "Gaia_cont_f10",
-            "CatWISE_cont_f1",
-            "CatWISE_cont_f10",
-            "CatWISE_fit_sig",
-        ]
+        if not self.file_reader:
+            self.file_reader = get_file_reader(file_format=self.input_format)
+
+    def to_catalog_info(self, total_rows) -> AssociationCatalogInfo:
+        """Catalog-type-specific dataset info."""
+        info = {
+            "catalog_name": self.output_artifact_name,
+            "catalog_type": CatalogType.ASSOCIATION,
+            "total_rows": total_rows,
+            "primary_column": self.left_id_column,
+            "primary_catalog": str(self.left_catalog_dir),
+            "join_column": self.right_id_column,
+            "join_catalog": str(self.right_catalog_dir),
+        }
+        return AssociationCatalogInfo(**info)
+
+    def additional_runtime_provenance_info(self) -> dict:
+        return {
+            "input_path": self.input_path,
+            "input_format": self.input_format,
+            "left_catalog_dir": self.left_catalog_dir,
+            "left_id_column": self.left_id_column,
+            "left_ra_column": self.left_ra_column,
+            "left_dec_column": self.left_dec_column,
+            "right_catalog_dir": self.right_catalog_dir,
+            "right_id_column": self.right_id_column,
+            "right_ra_column": self.right_ra_column,
+            "right_dec_column": self.right_dec_column,
+            "metadata_file_path": self.metadata_file_path,
+        }
diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py
new file mode 100644
index 00000000..27a4b157
--- /dev/null
+++ b/src/hipscat_import/cross_match/macauff_map_reduce.py
@@ -0,0 +1,106 @@
+import healpy as hp
+import numpy as np
+import pyarrow as pa
+import pyarrow.parquet as pq
+from hipscat.io import file_io, paths
+from hipscat.pixel_math.healpix_pixel import HealpixPixel
+from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort
+
+from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file
+from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan
+
+# pylint: disable=too-many-arguments,too-many-locals
+
+
+def split_associations(
+    input_file,
+    file_reader,
+    splitting_key,
+    highest_left_order,
+    highest_right_order,
+    left_alignment,
+    right_alignment,
+    left_ra_column,
+    left_dec_column,
+    right_ra_column,
+    right_dec_column,
+    tmp_path,
+):
+    """Map a file of links to their healpix pixels and split into shards.
+
+    Raises:
+        ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
+        FileNotFoundError: if the file does not exist, or is a directory
+    """
+    for chunk_number, data, mapped_left_pixels in _iterate_input_file(
+        input_file, file_reader, highest_left_order, left_ra_column, left_dec_column, False
+    ):
+        aligned_left_pixels = left_alignment[mapped_left_pixels]
+        unique_pixels, unique_inverse = np.unique(aligned_left_pixels, return_inverse=True)
+
+        mapped_right_pixels = hp.ang2pix(
+            2**highest_right_order,
+            data[right_ra_column].values,
+            data[right_dec_column].values,
+            lonlat=True,
+            nest=True,
+        )
+        aligned_right_pixels = right_alignment[mapped_right_pixels]
+
+        data["Norder"] = [pix.order for pix in aligned_left_pixels]
+        data["Dir"] = [pix.dir for pix in aligned_left_pixels]
+        data["Npix"] = [pix.pixel for pix in aligned_left_pixels]
+
+        data["join_Norder"] = [pix.order for pix in aligned_right_pixels]
+        data["join_Dir"] = [pix.dir for pix in aligned_right_pixels]
+        data["join_Npix"] = [pix.pixel for pix in aligned_right_pixels]
+
+        for unique_index, pixel in enumerate(unique_pixels):
+            mapped_indexes = np.where(unique_inverse == unique_index)
+            data_indexes = data.index[mapped_indexes[0].tolist()]
+
+            filtered_data = data.filter(items=data_indexes, axis=0)
+
+            pixel_dir = _get_pixel_directory(tmp_path, pixel.order, pixel.pixel)
+            file_io.make_directory(pixel_dir, exist_ok=True)
+            output_file = file_io.append_paths_to_pointer(
+                pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
+            )
+            filtered_data.to_parquet(output_file, index=False)
+        del data, filtered_data, data_indexes
+
+    MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)
+
+
+def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
+    """For all points determined to be in the target left_pixel, map them to the appropriate
+    right_pixel and aggregate into a single parquet file."""
+    inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel)
+
+    if not file_io.directory_has_contents(inputs):
+        MacauffResumePlan.reducing_key_done(
+            tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}"
+        )
+        print(f"Warning: no input data for pixel {left_pixel}")
+        return
+    destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel)
+    file_io.make_directory(destination_dir, exist_ok=True)
+
+    destination_file = paths.pixel_catalog_file(catalog_path, left_pixel.order, left_pixel.pixel)
+
+    merged_table = pq.read_table(inputs)
+    dataframe = merged_table.to_pandas().reset_index()
+
+    ## One row group per join_Norder/join_Npix
+    join_pixel_frames = dataframe.groupby(["join_Norder", "join_Npix"], group_keys=True)
+    join_pixels = [HealpixPixel(pixel[0], pixel[1]) for pixel, _ in join_pixel_frames]
+    pixel_argsort = get_pixel_argsort(join_pixels)
+    with pq.ParquetWriter(destination_file, merged_table.schema) as writer:
+        for pixel_index in pixel_argsort:
+            join_pixel = join_pixels[pixel_index]
+            join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index()
+            writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema))
+
+    MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
diff --git a/src/hipscat_import/cross_match/macauff_metadata.py b/src/hipscat_import/cross_match/macauff_metadata.py
index 0e20caf0..65c8bf99 100644
--- a/src/hipscat_import/cross_match/macauff_metadata.py
+++ b/src/hipscat_import/cross_match/macauff_metadata.py
@@ -94,7 +94,7 @@ def from_yaml(input_file, output_directory):
         table_name = table.get("name", f"metadata_table_{index}")
         for col_index, column in enumerate(table.get("columns", [])):
             name = column.get("name", f"column_{col_index}")
-            units = column.get("units", "string")
+            units = column.get("datatype", "string")
             fields.append(_construct_field(name, units, metadata_dict=column))
 
         schema = pa.schema(fields)
diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py
new file mode 100644
index 00000000..dfc3d871
--- /dev/null
+++ b/src/hipscat_import/cross_match/macauff_resume_plan.py
@@ -0,0 +1,141 @@
+"""Utility to hold the file-level pipeline execution plan."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import List, Tuple
+
+from hipscat.io import FilePointer, file_io
+from hipscat.pixel_math.healpix_pixel import HealpixPixel
+
+from hipscat_import.cross_match.macauff_arguments import MacauffArguments
+from hipscat_import.pipeline_resume_plan import PipelineResumePlan
+
+# pylint: disable=duplicate-code
+
+
+@dataclass
+class MacauffResumePlan(PipelineResumePlan):
+    """Container class for holding the state of each file in the pipeline plan."""
+
+    input_paths: List[FilePointer] = field(default_factory=list)
+    """resolved list of all files that will be used in the importer"""
+    split_keys: List[Tuple[str, str]] = field(default_factory=list)
+    """set of files (and job keys) that have yet to be split"""
+    reduce_keys: List[Tuple[HealpixPixel, str]] = field(default_factory=list)
+    """set of left side catalog pixels (and job keys) that have yet to be reduced/combined"""
+
+    SPLITTING_STAGE = "splitting"
+    REDUCING_STAGE = "reducing"
+
+    def __init__(self, args: MacauffArguments, left_pixels):
+        if not args.tmp_path:  # pragma: no cover (not reachable, but required for mypy)
+            raise ValueError("tmp_path is required")
+        super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path)
+        self.input_paths = args.input_paths
+        self.gather_plan(left_pixels)
+
+    def gather_plan(self, left_pixels):
+        """Initialize the plan."""
+        ## Make sure it's safe to use existing resume state.
+        super().safe_to_resume()
+
+        ## Validate existing resume state.
+        ## - if a later stage is complete, the earlier stages should be complete too.
+        splitting_done = self.is_splitting_done()
+        reducing_done = self.is_reducing_done()
+
+        if reducing_done and not splitting_done:
+            raise ValueError("splitting must be complete before reducing")
+
+        ## Validate that we're operating on the same file set as the previous instance.
+        self.input_paths = self.check_original_input_paths(self.input_paths)
+
+        ## Gather keys for execution.
+        if not splitting_done:
+            self.split_keys = self.get_remaining_split_keys()
+        if not reducing_done:
+            self.reduce_keys = self.get_reduce_keys(left_pixels)
+        ## Go ahead and create our directories for storing resume files.
+        file_io.make_directory(
+            file_io.append_paths_to_pointer(self.tmp_path, self.SPLITTING_STAGE),
+            exist_ok=True,
+        )
+        file_io.make_directory(
+            file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE),
+            exist_ok=True,
+        )
+
+    def get_remaining_split_keys(self):
+        """Gather remaining keys, dropping successful split tasks from done file names.
+
+        Returns:
+            list of splitting keys *not* found in files like /resume/path/split_key.done
+        """
+        split_keys = set(self.read_done_keys(self.SPLITTING_STAGE))
+        return [
+            (f"split_{i}", file_path)
+            for i, file_path in enumerate(self.input_paths)
+            if f"split_{i}" not in split_keys
+        ]
+
+    @classmethod
+    def splitting_key_done(cls, tmp_path, splitting_key: str):
+        """Mark a single splitting task as done
+
+        Args:
+            tmp_path (str): where to write intermediate resume files.
+            splitting_key (str): unique string for each splitting task (e.g. "split_57")
+        """
+        cls.touch_key_done_file(tmp_path, cls.SPLITTING_STAGE, splitting_key)
+
+    @classmethod
+    def reducing_key_done(cls, tmp_path, reducing_key: str):
+        """Mark a single reducing task as done
+
+        Args:
+            tmp_path (str): where to write intermediate resume files.
+            reducing_key (str): unique string for each reducing task (e.g. "3_57")
+        """
+        cls.touch_key_done_file(tmp_path, cls.REDUCING_STAGE, reducing_key)
+
+    def wait_for_splitting(self, futures):
+        """Wait for splitting futures to complete."""
+        self.wait_for_futures(futures, self.SPLITTING_STAGE)
+        remaining_split_items = self.get_remaining_split_keys()
+        if len(remaining_split_items) > 0:
+            raise RuntimeError(f"{len(remaining_split_items)} split stages did not complete successfully.")
+        self.touch_stage_done_file(self.SPLITTING_STAGE)
+
+    def is_splitting_done(self) -> bool:
+        """Are there files left to split?"""
+        return self.done_file_exists(self.SPLITTING_STAGE)
+
+    def get_reduce_keys(self, left_pixels):
+        """Fetch a Tuple for each partition to reduce.
+
+        Tuple contains:
+
+        - left pixel (healpix pixel with both order and pixel)
+        - reduce key (string of left order+pixel)
+        """
+        reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE))
+        reduce_items = [
+            (hp_pixel, f"{hp_pixel.order}_{hp_pixel.pixel}")
+            for hp_pixel in left_pixels
+            if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys
+        ]
+        return reduce_items
+
+    def is_reducing_done(self) -> bool:
+        """Are there partitions left to reduce?"""
+        return self.done_file_exists(self.REDUCING_STAGE)
+
+    def wait_for_reducing(self, futures, left_pixels):
+        """Wait for reducing futures to complete."""
+        self.wait_for_futures(futures, self.REDUCING_STAGE)
+        remaining_reduce_items = self.get_reduce_keys(left_pixels)
+        if len(remaining_reduce_items) > 0:
+            raise RuntimeError(f"{len(remaining_reduce_items)} reduce stages did not complete successfully.")
+        self.touch_stage_done_file(self.REDUCING_STAGE)
diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py
index aad49b51..35424058 100644
--- a/src/hipscat_import/cross_match/run_macauff_import.py
+++ b/src/hipscat_import/cross_match/run_macauff_import.py
@@ -1,8 +1,79 @@
+import healpy as hp
+import numpy as np
+from hipscat.catalog import Catalog
+from hipscat.io import file_io, parquet_metadata, paths, write_metadata
+from tqdm import tqdm
+
 from hipscat_import.cross_match.macauff_arguments import MacauffArguments
+from hipscat_import.cross_match.macauff_map_reduce import reduce_associations, split_associations
+from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan
 
 # pylint: disable=unused-argument
 
 
+def split(
+    args,
+    highest_left_order,
+    highest_right_order,
+    left_alignment,
+    right_alignment,
+    resume_plan,
+    client,
+):
+    """Split association rows by their aligned left pixel."""
+    if resume_plan.is_splitting_done():
+        return
+
+    reader_future = client.scatter(args.file_reader)
+    left_alignment_future = client.scatter(left_alignment)
+    right_alignment_future = client.scatter(right_alignment)
+    futures = []
+    for key, file_path in resume_plan.split_keys:
+        futures.append(
+            client.submit(
+                split_associations,
+                input_file=file_path,
+                file_reader=reader_future,
+                splitting_key=key,
+                highest_left_order=highest_left_order,
+                highest_right_order=highest_right_order,
+                left_alignment=left_alignment_future,
+                right_alignment=right_alignment_future,
+                left_ra_column=args.left_ra_column,
+                left_dec_column=args.left_dec_column,
+                right_ra_column=args.right_ra_column,
+                right_dec_column=args.right_dec_column,
+                tmp_path=args.tmp_path,
+            )
+        )
+    resume_plan.wait_for_splitting(futures)
+
+
+def reduce(args, left_pixels, resume_plan, client):
+    """Reduce shard files into a single parquet file per left pixel."""
+    if resume_plan.is_reducing_done():
+        return
+
+    futures = []
+    for (
+        left_pixel,
+        pixel_key,
+    ) in resume_plan.reduce_keys:
+        futures.append(
+            client.submit(
+                reduce_associations,
+                left_pixel=left_pixel,
+                tmp_path=args.tmp_path,
+                catalog_path=args.catalog_path,
+                reduce_key=pixel_key,
+            )
+        )
+
+    resume_plan.wait_for_reducing(futures, left_pixels)
+
+
 def run(args, client):
     """run macauff cross-match import pipeline"""
     if not args:
@@ -10,4 +81,72 @@ def run(args, client):
     if not isinstance(args, MacauffArguments):
         raise TypeError("args must be type MacauffArguments")
 
-    raise NotImplementedError("macauff pipeline not implemented yet.")
+    ## Lump all of the catalog reading stuff together under a single block,
+    ## since it can take a while to load large catalogs and some feedback is nice.
+    with tqdm(total=5, desc="Planning", disable=not args.progress_bar) as step_progress:
+        left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir)
+        step_progress.update(1)
+        right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir)
+        step_progress.update(1)
+        highest_left_order = left_catalog.partition_info.get_highest_order()
+        highest_right_order = right_catalog.partition_info.get_highest_order()
+
+        left_pixels = left_catalog.partition_info.get_healpix_pixels()
+        right_pixels = right_catalog.partition_info.get_healpix_pixels()
+
+        regenerated_left_alignment = np.full(hp.order2npix(highest_left_order), None)
+        for pixel in left_pixels:
+            explosion_factor = 4 ** (highest_left_order - pixel.order)
+            exploded_pixels = np.arange(
+                pixel.pixel * explosion_factor,
+                (pixel.pixel + 1) * explosion_factor,
+            )
+            regenerated_left_alignment[exploded_pixels] = pixel
+        step_progress.update(1)
+
+        regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None)
+        for pixel in right_pixels:
+            explosion_factor = 4 ** (highest_right_order - pixel.order)
+            exploded_pixels = np.arange(
+                pixel.pixel * explosion_factor,
+                (pixel.pixel + 1) * explosion_factor,
+            )
+            regenerated_right_alignment[exploded_pixels] = pixel
+        step_progress.update(1)
+
+        resume_plan = MacauffResumePlan(args, left_pixels)
+        step_progress.update(1)
+
+    split(
+        args,
+        highest_left_order=highest_left_order,
+        highest_right_order=highest_right_order,
+        left_alignment=regenerated_left_alignment,
+        right_alignment=regenerated_right_alignment,
+        resume_plan=resume_plan,
+        client=client,
+    )
+    reduce(args, left_pixels, resume_plan, client)
+
+    # All done - write out the metadata
+    with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress:
+        parquet_metadata.write_parquet_metadata(args.catalog_path)
+        total_rows = 0
+        metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
+        for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
+            total_rows += row_group.num_rows
+        # pylint: disable=duplicate-code
+        # Very similar to /index/run_index.py
+        step_progress.update(1)
+        total_rows = int(total_rows)
+        catalog_info = args.to_catalog_info(total_rows)
+        write_metadata.write_provenance_info(
+            catalog_base_dir=args.catalog_path,
+            dataset_info=catalog_info,
+            tool_args=args.provenance_info(),
+        )
+        step_progress.update(1)
+        write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path)
+        step_progress.update(1)
+        file_io.remove_directory(args.tmp_path, ignore_errors=True)
+        step_progress.update(1)
diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py
index 38d727f7..91759f7c 100644
--- a/src/hipscat_import/pipeline_resume_plan.py
+++ b/src/hipscat_import/pipeline_resume_plan.py
@@ -24,6 +24,8 @@ class PipelineResumePlan:
     """if true, a tqdm progress bar will be displayed for user feedback
     of planning progress"""
 
+    ORIGINAL_INPUT_PATHS = "input_paths.txt"
+
     def safe_to_resume(self):
         """Check that we are ok to resume an in-progress pipeline, if one exists.
 
@@ -148,3 +150,39 @@ def get_formatted_stage_name(stage_name) -> str:
         stage_name = "progress"
 
     return f"{stage_name.capitalize(): <10}"
+
+    def check_original_input_paths(self, input_paths):
+        """Validate that we're operating on the same file set as the original pipeline,
+        or save the inputs as the originals if not found.
+
+        Args:
+            input_paths (list[str]): input paths that will be processed by a pipeline.
+
+        Raises:
+            ValueError: if the retrieved file set differs from `input_paths`.
+        """
+        unique_file_paths = set(input_paths)
+
+        original_input_paths = []
+
+        file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS)
+        try:
+            with open(file_path, "r", encoding="utf-8") as file_handle:
+                contents = file_handle.readlines()
+                contents = [path.strip() for path in contents]
+                original_input_paths = set(contents)
+        except FileNotFoundError:
+            pass
+
+        if len(original_input_paths) == 0:
+            with open(file_path, "w", encoding="utf-8") as file_handle:
+                for path in input_paths:
+                    file_handle.write(f"{path}\n")
+        elif original_input_paths != unique_file_paths:
+            raise ValueError("Different file set from resumed pipeline execution.")
+
+        input_paths = list(unique_file_paths)
+        input_paths.sort()
+        return input_paths
diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py
index 144e7954..f6839175 100644
--- a/tests/hipscat_import/cross_match/test_macauff_runner.py
+++ b/tests/hipscat_import/cross_match/test_macauff_runner.py
@@ -1,7 +1,13 @@
+import os
+
 import pytest
+from hipscat.catalog.association_catalog.association_catalog import AssociationCatalog
+from hipscat.io import file_io
 
 import hipscat_import.cross_match.run_macauff_import as runner
+from hipscat_import.catalog.file_readers import CsvReader
 from hipscat_import.cross_match.macauff_arguments import MacauffArguments
+from hipscat_import.cross_match.macauff_metadata import from_yaml
 
 # pylint: disable=too-many-instance-attributes
 # pylint: disable=duplicate-code
@@ -19,31 +25,97 @@ def test_bad_args(dask_client):
 
 
 @pytest.mark.dask
-def test_no_implementation(
+def test_object_to_object(
    small_sky_object_catalog,
-    small_sky_source_catalog,
-    small_sky_dir,
-    formats_yaml,
     tmp_path,
+    macauff_data_dir,
     dask_client,
 ):
     """Test that we can create a MacauffArguments instance with two valid catalogs."""
+
+    yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")
+    from_yaml(yaml_input_file, tmp_path)
+    matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
+    single_metadata = file_io.read_parquet_metadata(matches_schema_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 7
+
     args = MacauffArguments(
         output_path=tmp_path,
-        output_artifact_name="object_to_source",
+        output_artifact_name="object_to_object",
         tmp_dir=tmp_path,
         left_catalog_dir=small_sky_object_catalog,
-        left_ra_column="ra",
-        left_dec_column="dec",
-        left_id_column="id",
-        right_catalog_dir=small_sky_source_catalog,
-        right_ra_column="source_ra",
-        right_dec_column="source_dec",
-        right_id_column="source_id",
-        input_path=small_sky_dir,
+        left_ra_column="gaia_ra",
+        left_dec_column="gaia_dec",
+        left_id_column="gaia_source_id",
+        right_catalog_dir=small_sky_object_catalog,
+        right_ra_column="catwise_ra",
+        right_dec_column="catwise_dec",
+        right_id_column="catwise_name",
+        input_file_list=[os.path.join(macauff_data_dir, "gaia_small_sky_matches.csv")],
         input_format="csv",
-        metadata_file_path=formats_yaml,
+        overwrite=True,
+        file_reader=CsvReader(schema_file=matches_schema_file, header=None),
+        metadata_file_path=matches_schema_file,
     )
+    os.makedirs(os.path.join(args.tmp_path, "splitting"))
 
-    with pytest.raises(NotImplementedError, match="not implemented yet."):
-        runner.run(args, dask_client)
+    runner.run(args, dask_client)
+
+    ## Check that the association data can be parsed as a valid association catalog.
+    catalog = AssociationCatalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert len(catalog.get_join_pixels()) == 1
+    assert catalog.catalog_info.total_rows == 131
+
+
+@pytest.mark.dask
+def test_source_to_object(
+    small_sky_object_catalog,
+    small_sky_source_catalog,
+    tmp_path,
+    macauff_data_dir,
+    dask_client,
+):
+    """Test the macauff association pipeline end-to-end, matching source rows to their objects."""
+
+    yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")
+    from_yaml(yaml_input_file, tmp_path)
+    matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
+    single_metadata = file_io.read_parquet_metadata(matches_schema_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 7
+
+    args = MacauffArguments(
+        output_path=tmp_path,
+        output_artifact_name="source_to_object",
+        tmp_dir=tmp_path,
+        left_catalog_dir=small_sky_source_catalog,
+        left_ra_column="gaia_ra",
+        left_dec_column="gaia_dec",
+        left_id_column="gaia_source_id",
+        right_catalog_dir=small_sky_object_catalog,
+        right_ra_column="catwise_ra",
+        right_dec_column="catwise_dec",
+        right_id_column="catwise_name",
+        input_file_list=[os.path.join(macauff_data_dir, "small_sky_and_source_matches.csv")],
+        input_format="csv",
+        overwrite=True,
+        file_reader=CsvReader(schema_file=matches_schema_file, header=None),
+        metadata_file_path=matches_schema_file,
+        progress_bar=False,
+    )
+    os.makedirs(os.path.join(args.tmp_path, "splitting"))
+
+    runner.run(args, dask_client)
+
+    ## Check that the association data can be parsed as a valid association catalog.
+    catalog = AssociationCatalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert len(catalog.get_join_pixels()) == 8
+    assert catalog.catalog_info.total_rows == 34
diff --git a/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv b/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv
new file mode 100644
index 00000000..fddaf044
--- /dev/null
+++ b/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv
@@ -0,0 +1,131 @@
+700,282.5,-58.5,cat_700,282.5,-58.5,0.5354709584
+701,299.5,-48.5,cat_701,299.5,-48.5,0.4724403517
+702,310.5,-27.5,cat_702,310.5,-27.5,0.4614836636
+703,286.5,-69.5,cat_703,286.5,-69.5,0.6277813704
+704,326.5,-45.5,cat_704,326.5,-45.5,0.1882040269
+705,335.5,-32.5,cat_705,335.5,-32.5,0.9985247394
+706,297.5,-36.5,cat_706,297.5,-36.5,0.02301896624
+707,308.5,-69.5,cat_707,308.5,-69.5,0.5904110262
+708,307.5,-37.5,cat_708,307.5,-37.5,0.5878325892
+709,294.5,-45.5,cat_709,294.5,-45.5,0.1980368365
+710,341.5,-39.5,cat_710,341.5,-39.5,0.04196035272
+711,305.5,-49.5,cat_711,305.5,-49.5,0.3062621527
+712,288.5,-49.5,cat_712,288.5,-49.5,0.6435039744
+713,298.5,-41.5,cat_713,298.5,-41.5,0.7619788882
+714,303.5,-37.5,cat_714,303.5,-37.5,0.7915978441
+715,280.5,-35.5,cat_715,280.5,-35.5,0.1868950003
+716,305.5,-60.5,cat_716,305.5,-60.5,0.8443717349
+717,303.5,-43.5,cat_717,303.5,-43.5,0.3929019182
+718,292.5,-60.5,cat_718,292.5,-60.5,0.2896284887
+719,344.5,-39.5,cat_719,344.5,-39.5,0.00222404657
+720,344.5,-47.5,cat_720,344.5,-47.5,0.6291603128
+721,314.5,-34.5,cat_721,314.5,-34.5,0.6322839802
+722,350.5,-58.5,cat_722,350.5,-58.5,0.6985155513
+723,315.5,-68.5,cat_723,315.5,-68.5,0.7755598471
+724,323.5,-41.5,cat_724,323.5,-41.5,0.1102038902
+725,308.5,-41.5,cat_725,308.5,-41.5,0.3612986851
+726,341.5,-37.5,cat_726,341.5,-37.5,0.3112333674
+727,301.5,-44.5,cat_727,301.5,-44.5,0.7305355582
+728,328.5,-47.5,cat_728,328.5,-47.5,0.07008040529
+729,299.5,-59.5,cat_729,299.5,-59.5,0.7683396723
+730,322.5,-61.5,cat_730,322.5,-61.5,0.4413972816
+731,343.5,-52.5,cat_731,343.5,-52.5,0.4255687054
+732,337.5,-39.5,cat_732,337.5,-39.5,0.7397828751
+733,329.5,-65.5,cat_733,329.5,-65.5,0.1127992088
+734,348.5,-66.5,cat_734,348.5,-66.5,0.3307251864
+735,299.5,-65.5,cat_735,299.5,-65.5,0.07576674192
+736,303.5,-52.5,cat_736,303.5,-52.5,0.1144396065
+737,316.5,-33.5,cat_737,316.5,-33.5,0.05698199793
+738,345.5,-64.5,cat_738,345.5,-64.5,0.1369706484
+739,332.5,-57.5,cat_739,332.5,-57.5,0.3326924736
+740,306.5,-33.5,cat_740,306.5,-33.5,0.4659156208
+741,303.5,-38.5,cat_741,303.5,-38.5,0.6866683066
+742,348.5,-45.5,cat_742,348.5,-45.5,0.1719145118
+743,307.5,-25.5,cat_743,307.5,-25.5,0.469163747
+744,349.5,-39.5,cat_744,349.5,-39.5,0.2711982345
+745,337.5,-38.5,cat_745,337.5,-38.5,0.5957227712
+746,283.5,-31.5,cat_746,283.5,-31.5,0.8216960109
+747,327.5,-61.5,cat_747,327.5,-61.5,0.9992690083
+748,296.5,-63.5,cat_748,296.5,-63.5,0.9912890703
+749,293.5,-55.5,cat_749,293.5,-55.5,0.5712789508
+750,338.5,-67.5,cat_750,338.5,-67.5,0.0007103625003
+751,330.5,-44.5,cat_751,330.5,-44.5,0.2530383004
+752,291.5,-34.5,cat_752,291.5,-34.5,0.007396371453
+753,307.5,-45.5,cat_753,307.5,-45.5,0.4834114007
+754,313.5,-30.5,cat_754,313.5,-30.5,0.565495019
+755,303.5,-38.5,cat_755,303.5,-38.5,0.3207177632
+756,319.5,-35.5,cat_756,319.5,-35.5,0.5495406617
+757,346.5,-34.5,cat_757,346.5,-34.5,0.9568408496
+758,325.5,-53.5,cat_758,325.5,-53.5,0.8464719616
+759,290.5,-48.5,cat_759,290.5,-48.5,0.08335239057
+760,320.5,-53.5,cat_760,320.5,-53.5,0.6372577995
+761,329.5,-29.5,cat_761,329.5,-29.5,0.2821003251
+762,327.5,-51.5,cat_762,327.5,-51.5,0.3431085653
+763,306.5,-38.5,cat_763,306.5,-38.5,0.7543905106
+764,297.5,-45.5,cat_764,297.5,-45.5,0.8929035356
+765,306.5,-35.5,cat_765,306.5,-35.5,0.8097229976
+766,310.5,-63.5,cat_766,310.5,-63.5,0.6720056414
+767,314.5,-29.5,cat_767,314.5,-29.5,0.4712806906
+768,297.5,-60.5,cat_768,297.5,-60.5,0.5851340965
+769,307.5,-42.5,cat_769,307.5,-42.5,0.7633953196
+770,285.5,-29.5,cat_770,285.5,-29.5,0.05761527425
+771,348.5,-67.5,cat_771,348.5,-67.5,0.9965400568
+772,348.5,-64.5,cat_772,348.5,-64.5,0.4134575263
+773,293.5,-50.5,cat_773,293.5,-50.5,0.006651984057
+774,281.5,-54.5,cat_774,281.5,-54.5,0.01173790109
+775,321.5,-54.5,cat_775,321.5,-54.5,0.5145306498
+776,344.5,-63.5,cat_776,344.5,-63.5,0.3543206217
+777,307.5,-39.5,cat_777,307.5,-39.5,0.8025718501
+778,313.5,-36.5,cat_778,313.5,-36.5,0.3606419908
+779,347.5,-29.5,cat_779,347.5,-29.5,0.8431876746
+780,326.5,-52.5,cat_780,326.5,-52.5,0.3550175616
+781,330.5,-46.5,cat_781,330.5,-46.5,0.01706686238
+782,290.5,-39.5,cat_782,290.5,-39.5,0.4475960667
+783,286.5,-42.5,cat_783,286.5,-42.5,0.1192026354
+784,338.5,-40.5,cat_784,338.5,-40.5,0.7837749925
+785,296.5,-44.5,cat_785,296.5,-44.5,0.5337388926
+786,336.5,-33.5,cat_786,336.5,-33.5,0.1637974224
+787,320.5,-47.5,cat_787,320.5,-47.5,0.1606707258
+788,283.5,-61.5,cat_788,283.5,-61.5,0.3226904481
+789,287.5,-45.5,cat_789,287.5,-45.5,0.871912064
+790,286.5,-35.5,cat_790,286.5,-35.5,0.4421276183
+791,312.5,-28.5,cat_791,312.5,-28.5,0.2629705535
+792,320.5,-69.5,cat_792,320.5,-69.5,0.4247698928
+793,289.5,-58.5,cat_793,289.5,-58.5,0.1293312261
+794,300.5,-66.5,cat_794,300.5,-66.5,0.9858600303
+795,306.5,-58.5,cat_795,306.5,-58.5,0.141651889
+796,320.5,-33.5,cat_796,320.5,-33.5,0.5471312733
+797,308.5,-62.5,cat_797,308.5,-62.5,0.4071495713
+798,316.5,-36.5,cat_798,316.5,-36.5,0.298389701
+799,313.5,-31.5,cat_799,313.5,-31.5,0.1003646965
+800,299.5,-37.5,cat_800,299.5,-37.5,0.1195110611
+801,309.5,-50.5,cat_801,309.5,-50.5,0.8172118166
+802,304.5,-49.5,cat_802,304.5,-49.5,0.53025181
+803,336.5,-25.5,cat_803,336.5,-25.5,0.2723708625
+804,322.5,-66.5,cat_804,322.5,-66.5,0.6105308707
+805,297.5,-52.5,cat_805,297.5,-52.5,0.02027159887
+806,312.5,-29.5,cat_806,312.5,-29.5,0.2661396854
+807,303.5,-60.5,cat_807,303.5,-60.5,0.9609377314
+808,320.5,-40.5,cat_808,320.5,-40.5,0.113751978
+809,283.5,-34.5,cat_809,283.5,-34.5,0.998665039
+810,301.5,-59.5,cat_810,301.5,-59.5,0.1373287203
+811,315.5,-68.5,cat_811,315.5,-68.5,0.6869200694
+812,346.5,-60.5,cat_812,346.5,-60.5,0.5913654966
+813,349.5,-37.5,cat_813,349.5,-37.5,0.4280948593
+814,312.5,-33.5,cat_814,312.5,-33.5,0.06385846256
+815,283.5,-68.5,cat_815,283.5,-68.5,0.5410954638
+816,288.5,-69.5,cat_816,288.5,-69.5,0.2854363325
+817,318.5,-48.5,cat_817,318.5,-48.5,0.5805624378
+818,300.5,-55.5,cat_818,300.5,-55.5,0.006019407311
+819,313.5,-35.5,cat_819,313.5,-35.5,0.8343255244
+820,286.5,-46.5,cat_820,286.5,-46.5,0.8349398583
+821,330.5,-52.5,cat_821,330.5,-52.5,0.164118026
+822,301.5,-54.5,cat_822,301.5,-54.5,0.3361374088
+823,338.5,-45.5,cat_823,338.5,-45.5,0.8403731808
+824,305.5,-28.5,cat_824,305.5,-28.5,0.3213994958
+825,315.5,-30.5,cat_825,315.5,-30.5,0.8849820777
+826,335.5,-69.5,cat_826,335.5,-69.5,0.3293432462
+827,310.5,-40.5,cat_827,310.5,-40.5,0.6342142199
+828,330.5,-26.5,cat_828,330.5,-26.5,0.8067550106
+829,314.5,-35.5,cat_829,314.5,-35.5,0.8037233427
+830,306.5,-50.5,cat_830,306.5,-50.5,0.8159603245
\ No newline at end of file
diff --git a/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv
new file mode 100644
index 00000000..146877e2
--- /dev/null
+++ b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv
@@ -0,0 +1,34 @@
+72008,320.8364113,-69.45376863,792,320.5,-69.5,0.996
+73091,320.9404216,-69.46498164,792,320.5,-69.5,0.998
+83813,335.5861031,-69.37807662,826,335.5,-69.5,0.994
+78312,335.5182331,-69.38325891,826,335.5,-69.5,0.991
+76201,288.9361436,-69.31626483,816,288.5,-69.5,0.993
+72926,288.9503144,-69.3115179,816,288.5,-69.5,0.99
+72813,310.5307814,-63.34133051,766,310.5,-63.5,0.994
+70048,310.5876212,-63.33485542,766,310.5,-63.5,0.999
+83424,283.7763878,-61.30283808,788,283.5,-61.5,1
+73626,283.864721,-61.29113052,788,283.5,-61.5,1
+84534,347.9612914,-29.13951069,779,347.5,-29.5,0.998
+87130,347.9655757,-29.1246194,779,347.5,-29.5,0.993
+79615,347.9345496,-29.10876863,779,347.5,-29.5,0.99
+73071,347.9463072,-29.08860161,779,347.5,-29.5,0.992
+78803,347.997414,-29.07112828,779,347.5,-29.5,0.999
+76988,348.0338029,-29.04750582,779,347.5,-29.5,0.999
+83444,348.0537862,-29.02085159,779,347.5,-29.5,0.996
+72480,320.0880522,-35.28432758,756,319.5,-35.5,0.997
+76134,320.0697349,-35.21411381,756,319.5,-35.5,0.99
+75313,319.7793679,-35.45350619,756,319.5,-35.5,0.999
+79351,319.7409873,-35.4177272,756,319.5,-35.5,0.993
+78766,319.8029046,-35.42603476,756,319.5,-35.5,0.992
+74689,319.7981819,-35.41676507,756,319.5,-35.5,0.997
+73928,319.7099797,-35.43311803,756,319.5,-35.5,0.99
+77882,319.689082,-35.43731031,756,319.5,-35.5,0.998
+85015,319.6872701,-35.43434368,756,319.5,-35.5,0.99
+75167,319.7008698,-35.43045134,756,319.5,-35.5,0.996
+75394,319.736227,-35.40559895,756,319.5,-35.5,0.999
+80736,319.7140687,-35.37583874,756,319.5,-35.5,0.99
+86351,290.5372378,-39.34034881,782,290.5,-39.5,0.996
+84773,290.5185662,-39.3174862,782,290.5,-39.5,0.998
+75092,290.5865147,-39.30033282,782,290.5,-39.5,0.992
+78548,290.5404456,-39.31843165,782,290.5,-39.5,0.997
+79186,290.7615303,-39.38550864,782,290.5,-39.5,0.994
\ No newline at end of file
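Taken together, the new runner follows the standard import shape: plan, split association rows by left pixel, reduce one parquet file per left pixel, then write catalog metadata. A hypothetical end-to-end invocation, modeled on the tests in this diff; the catalog directories, column names, file paths, and worker count below are placeholders to adapt, not fixtures shipped with the library:

```python
import tempfile

from dask.distributed import Client

from hipscat_import.catalog.file_readers import CsvReader
from hipscat_import.cross_match.macauff_arguments import MacauffArguments
from hipscat_import.cross_match.run_macauff_import import run

# Placeholder inputs: two existing hipscat catalogs and a headerless CSV of
# macauff matches, plus a parquet schema file produced by macauff_metadata.from_yaml.
with tempfile.TemporaryDirectory() as tmp_dir, Client(n_workers=2) as client:
    args = MacauffArguments(
        output_path=tmp_dir,
        output_artifact_name="gaia_x_catwise",
        tmp_dir=tmp_dir,
        left_catalog_dir="catalogs/gaia",
        left_ra_column="gaia_ra",
        left_dec_column="gaia_dec",
        left_id_column="gaia_source_id",
        right_catalog_dir="catalogs/catwise",
        right_ra_column="catwise_ra",
        right_dec_column="catwise_dec",
        right_id_column="catwise_name",
        input_file_list=["matches/gaia_catwise_matches.csv"],
        input_format="csv",
        file_reader=CsvReader(schema_file="matches_schema.parquet", header=None),
        metadata_file_path="matches_schema.parquet",
    )
    # Runs split and reduce on the dask client, then writes the
    # association catalog metadata under args.catalog_path.
    run(args, client)
```

Because `resume=True` by default, rerunning the same invocation against the same `tmp_dir` picks up from the last completed splitting/reducing stage rather than starting over.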