diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 9c3115e2..cec8c39a 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -8,34 +8,16 @@ import pyarrow.parquet as pq from hipscat import pixel_math from hipscat.io import FilePointer, file_io, paths +from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix from hipscat_import.catalog.file_readers import InputReader from hipscat_import.catalog.resume_plan import ResumePlan +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory # pylint: disable=too-many-locals,too-many-arguments -def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.uint64): - """Create a path for intermediate pixel data. - - This will take the form: - - /dir_/pixel_ - - where the directory separator is calculated using integer division: - - (pixel/10000)*10000 - - and exists to mitigate problems on file systems that don't support - more than 10_000 children nodes. - """ - dir_number = int(pixel / 10_000) * 10_000 - return file_io.append_paths_to_pointer( - cache_path, f"order_{order}", f"dir_{dir_number}", f"pixel_{pixel}" - ) - - def _has_named_index(dataframe): """Heuristic to determine if a dataframe has some meaningful index. @@ -172,7 +154,7 @@ def split_pixels( filtered_data = data.filter(items=data_indexes, axis=0) - pixel_dir = _get_pixel_directory(cache_shard_path, order, pixel) + pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel)) file_io.make_directory(pixel_dir, exist_ok=True) output_file = file_io.append_paths_to_pointer( pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" @@ -259,7 +241,8 @@ def reduce_pixel_shards( ).schema.to_arrow_schema() tables = [] - pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number) + healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number) + pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) if schema: tables.append(pq.read_table(pixel_dir, schema=schema)) @@ -273,7 +256,7 @@ def reduce_pixel_shards( if rows_written != destination_pixel_size: raise ValueError( "Unexpected number of objects at pixel " - f"({destination_pixel_order}, {destination_pixel_number})." + f"({healpix_pixel})." f" Expected {destination_pixel_size}, wrote {rows_written}" ) @@ -286,13 +269,9 @@ def reduce_pixel_shards( dataframe[dec_column].values, ) - dataframe["Norder"] = np.full(rows_written, fill_value=destination_pixel_order, dtype=np.uint8) - dataframe["Dir"] = np.full( - rows_written, - fill_value=int(destination_pixel_number / 10_000) * 10_000, - dtype=np.uint64, - ) - dataframe["Npix"] = np.full(rows_written, fill_value=destination_pixel_number, dtype=np.uint64) + dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8) + dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64) + dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64) if add_hipscat_index: ## If we had a meaningful index before, preserve it as a column. 
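[editor note] The swap above relies on `HealpixPixel.dir` carrying the same 10,000-pixel directory grouping that the deleted `_get_pixel_directory` helper computed inline. A small illustrative sketch of that rule and of the partition columns written in `reduce_pixel_shards`; toy order/pixel values, and the assumption that `HealpixPixel.dir` equals this formula is inferred from the one-for-one replacement in the diff, not from hipscat documentation.

```python
# Illustrative sketch (not part of the diff): the directory grouping behind
# both the deleted helper and the "Dir" column -- pixels are bucketed 10,000
# per directory so no single directory holds more than 10_000 children.
import numpy as np

def dir_number(pixel: int) -> int:
    return (pixel // 10_000) * 10_000

assert dir_number(57) == 0
assert dir_number(123_456) == 120_000

# reduce_pixel_shards stamps every output row with its partition location:
rows_written, order, pixel = 4, 10, 123_456
norder_col = np.full(rows_written, fill_value=order, dtype=np.uint8)
dir_col = np.full(rows_written, fill_value=dir_number(pixel), dtype=np.uint64)
npix_col = np.full(rows_written, fill_value=pixel, dtype=np.uint64)
```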
@@ -304,7 +283,7 @@ def reduce_pixel_shards( del dataframe, merged_table, tables if delete_input_files: - pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number) + pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options) diff --git a/src/hipscat_import/cross_match/__init__.py b/src/hipscat_import/cross_match/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py deleted file mode 100644 index 26079c01..00000000 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass, field -from os import path -from typing import List - -from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo -from hipscat.catalog.catalog_type import CatalogType -from hipscat.io import FilePointer -from hipscat.io.validation import is_valid_catalog - -from hipscat_import.catalog.file_readers import InputReader, get_file_reader -from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths - -# pylint: disable=too-many-instance-attributes -# pylint: disable=unsupported-binary-operation - - -@dataclass -class MacauffArguments(RuntimeArguments): - """Data class for holding cross-match association arguments""" - - ## Input - Cross-match data - input_path: FilePointer | None = None - """path to search for the input data""" - input_format: str = "" - """specifier of the input data format. this will be used to find an appropriate - InputReader type, and may be used to find input files, via a match like - ``/*`` """ - input_file_list: List[FilePointer] = field(default_factory=list) - """can be used instead of `input_format` to import only specified files""" - input_paths: List[FilePointer] = field(default_factory=list) - """resolved list of all files that will be used in the importer""" - - ## Input - Left catalog - left_catalog_dir: str = "" - left_id_column: str = "" - left_ra_column: str = "" - left_dec_column: str = "" - - ## Input - Right catalog - right_catalog_dir: str = "" - right_id_column: str = "" - right_ra_column: str = "" - right_dec_column: str = "" - - ## `macauff` specific attributes - metadata_file_path: str = "" - resume: bool = True - """if there are existing intermediate resume files, should we - read those and continue to create a new catalog where we left off""" - - file_reader: InputReader | None = None - - def __post_init__(self): - self._check_arguments() - - def _check_arguments(self): - super()._check_arguments() - - if not self.input_path and not self.input_file_list: - raise ValueError("input_path nor input_file_list not provided") - if not self.input_format: - raise ValueError("input_format is required") - - if not self.left_catalog_dir: - raise ValueError("left_catalog_dir is required") - if not self.left_id_column: - raise ValueError("left_id_column is required") - if not self.left_ra_column: - raise ValueError("left_ra_column is required") - if not self.left_dec_column: - raise ValueError("left_dec_column is required") - if not is_valid_catalog(self.left_catalog_dir): - raise ValueError("left_catalog_dir not a valid catalog") - - if not self.right_catalog_dir: - raise ValueError("right_catalog_dir is required") - if not self.right_id_column: - raise 
ValueError("right_id_column is required") - if not self.right_ra_column: - raise ValueError("right_ra_column is required") - if not self.right_dec_column: - raise ValueError("right_dec_column is required") - if not is_valid_catalog(self.right_catalog_dir): - raise ValueError("right_catalog_dir not a valid catalog") - - if not self.metadata_file_path: - raise ValueError("metadata_file_path required for macauff crossmatch") - if not path.isfile(self.metadata_file_path): - raise ValueError("Macauff column metadata file must point to valid file path.") - - # Basic checks complete - make more checks and create directories where necessary - self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list) - - if not self.file_reader: - self.file_reader = get_file_reader(file_format=self.input_format) - - def to_catalog_info(self, total_rows) -> AssociationCatalogInfo: - """Catalog-type-specific dataset info.""" - info = { - "catalog_name": self.output_artifact_name, - "catalog_type": CatalogType.ASSOCIATION, - "total_rows": total_rows, - "primary_column": self.left_id_column, - "primary_catalog": str(self.left_catalog_dir), - "join_column": self.right_id_column, - "join_catalog": str(self.right_catalog_dir), - } - return AssociationCatalogInfo(**info) - - def additional_runtime_provenance_info(self) -> dict: - return { - "input_path": self.input_path, - "input_format": self.input_format, - "left_catalog_dir": self.left_catalog_dir, - "left_id_column": self.left_id_column, - "left_ra_column": self.left_ra_column, - "left_dec_column": self.left_dec_column, - "right_catalog_dir": self.right_catalog_dir, - "right_id_column": self.right_id_column, - "right_ra_column": self.right_ra_column, - "right_dec_column": self.right_dec_column, - "metadata_file_path": self.metadata_file_path, - } diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py deleted file mode 100644 index 27a4b157..00000000 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ /dev/null @@ -1,106 +0,0 @@ -import healpy as hp -import numpy as np -import pyarrow as pa -import pyarrow.parquet as pq -from hipscat.io import file_io, paths -from hipscat.pixel_math.healpix_pixel import HealpixPixel -from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort - -from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file -from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan - -# pylint: disable=too-many-arguments,too-many-locals - - -def split_associations( - input_file, - file_reader, - splitting_key, - highest_left_order, - highest_right_order, - left_alignment, - right_alignment, - left_ra_column, - left_dec_column, - right_ra_column, - right_dec_column, - tmp_path, -): - """Map a file of links to their healpix pixels and split into shards. - - - Raises: - ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. 
- FileNotFoundError: if the file does not exist, or is a directory - """ - for chunk_number, data, mapped_left_pixels in _iterate_input_file( - input_file, file_reader, highest_left_order, left_ra_column, left_dec_column, False - ): - aligned_left_pixels = left_alignment[mapped_left_pixels] - unique_pixels, unique_inverse = np.unique(aligned_left_pixels, return_inverse=True) - - mapped_right_pixels = hp.ang2pix( - 2**highest_right_order, - data[right_ra_column].values, - data[right_dec_column].values, - lonlat=True, - nest=True, - ) - aligned_right_pixels = right_alignment[mapped_right_pixels] - - data["Norder"] = [pix.order for pix in aligned_left_pixels] - data["Dir"] = [pix.dir for pix in aligned_left_pixels] - data["Npix"] = [pix.pixel for pix in aligned_left_pixels] - - data["join_Norder"] = [pix.order for pix in aligned_right_pixels] - data["join_Dir"] = [pix.dir for pix in aligned_right_pixels] - data["join_Npix"] = [pix.pixel for pix in aligned_right_pixels] - - for unique_index, pixel in enumerate(unique_pixels): - mapped_indexes = np.where(unique_inverse == unique_index) - data_indexes = data.index[mapped_indexes[0].tolist()] - - filtered_data = data.filter(items=data_indexes, axis=0) - - pixel_dir = _get_pixel_directory(tmp_path, pixel.order, pixel.pixel) - file_io.make_directory(pixel_dir, exist_ok=True) - output_file = file_io.append_paths_to_pointer( - pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" - ) - filtered_data.to_parquet(output_file, index=False) - del data, filtered_data, data_indexes - - MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) - - -def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): - """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel - and aggregate into a single parquet file.""" - inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel) - - if not file_io.directory_has_contents(inputs): - MacauffResumePlan.reducing_key_done( - tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}" - ) - print(f"Warning: no input data for pixel {left_pixel}") - return - destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel) - file_io.make_directory(destination_dir, exist_ok=True) - - destination_file = paths.pixel_catalog_file(catalog_path, left_pixel.order, left_pixel.pixel) - - merged_table = pq.read_table(inputs) - dataframe = merged_table.to_pandas().reset_index() - - ## One row group per join_Norder/join_Npix - - join_pixel_frames = dataframe.groupby(["join_Norder", "join_Npix"], group_keys=True) - join_pixels = [HealpixPixel(pixel[0], pixel[1]) for pixel, _ in join_pixel_frames] - pixel_argsort = get_pixel_argsort(join_pixels) - with pq.ParquetWriter(destination_file, merged_table.schema) as writer: - for pixel_index in pixel_argsort: - join_pixel = join_pixels[pixel_index] - join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() - writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) - - MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key) diff --git a/src/hipscat_import/cross_match/macauff_metadata.py b/src/hipscat_import/cross_match/macauff_metadata.py deleted file mode 100644 index 65c8bf99..00000000 --- a/src/hipscat_import/cross_match/macauff_metadata.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Utilities for generating parquet metdata from macauff-generated metadata files.""" 
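[editor note] `reduce_associations` above lays each object pixel's output file out with one parquet row group per `join_Norder`/`join_Npix` pair, ordered by `get_pixel_argsort`. A toy sketch of that layout under stated assumptions: a made-up link table and file name, with pandas groupby's default key ordering standing in for hipscat's nested pixel ordering.

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Toy link table; columns mirror the join_Norder/join_Npix fields written above.
links = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "join_Norder": [1, 1, 2, 2],
        "join_Npix": [44, 44, 180, 181],
    }
)
schema = pa.Table.from_pandas(links, preserve_index=False).schema

# One write_table call per join pixel -> one row group per join pixel.
with pq.ParquetWriter("object_pixel.parquet", schema) as writer:
    for _join_pixel, frame in links.groupby(["join_Norder", "join_Npix"]):
        writer.write_table(pa.Table.from_pandas(frame, schema=schema, preserve_index=False))

print(pq.ParquetFile("object_pixel.parquet").num_row_groups)  # 3
```

The soap reducer later in the diff uses the same one-row-group-per-source-pixel idea.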
- -import xml.etree.ElementTree as ET - -import pyarrow as pa -import pyarrow.parquet as pq -import yaml -from hipscat.io import file_io - - -def _get_inner_xml_value(parent_el, node_type, default_value): - child_el = parent_el.findall(node_type) - if len(child_el) == 0: - return default_value - if len(child_el) > 1: - raise ValueError(f"found too many {node_type} XML elements") - return child_el[0].text.strip() - - -def _construct_field(name, units, metadata_dict): - """Helper method to construct a pyarrow field from macauff metadata strings.""" - if units == "string": - pa_type = pa.string() - elif units in ("float", "double"): - pa_type = pa.float64() - elif units in ("integer", "long"): - pa_type = pa.int64() - else: - raise ValueError(f"unhandled units {units}") - return pa.field(name, pa_type, metadata=metadata_dict) - - -def from_xml(input_file, output_file): - """Read XML file with column metadata for a cross-match file from macauff. - - Expects XML with the format:: - - - - $COLUMN_NAME - $COLUMN_DESCRIPTION - $COLUMN_UNIT_DESCRIPTOR - - - - Args: - input file (str): file to read for match metadata - output_file (str): desired location for output parquet metadata file - - Raises - ValueError: if the XML is mal-formed - """ - fields = [] - root_el = ET.parse(input_file).getroot() - columns = root_el.findall("column") - - for column in columns: - name = _get_inner_xml_value(column, "name", "foo") - description = _get_inner_xml_value(column, "description", "") - units = _get_inner_xml_value(column, "units", "string") - - fields.append(_construct_field(name, units, metadata_dict={"macauff_description": description})) - - schema = pa.schema(fields) - pq.write_table(schema.empty_table(), where=output_file) - - -def from_yaml(input_file, output_directory): - """Read YAML file with column metadata for the various cross-match files from macauff. - - Expects YAML with the format:: - - name: macauff_GaiaDR3xCatWISE2020 - description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020. - tables: - - name: macauff_GaiaDR3xCatWISE2020_matches - "@id": "#macauff_GaiaDR3xCatWISE2020_matches" - description: Counterpart associations between Gaia and WISE - columns: - - name: gaia_source_id - datatype: long - description: The Gaia DR3 object ID. - - Args: - input file (str): file to read for match metadata - output_dir (str): desired location for output parquet metadata files - We will write one file per table in the "tables" element. 
- """ - with open(input_file, "r", encoding="utf-8") as file_handle: - metadata = yaml.safe_load(file_handle) - tables = metadata.get("tables", []) - for index, table in enumerate(tables): - fields = [] - table_name = table.get("name", f"metadata_table_{index}") - for col_index, column in enumerate(table.get("columns", [])): - name = column.get("name", f"column_{col_index}") - units = column.get("datatype", "string") - fields.append(_construct_field(name, units, metadata_dict=column)) - - schema = pa.schema(fields) - output_file = file_io.append_paths_to_pointer(output_directory, f"{table_name}.parquet") - pq.write_table(schema.empty_table(), where=str(output_file)) diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py deleted file mode 100644 index dfc3d871..00000000 --- a/src/hipscat_import/cross_match/macauff_resume_plan.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Utility to hold the file-level pipeline execution plan.""" - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import List, Tuple - -from hipscat.io import FilePointer, file_io -from hipscat.pixel_math.healpix_pixel import HealpixPixel - -from hipscat_import.cross_match.macauff_arguments import MacauffArguments -from hipscat_import.pipeline_resume_plan import PipelineResumePlan - -# pylint: disable=duplicate-code - - -@dataclass -class MacauffResumePlan(PipelineResumePlan): - """Container class for holding the state of each file in the pipeline plan.""" - - input_paths: List[FilePointer] = field(default_factory=list) - """resolved list of all files that will be used in the importer""" - split_keys: List[Tuple[str, str]] = field(default_factory=list) - """set of files (and job keys) that have yet to be split""" - reduce_keys: List[Tuple[HealpixPixel, str]] = field(default_factory=list) - """set of left side catalog pixels (and job keys) that have yet to be reduced/combined""" - - SPLITTING_STAGE = "splitting" - REDUCING_STAGE = "reducing" - - def __init__(self, args: MacauffArguments, left_pixels): - if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy) - raise ValueError("tmp_path is required") - super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path) - self.input_paths = args.input_paths - self.gather_plan(left_pixels) - - def gather_plan(self, left_pixels): - """Initialize the plan.""" - ## Make sure it's safe to use existing resume state. - super().safe_to_resume() - - ## Validate existing resume state. - ## - if a later stage is complete, the earlier stages should be complete too. - splitting_done = self.is_splitting_done() - reducing_done = self.is_reducing_done() - - if reducing_done and not splitting_done: - raise ValueError("splitting must be complete before reducing") - - ## Validate that we're operating on the same file set as the previous instance. - self.input_paths = self.check_original_input_paths(self.input_paths) - - ## Gather keys for execution. - if not splitting_done: - self.split_keys = self.get_remaining_split_keys() - if not reducing_done: - self.reduce_keys = self.get_reduce_keys(left_pixels) - ## Go ahead and create our directories for storing resume files. 
- file_io.make_directory( - file_io.append_paths_to_pointer(self.tmp_path, self.SPLITTING_STAGE), - exist_ok=True, - ) - file_io.make_directory( - file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE), - exist_ok=True, - ) - - def get_remaining_split_keys(self): - """Gather remaining keys, dropping successful split tasks from done file names. - - Returns: - list of splitting keys *not* found in files like /resume/path/split_key.done - """ - split_keys = set(self.read_done_keys(self.SPLITTING_STAGE)) - return [ - (f"split_{i}", file_path) - for i, file_path in enumerate(self.input_paths) - if f"split_{i}" not in split_keys - ] - - @classmethod - def splitting_key_done(cls, tmp_path, splitting_key: str): - """Mark a single splitting task as done - - Args: - tmp_path (str): where to write intermediate resume files. - splitting_key (str): unique string for each splitting task (e.g. "split_57") - """ - cls.touch_key_done_file(tmp_path, cls.SPLITTING_STAGE, splitting_key) - - @classmethod - def reducing_key_done(cls, tmp_path, reducing_key: str): - """Mark a single reducing task as done - - Args: - tmp_path (str): where to write intermediate resume files. - reducing_key (str): unique string for each reducing task (e.g. "3_57") - """ - cls.touch_key_done_file(tmp_path, cls.REDUCING_STAGE, reducing_key) - - def wait_for_splitting(self, futures): - """Wait for splitting futures to complete.""" - self.wait_for_futures(futures, self.SPLITTING_STAGE) - remaining_split_items = self.get_remaining_split_keys() - if len(remaining_split_items) > 0: - raise RuntimeError(f"{len(remaining_split_items)} split stages did not complete successfully.") - self.touch_stage_done_file(self.SPLITTING_STAGE) - - def is_splitting_done(self) -> bool: - """Are there files left to split?""" - return self.done_file_exists(self.SPLITTING_STAGE) - - def get_reduce_keys(self, left_pixels): - """Fetch a Tuple for each partition to reduce. 
- - Tuple contains: - - - left pixel (healpix pixel with both order and pixel) - - reduce key (string of left order+pixel) - - """ - reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE)) - reduce_items = [ - (hp_pixel, f"{hp_pixel.order}_{hp_pixel.pixel}") - for hp_pixel in left_pixels - if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys - ] - return reduce_items - - def is_reducing_done(self) -> bool: - """Are there partitions left to reduce?""" - return self.done_file_exists(self.REDUCING_STAGE) - - def wait_for_reducing(self, futures, left_pixels): - """Wait for reducing futures to complete.""" - self.wait_for_futures(futures, self.REDUCING_STAGE) - remaining_reduce_items = self.get_reduce_keys(left_pixels) - if len(remaining_reduce_items) > 0: - raise RuntimeError(f"{len(remaining_reduce_items)} reduce stages did not complete successfully.") - self.touch_stage_done_file(self.REDUCING_STAGE) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py deleted file mode 100644 index 35424058..00000000 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ /dev/null @@ -1,152 +0,0 @@ -import healpy as hp -import numpy as np -from hipscat.catalog import Catalog -from hipscat.io import file_io, parquet_metadata, paths, write_metadata -from tqdm import tqdm - -from hipscat_import.cross_match.macauff_arguments import MacauffArguments -from hipscat_import.cross_match.macauff_map_reduce import reduce_associations, split_associations -from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan - -# pylint: disable=unused-argument - - -def split( - args, - highest_left_order, - highest_right_order, - left_alignment, - right_alignment, - resume_plan, - client, -): - """Split association rows by their aligned left pixel.""" - - if resume_plan.is_splitting_done(): - return - - reader_future = client.scatter(args.file_reader) - left_alignment_future = client.scatter(left_alignment) - right_alignment_future = client.scatter(right_alignment) - futures = [] - for key, file_path in resume_plan.split_keys: - futures.append( - client.submit( - split_associations, - input_file=file_path, - file_reader=reader_future, - splitting_key=key, - highest_left_order=highest_left_order, - highest_right_order=highest_right_order, - left_alignment=left_alignment_future, - right_alignment=right_alignment_future, - left_ra_column=args.left_ra_column, - left_dec_column=args.left_dec_column, - right_ra_column=args.right_ra_column, - right_dec_column=args.right_dec_column, - tmp_path=args.tmp_path, - ) - ) - resume_plan.wait_for_splitting(futures) - - -def reduce(args, left_pixels, resume_plan, client): - """Reduce left pixel files into a single parquet file per.""" - - if resume_plan.is_reducing_done(): - return - - futures = [] - for ( - left_pixel, - pixel_key, - ) in resume_plan.reduce_keys: - futures.append( - client.submit( - reduce_associations, - left_pixel=left_pixel, - tmp_path=args.tmp_path, - catalog_path=args.catalog_path, - reduce_key=pixel_key, - ) - ) - - resume_plan.wait_for_reducing(futures, left_pixels) - - -def run(args, client): - """run macauff cross-match import pipeline""" - if not args: - raise TypeError("args is required and should be type MacauffArguments") - if not isinstance(args, MacauffArguments): - raise TypeError("args must be type MacauffArguments") - - ## Lump all of the catalog reading stuff together under a single block, - ## since it can take a while to load large catalogs and some 
feedback is nice. - with tqdm(total=5, desc="Planning", disable=not args.progress_bar) as step_progress: - left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir) - step_progress.update(1) - right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir) - step_progress.update(1) - highest_left_order = left_catalog.partition_info.get_highest_order() - highest_right_order = right_catalog.partition_info.get_highest_order() - - left_pixels = left_catalog.partition_info.get_healpix_pixels() - right_pixels = right_catalog.partition_info.get_healpix_pixels() - - regenerated_left_alignment = np.full(hp.order2npix(highest_left_order), None) - for pixel in left_pixels: - explosion_factor = 4 ** (highest_left_order - pixel.order) - exploded_pixels = np.arange( - pixel.pixel * explosion_factor, - (pixel.pixel + 1) * explosion_factor, - ) - regenerated_left_alignment[exploded_pixels] = pixel - step_progress.update(1) - - regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) - for pixel in right_pixels: - explosion_factor = 4 ** (highest_right_order - pixel.order) - exploded_pixels = np.arange( - pixel.pixel * explosion_factor, - (pixel.pixel + 1) * explosion_factor, - ) - regenerated_right_alignment[exploded_pixels] = pixel - step_progress.update(1) - - resume_plan = MacauffResumePlan(args, left_pixels) - step_progress.update(1) - - split( - args, - highest_left_order=highest_left_order, - highest_right_order=highest_right_order, - left_alignment=regenerated_left_alignment, - right_alignment=regenerated_right_alignment, - resume_plan=resume_plan, - client=client, - ) - reduce(args, left_pixels, resume_plan, client) - - # All done - write out the metadata - with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: - parquet_metadata.write_parquet_metadata(args.catalog_path) - total_rows = 0 - metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) - for row_group in parquet_metadata.read_row_group_fragments(metadata_path): - total_rows += row_group.num_rows - # pylint: disable=duplicate-code - # Very similar to /index/run_index.py - step_progress.update(1) - total_rows = int(total_rows) - catalog_info = args.to_catalog_info(total_rows) - write_metadata.write_provenance_info( - catalog_base_dir=args.catalog_path, - dataset_info=catalog_info, - tool_args=args.provenance_info(), - ) - step_progress.update(1) - write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path) - step_progress.update(1) - file_io.remove_directory(args.tmp_path, ignore_errors=True) - step_progress.update(1) diff --git a/src/hipscat_import/index/run_index.py b/src/hipscat_import/index/run_index.py index 144ab222..5d989005 100644 --- a/src/hipscat_import/index/run_index.py +++ b/src/hipscat_import/index/run_index.py @@ -20,15 +20,14 @@ def run(args): with tqdm( total=4, desc=PipelineResumePlan.get_formatted_stage_name("Finishing"), disable=not args.progress_bar ) as step_progress: - # pylint: disable=duplicate-code - catalog_info = args.to_catalog_info(int(rows_written)) + index_catalog_info = args.to_catalog_info(int(rows_written)) write_metadata.write_provenance_info( catalog_base_dir=args.catalog_path, - dataset_info=catalog_info, + dataset_info=index_catalog_info, tool_args=args.provenance_info(), ) step_progress.update(1) - write_metadata.write_catalog_info(catalog_base_dir=args.catalog_path, dataset_info=catalog_info) + write_metadata.write_catalog_info(catalog_base_dir=args.catalog_path, 
dataset_info=index_catalog_info) step_progress.update(1) file_io.remove_directory(args.tmp_path, ignore_errors=True) step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 64d6718c..6f30551e 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -128,21 +128,22 @@ def generate_margin_cache(args, client): _reduce_margin_shards(client=client, args=args, partition_pixels=combined_pixels) with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: - # pylint: disable=duplicate-code parquet_metadata.write_parquet_metadata(args.catalog_path) total_rows = 0 metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) for row_group in parquet_metadata.read_row_group_fragments(metadata_path): total_rows += row_group.num_rows step_progress.update(1) - catalog_info = args.to_catalog_info(int(total_rows)) + margin_catalog_info = args.to_catalog_info(int(total_rows)) write_metadata.write_provenance_info( catalog_base_dir=args.catalog_path, - dataset_info=catalog_info, + dataset_info=margin_catalog_info, tool_args=args.provenance_info(), ) step_progress.update(1) - write_metadata.write_catalog_info(catalog_base_dir=args.catalog_path, dataset_info=catalog_info) + write_metadata.write_catalog_info( + catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info + ) step_progress.update(1) file_io.remove_directory(args.tmp_path, ignore_errors=True) step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache_arguments.py b/src/hipscat_import/margin_cache/margin_cache_arguments.py index 41dd088b..e3de49e1 100644 --- a/src/hipscat_import/margin_cache/margin_cache_arguments.py +++ b/src/hipscat_import/margin_cache/margin_cache_arguments.py @@ -43,23 +43,20 @@ def _check_arguments(self): margin_pixel_k = highest_order + 1 if self.margin_order > -1: if self.margin_order < margin_pixel_k: - # pylint: disable=line-too-long raise ValueError( "margin_order must be of a higher order " "than the highest order catalog partition pixel." ) - # pylint: enable=line-too-long else: self.margin_order = margin_pixel_k margin_pixel_nside = hp.order2nside(self.margin_order) if hp.nside2resol(margin_pixel_nside, arcmin=True) * 60.0 < self.margin_threshold: - # pylint: disable=line-too-long warnings.warn( - "Warning: margin pixels have a smaller resolution than margin_threshold; this may lead to data loss in the margin cache." + "Warning: margin pixels have a smaller resolution than margin_threshold; " + "this may lead to data loss in the margin cache." 
) - # pylint: enable=line-too-long def to_catalog_info(self, total_rows) -> MarginCacheCatalogInfo: """Catalog-type-specific dataset info.""" diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index 21a2b7ae..4eb6f194 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -4,8 +4,9 @@ from hipscat import pixel_math from hipscat.catalog.partition_info import PartitionInfo from hipscat.io import file_io, paths +from hipscat.pixel_math.healpix_pixel import HealpixPixel -# pylint: disable=too-many-locals,too-many-arguments +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory def map_pixel_shards( @@ -53,10 +54,8 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): margin_data = data.loc[data["margin_check"] == True] if len(margin_data): - # TODO: this should be a utility function in `hipscat` - # that properly handles the hive formatting - # generate a file name for our margin shard - partition_dir = _get_partition_directory(output_path, order, pix) + # generate a file name for our margin shard, that uses both sets of Norder/Npix + partition_dir = get_pixel_cache_directory(output_path, HealpixPixel(order, pix)) shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix) file_io.make_directory(shard_dir, exist_ok=True) @@ -84,34 +83,28 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): final_df[PartitionInfo.METADATA_DIR_COLUMN_NAME] = dir_column - final_df = final_df.astype({ - PartitionInfo.METADATA_ORDER_COLUMN_NAME: np.uint8, - PartitionInfo.METADATA_DIR_COLUMN_NAME: np.uint64, - PartitionInfo.METADATA_PIXEL_COLUMN_NAME: np.uint64, - }) + final_df = final_df.astype( + { + PartitionInfo.METADATA_ORDER_COLUMN_NAME: np.uint8, + PartitionInfo.METADATA_DIR_COLUMN_NAME: np.uint64, + PartitionInfo.METADATA_PIXEL_COLUMN_NAME: np.uint64, + } + ) final_df.to_parquet(shard_path) del data, margin_data, final_df -def _get_partition_directory(path, order, pix): - """Get the directory where a partition pixel's margin shards live""" - partition_file = paths.pixel_catalog_file(path, order, pix) - - # removes the '.parquet' and adds a slash - partition_dir = f"{partition_file[:-8]}/" - - return partition_dir - - def reduce_margin_shards(output_path, partition_order, partition_pixel): """Reduce all partition pixel directories into a single file""" - shard_dir = _get_partition_directory(output_path, partition_order, partition_pixel) + shard_dir = get_pixel_cache_directory(output_path, HealpixPixel(partition_order, partition_pixel)) if file_io.does_file_or_directory_exist(shard_dir): data = ds.dataset(shard_dir, format="parquet") full_df = data.to_table().to_pandas() + margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel) + file_io.make_directory(margin_cache_dir, exist_ok=True) if len(full_df): margin_cache_file_path = paths.pixel_catalog_file(output_path, partition_order, partition_pixel) diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index 14df5c4b..e336d855 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -5,13 +5,11 @@ from dask.distributed import Client import hipscat_import.catalog.run_import as catalog_runner -import hipscat_import.cross_match.run_macauff_import as macauff_runner import hipscat_import.index.run_index as index_runner import 
hipscat_import.margin_cache.margin_cache as margin_runner import hipscat_import.soap.run_soap as soap_runner import hipscat_import.verification.run_verification as verification_runner from hipscat_import.catalog.arguments import ImportArguments -from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.index.arguments import IndexArguments from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.runtime_arguments import RuntimeArguments @@ -51,8 +49,6 @@ def pipeline_with_client(args: RuntimeArguments, client: Client): soap_runner.run(args, client) elif isinstance(args, VerificationArguments): verification_runner.run(args) - elif isinstance(args, MacauffArguments): - macauff_runner.run(args, client) else: raise ValueError("unknown args type") diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 91759f7c..a2fdb4ab 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -8,6 +8,7 @@ from dask.distributed import as_completed from hipscat.io import FilePointer, file_io +from hipscat.pixel_math.healpix_pixel import HealpixPixel from tqdm import tqdm @@ -121,7 +122,7 @@ def wait_for_futures(self, futures, stage_name): futures(List[future]): collected futures stage_name(str): name of the stage (e.g. mapping, reducing) Raises: - RuntimeError if any future returns an error status. + RuntimeError: if any future returns an error status. """ some_error = False formatted_stage_name = self.get_formatted_stage_name(stage_name) @@ -159,7 +160,7 @@ def check_original_input_paths(self, input_paths): input_paths (list[str]): input paths that will be processed by a pipeline. Raises: - ValueError if the retrieved file set differs from `input_paths`. + ValueError: if the retrieved file set differs from `input_paths`. """ unique_file_paths = set(input_paths) @@ -186,3 +187,21 @@ def check_original_input_paths(self, input_paths): input_paths = list(unique_file_paths) input_paths.sort() return input_paths + + +def get_pixel_cache_directory(cache_path, pixel: HealpixPixel): + """Create a path for intermediate pixel data. + + You can use this over the paths.get_pixel_directory method, as it will include the pixel + number in the path. Further, it will just *look* different from a real hipscat + path, so it's clearer that it's a temporary directory:: + + {cache_path}/order_{order}/dir_{dir}/pixel_{pixel}/ + + Args: + cache_path (str): root path to cache + pixel (HealpixPixel): pixel partition data + """ + return file_io.append_paths_to_pointer( + cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}" + ) diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py index 6e9d5869..5a0b1651 100644 --- a/src/hipscat_import/runtime_arguments.py +++ b/src/hipscat_import/runtime_arguments.py @@ -145,7 +145,7 @@ def find_input_paths( Returns: matching files, if input_path is provided, otherwise, input_file_list Raises: - FileNotFoundError if no files are found at the input_path and the provided list is empty. + FileNotFoundError: if no files are found at the input_path and the provided list is empty. 
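[editor note] The new shared `get_pixel_cache_directory` helper centralizes the intermediate-path scheme that the catalog, soap, and margin pipelines previously duplicated. A minimal usage sketch, assuming a plain string scratch path is acceptable as the cache root (the pipelines pass their `tmp_path` here); the expected layout is taken from the docstring added above.

```python
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

pixel = HealpixPixel(6, 30_000)
cache_dir = get_pixel_cache_directory("/tmp/my_import", pixel)
# expected layout per the docstring: /tmp/my_import/order_6/dir_30000/pixel_30000
# workers drop shard files (shard_<key>_<chunk>.parquet, source_<order>_<pixel>.parquet)
# in here, and the reduce stage later reads the whole directory before deleting it.
print(cache_dir)
```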
""" if input_path: if not file_io.does_file_or_directory_exist(input_path, storage_options=storage_options): diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 0a495119..b1f86fb0 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -11,17 +11,11 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory from hipscat_import.soap.arguments import SoapArguments from hipscat_import.soap.resume_plan import SoapPlan -def _get_pixel_directory(cache_path, pixel: HealpixPixel): - """Create a path for intermediate pixel data.""" - return file_io.append_paths_to_pointer( - cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}" - ) - - def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args): object_path = paths.pixel_catalog_file( catalog_base_dir=soap_args.object_catalog_dir, @@ -38,7 +32,7 @@ def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args): if not soap_args.write_leaf_files or rows_written == 0: return rows_written - pixel_dir = _get_pixel_directory(soap_args.tmp_path, object_pixel) + pixel_dir = get_pixel_cache_directory(soap_args.tmp_path, object_pixel) file_io.make_directory(pixel_dir, exist_ok=True) output_file = file_io.append_paths_to_pointer( pixel_dir, f"source_{source_pixel.order}_{source_pixel.pixel}.parquet" @@ -180,7 +174,7 @@ def reduce_joins( ): """Reduce join tables into one parquet file per object-pixel, with one row-group inside per source pixel.""" - pixel_dir = _get_pixel_directory(soap_args.tmp_path, object_pixel) + pixel_dir = get_pixel_cache_directory(soap_args.tmp_path, object_pixel) # If there's no directory, this implies there were no matches to this object pixel # earlier in the pipeline. Move on. if not file_io.does_file_or_directory_exist(pixel_dir): diff --git a/src/hipscat_import/soap/run_soap.py b/src/hipscat_import/soap/run_soap.py index 54808307..321eb011 100644 --- a/src/hipscat_import/soap/run_soap.py +++ b/src/hipscat_import/soap/run_soap.py @@ -3,7 +3,7 @@ The actual logic of the map reduce is in the `map_reduce.py` file. 
""" -from hipscat.io import file_io, parquet_metadata, paths, write_metadata +from hipscat.io import parquet_metadata, paths, write_metadata from tqdm import tqdm from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -59,8 +59,6 @@ def run(args, client): total_rows += row_group.num_rows else: total_rows = combine_partial_results(args.tmp_path, args.catalog_path) - # pylint: disable=duplicate-code - # Very similar to /index/run_index.py step_progress.update(1) total_rows = int(total_rows) catalog_info = args.to_catalog_info(total_rows) @@ -72,5 +70,5 @@ def run(args, client): step_progress.update(1) write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path) step_progress.update(1) - file_io.remove_directory(args.tmp_path, ignore_errors=True) + resume_plan.clean_resume_files() step_progress.update(1) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index edfe4d70..43de977e 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -403,7 +403,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): combined_data = pd.concat([file1_data, file2_data]) combined_data["norder19_healpix"] = hp.ang2pix( - 2 ** 19, + 2**19, combined_data["ra"].values, combined_data["dec"].values, lonlat=True, diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 00d2072e..c8e7a91f 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -94,11 +94,6 @@ def empty_data_dir(test_data_dir): return os.path.join(test_data_dir, "empty") -@pytest.fixture -def macauff_data_dir(test_data_dir): - return os.path.join(test_data_dir, "macauff") - - @pytest.fixture def formats_dir(test_data_dir): return os.path.join(test_data_dir, "test_formats") @@ -129,11 +124,6 @@ def formats_multiindex(test_data_dir): return os.path.join(test_data_dir, "test_formats", "multiindex.parquet") -@pytest.fixture -def formats_yaml(test_data_dir): - return os.path.join(test_data_dir, "test_formats", "macauff_metadata.yaml") - - @pytest.fixture def small_sky_parts_dir(test_data_dir): return os.path.join(test_data_dir, "small_sky_parts") @@ -203,7 +193,7 @@ def basic_data_shard_df(): ) test_df["margin_pixel"] = hp.ang2pix( - 2 ** 3, + 2**3, test_df["weird_ra"].values, test_df["weird_dec"].values, lonlat=True, @@ -235,7 +225,7 @@ def polar_data_shard_df(): ) test_df["margin_pixel"] = hp.ang2pix( - 2 ** 3, + 2**3, test_df["weird_ra"].values, test_df["weird_dec"].values, lonlat=True, diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py deleted file mode 100644 index c92afa21..00000000 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ /dev/null @@ -1,219 +0,0 @@ -"""Tests of macauff arguments""" - - -from os import path - -import pytest - -from hipscat_import.cross_match.macauff_arguments import MacauffArguments - -# pylint: disable=duplicate-code - - -def test_macauff_arguments( - small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path -): - """Test that we can create a MacauffArguments instance with two valid catalogs.""" - args = MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - 
right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - - assert len(args.input_paths) > 0 - - -def test_empty_required( - small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path -): - """All non-runtime arguments are required.""" - - ## List of required args: - ## - match expression that should be found when missing - ## - default value - required_args = [ - ["output_path", tmp_path], - ["output_artifact_name", "object_to_source"], - ["left_catalog_dir", small_sky_object_catalog], - ["left_ra_column", "ra"], - ["left_dec_column", "dec"], - ["left_id_column", "id"], - ["right_catalog_dir", small_sky_source_catalog], - ["right_ra_column", "source_ra"], - ["right_dec_column", "source_dec"], - ["right_id_column", "source_id"], - ["input_path", small_sky_dir], - ["input_format", "csv"], - ["metadata_file_path", formats_yaml], - ] - - ## For each required argument, check that a ValueError is raised that matches the - ## expected name of the missing param. - for index, args in enumerate(required_args): - test_args = [ - list_args[1] if list_index != index else None - for list_index, list_args in enumerate(required_args) - ] - - print(f"testing required arg #{index}") - - with pytest.raises(ValueError, match=args[0]): - MacauffArguments( - output_path=test_args[0], - output_artifact_name=test_args[1], - tmp_dir=tmp_path, - left_catalog_dir=test_args[2], - left_ra_column=test_args[3], - left_dec_column=test_args[4], - left_id_column=test_args[5], - right_catalog_dir=test_args[6], - right_ra_column=test_args[7], - right_dec_column=test_args[8], - right_id_column=test_args[9], - input_path=test_args[10], - input_format=test_args[11], - metadata_file_path=test_args[12], - overwrite=True, - ) - - -def test_macauff_arguments_file_list( - small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path -): - """Test that we can create a MacauffArguments instance with two valid catalogs.""" - files = [path.join(small_sky_dir, "catalog.csv")] - args = MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_file_list=files, - input_format="csv", - metadata_file_path=formats_yaml, - ) - - assert len(args.input_paths) > 0 - - -def test_macauff_args_invalid_catalog(small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path): - with pytest.raises(ValueError, match="left_catalog_dir"): - MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_dir, # valid path, but not a catalog - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - - -def test_macauff_args_right_invalid_catalog(small_sky_object_catalog, small_sky_dir, formats_yaml, tmp_path): - with pytest.raises(ValueError, match="right_catalog_dir"): - MacauffArguments( - output_path=tmp_path, - 
output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_dir, # valid directory with files, not a catalog - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - - -def test_macauff_args_invalid_metadata_file( - small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, tmp_path -): - with pytest.raises(ValueError, match="column metadata file must"): - MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path="ceci_n_est_pas_un_fichier.xml", - ) - - -def test_macauff_args_invalid_input_directory( - small_sky_object_catalog, small_sky_source_catalog, formats_yaml, tmp_path -): - with pytest.raises(FileNotFoundError, match="input_path not found"): - MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path="ceci_n_est_pas_un_directoire/", - input_format="csv", - metadata_file_path=formats_yaml, - ) - - -def test_macauff_args_no_files( - small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path -): - with pytest.raises(FileNotFoundError, match="No input files found"): - MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="parquet", # no files of this format will be found - metadata_file_path=formats_yaml, - ) diff --git a/tests/hipscat_import/cross_match/test_macauff_metadata.py b/tests/hipscat_import/cross_match/test_macauff_metadata.py deleted file mode 100644 index 5db39084..00000000 --- a/tests/hipscat_import/cross_match/test_macauff_metadata.py +++ /dev/null @@ -1,101 +0,0 @@ -import os -from xml.etree.ElementTree import ParseError - -import pytest -from hipscat.io import file_io - -from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml - - -def test_from_xml(macauff_data_dir, tmp_path): - """Test XML file reading and parquet metadata generation.""" - xml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match.xml") - output_file = os.path.join(tmp_path, "output.parquet") - - from_xml(xml_input_file, output_file) - - single_metadata = file_io.read_parquet_metadata(output_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 6 - - -def test_from_xml_malformed(tmp_path): - """Test some invalid XML file inputs.""" - input_file = os.path.join(tmp_path, "input.parquet") - output_file = 
os.path.join(tmp_path, "output.parquet") - - ## No "columns" found at all - with open(input_file, "w", encoding="utf-8") as file_handle: - file_handle.write("") - - with pytest.raises(ParseError, match="no element found"): - from_xml(input_file, output_file) - - ## Some columns, too many required fields - with open(input_file, "w", encoding="utf-8") as file_handle: - file_handle.write( - """ - - Gaia_designation - The Gaia DR3 object ID. - long - - """ - ) - - with pytest.raises(ValueError, match="too many name XML elements"): - from_xml(input_file, output_file) - - ## Unhandled types - with open(input_file, "w", encoding="utf-8") as file_handle: - file_handle.write( - """ - - Gaia_designation - The Gaia DR3 object ID. - blob - - """ - ) - - with pytest.raises(ValueError, match="unhandled units blob"): - from_xml(input_file, output_file) - - ## Some empty fields are ok! - with open(input_file, "w", encoding="utf-8") as file_handle: - file_handle.write( - """ - - - long - - """ - ) - - from_xml(input_file, output_file) - - -def test_from_yaml(macauff_data_dir, tmp_path): - """Test YAML file reading and parquet metadata generation.""" - yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") - - from_yaml(yaml_input_file, tmp_path) - - output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") - single_metadata = file_io.read_parquet_metadata(output_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 7 - - output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.parquet") - single_metadata = file_io.read_parquet_metadata(output_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 4 - - output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.parquet") - single_metadata = file_io.read_parquet_metadata(output_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 4 diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py deleted file mode 100644 index f6839175..00000000 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ /dev/null @@ -1,121 +0,0 @@ -import os - -import pytest -from hipscat.catalog.association_catalog.association_catalog import AssociationCatalog -from hipscat.io import file_io - -import hipscat_import.cross_match.run_macauff_import as runner -from hipscat_import.catalog.file_readers import CsvReader -from hipscat_import.cross_match.macauff_arguments import MacauffArguments -from hipscat_import.cross_match.macauff_metadata import from_yaml - -# pylint: disable=too-many-instance-attributes -# pylint: disable=duplicate-code - - -@pytest.mark.dask -def test_bad_args(dask_client): - """Runner should fail with empty or mis-typed arguments""" - with pytest.raises(TypeError, match="MacauffArguments"): - runner.run(None, dask_client) - - args = {"output_artifact_name": "bad_arg_type"} - with pytest.raises(TypeError, match="MacauffArguments"): - runner.run(args, dask_client) - - -@pytest.mark.dask -def test_object_to_object( - small_sky_object_catalog, - tmp_path, - macauff_data_dir, - dask_client, -): - """Test that we can create a MacauffArguments instance with two valid catalogs.""" - - yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") - from_yaml(yaml_input_file, tmp_path) - matches_schema_file = os.path.join(tmp_path, 
"macauff_GaiaDR3xCatWISE2020_matches.parquet") - single_metadata = file_io.read_parquet_metadata(matches_schema_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 7 - - args = MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_object", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="gaia_ra", - left_dec_column="gaia_dec", - left_id_column="gaia_source_id", - right_catalog_dir=small_sky_object_catalog, - right_ra_column="catwise_ra", - right_dec_column="catwise_dec", - right_id_column="catwise_name", - input_file_list=[os.path.join(macauff_data_dir, "gaia_small_sky_matches.csv")], - input_format="csv", - overwrite=True, - file_reader=CsvReader(schema_file=matches_schema_file, header=None), - metadata_file_path=matches_schema_file, - # progress_bar=False, - ) - os.makedirs(os.path.join(args.tmp_path, "splitting")) - - runner.run(args, dask_client) - - ## Check that the association data can be parsed as a valid association catalog. - catalog = AssociationCatalog.read_from_hipscat(args.catalog_path) - assert catalog.on_disk - assert catalog.catalog_path == args.catalog_path - assert len(catalog.get_join_pixels()) == 1 - assert catalog.catalog_info.total_rows == 131 - - -@pytest.mark.dask -def test_source_to_object( - small_sky_object_catalog, - small_sky_source_catalog, - tmp_path, - macauff_data_dir, - dask_client, -): - """Test that we can create a MacauffArguments instance with two valid catalogs.""" - - yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") - from_yaml(yaml_input_file, tmp_path) - matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") - single_metadata = file_io.read_parquet_metadata(matches_schema_file) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == 7 - - args = MacauffArguments( - output_path=tmp_path, - output_artifact_name="object_to_object", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_source_catalog, - left_ra_column="gaia_ra", - left_dec_column="gaia_dec", - left_id_column="gaia_source_id", - right_catalog_dir=small_sky_object_catalog, - right_ra_column="catwise_ra", - right_dec_column="catwise_dec", - right_id_column="catwise_name", - input_file_list=[os.path.join(macauff_data_dir, "small_sky_and_source_matches.csv")], - input_format="csv", - overwrite=True, - file_reader=CsvReader(schema_file=matches_schema_file, header=None), - metadata_file_path=matches_schema_file, - progress_bar=False, - ) - os.makedirs(os.path.join(args.tmp_path, "splitting")) - - runner.run(args, dask_client) - - ## Check that the association data can be parsed as a valid association catalog. 
- catalog = AssociationCatalog.read_from_hipscat(args.catalog_path) - assert catalog.on_disk - assert catalog.catalog_path == args.catalog_path - assert len(catalog.get_join_pixels()) == 8 - assert catalog.catalog_info.total_rows == 34 diff --git a/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv b/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv deleted file mode 100644 index fddaf044..00000000 --- a/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv +++ /dev/null @@ -1,131 +0,0 @@ -700,282.5,-58.5,cat_700,282.5,-58.5,0.5354709584 -701,299.5,-48.5,cat_701,299.5,-48.5,0.4724403517 -702,310.5,-27.5,cat_702,310.5,-27.5,0.4614836636 -703,286.5,-69.5,cat_703,286.5,-69.5,0.6277813704 -704,326.5,-45.5,cat_704,326.5,-45.5,0.1882040269 -705,335.5,-32.5,cat_705,335.5,-32.5,0.9985247394 -706,297.5,-36.5,cat_706,297.5,-36.5,0.02301896624 -707,308.5,-69.5,cat_707,308.5,-69.5,0.5904110262 -708,307.5,-37.5,cat_708,307.5,-37.5,0.5878325892 -709,294.5,-45.5,cat_709,294.5,-45.5,0.1980368365 -710,341.5,-39.5,cat_710,341.5,-39.5,0.04196035272 -711,305.5,-49.5,cat_711,305.5,-49.5,0.3062621527 -712,288.5,-49.5,cat_712,288.5,-49.5,0.6435039744 -713,298.5,-41.5,cat_713,298.5,-41.5,0.7619788882 -714,303.5,-37.5,cat_714,303.5,-37.5,0.7915978441 -715,280.5,-35.5,cat_715,280.5,-35.5,0.1868950003 -716,305.5,-60.5,cat_716,305.5,-60.5,0.8443717349 -717,303.5,-43.5,cat_717,303.5,-43.5,0.3929019182 -718,292.5,-60.5,cat_718,292.5,-60.5,0.2896284887 -719,344.5,-39.5,cat_719,344.5,-39.5,0.00222404657 -720,344.5,-47.5,cat_720,344.5,-47.5,0.6291603128 -721,314.5,-34.5,cat_721,314.5,-34.5,0.6322839802 -722,350.5,-58.5,cat_722,350.5,-58.5,0.6985155513 -723,315.5,-68.5,cat_723,315.5,-68.5,0.7755598471 -724,323.5,-41.5,cat_724,323.5,-41.5,0.1102038902 -725,308.5,-41.5,cat_725,308.5,-41.5,0.3612986851 -726,341.5,-37.5,cat_726,341.5,-37.5,0.3112333674 -727,301.5,-44.5,cat_727,301.5,-44.5,0.7305355582 -728,328.5,-47.5,cat_728,328.5,-47.5,0.07008040529 -729,299.5,-59.5,cat_729,299.5,-59.5,0.7683396723 -730,322.5,-61.5,cat_730,322.5,-61.5,0.4413972816 -731,343.5,-52.5,cat_731,343.5,-52.5,0.4255687054 -732,337.5,-39.5,cat_732,337.5,-39.5,0.7397828751 -733,329.5,-65.5,cat_733,329.5,-65.5,0.1127992088 -734,348.5,-66.5,cat_734,348.5,-66.5,0.3307251864 -735,299.5,-65.5,cat_735,299.5,-65.5,0.07576674192 -736,303.5,-52.5,cat_736,303.5,-52.5,0.1144396065 -737,316.5,-33.5,cat_737,316.5,-33.5,0.05698199793 -738,345.5,-64.5,cat_738,345.5,-64.5,0.1369706484 -739,332.5,-57.5,cat_739,332.5,-57.5,0.3326924736 -740,306.5,-33.5,cat_740,306.5,-33.5,0.4659156208 -741,303.5,-38.5,cat_741,303.5,-38.5,0.6866683066 -742,348.5,-45.5,cat_742,348.5,-45.5,0.1719145118 -743,307.5,-25.5,cat_743,307.5,-25.5,0.469163747 -744,349.5,-39.5,cat_744,349.5,-39.5,0.2711982345 -745,337.5,-38.5,cat_745,337.5,-38.5,0.5957227712 -746,283.5,-31.5,cat_746,283.5,-31.5,0.8216960109 -747,327.5,-61.5,cat_747,327.5,-61.5,0.9992690083 -748,296.5,-63.5,cat_748,296.5,-63.5,0.9912890703 -749,293.5,-55.5,cat_749,293.5,-55.5,0.5712789508 -750,338.5,-67.5,cat_750,338.5,-67.5,0.0007103625003 -751,330.5,-44.5,cat_751,330.5,-44.5,0.2530383004 -752,291.5,-34.5,cat_752,291.5,-34.5,0.007396371453 -753,307.5,-45.5,cat_753,307.5,-45.5,0.4834114007 -754,313.5,-30.5,cat_754,313.5,-30.5,0.565495019 -755,303.5,-38.5,cat_755,303.5,-38.5,0.3207177632 -756,319.5,-35.5,cat_756,319.5,-35.5,0.5495406617 -757,346.5,-34.5,cat_757,346.5,-34.5,0.9568408496 -758,325.5,-53.5,cat_758,325.5,-53.5,0.8464719616 -759,290.5,-48.5,cat_759,290.5,-48.5,0.08335239057 
-760,320.5,-53.5,cat_760,320.5,-53.5,0.6372577995 -761,329.5,-29.5,cat_761,329.5,-29.5,0.2821003251 -762,327.5,-51.5,cat_762,327.5,-51.5,0.3431085653 -763,306.5,-38.5,cat_763,306.5,-38.5,0.7543905106 -764,297.5,-45.5,cat_764,297.5,-45.5,0.8929035356 -765,306.5,-35.5,cat_765,306.5,-35.5,0.8097229976 -766,310.5,-63.5,cat_766,310.5,-63.5,0.6720056414 -767,314.5,-29.5,cat_767,314.5,-29.5,0.4712806906 -768,297.5,-60.5,cat_768,297.5,-60.5,0.5851340965 -769,307.5,-42.5,cat_769,307.5,-42.5,0.7633953196 -770,285.5,-29.5,cat_770,285.5,-29.5,0.05761527425 -771,348.5,-67.5,cat_771,348.5,-67.5,0.9965400568 -772,348.5,-64.5,cat_772,348.5,-64.5,0.4134575263 -773,293.5,-50.5,cat_773,293.5,-50.5,0.006651984057 -774,281.5,-54.5,cat_774,281.5,-54.5,0.01173790109 -775,321.5,-54.5,cat_775,321.5,-54.5,0.5145306498 -776,344.5,-63.5,cat_776,344.5,-63.5,0.3543206217 -777,307.5,-39.5,cat_777,307.5,-39.5,0.8025718501 -778,313.5,-36.5,cat_778,313.5,-36.5,0.3606419908 -779,347.5,-29.5,cat_779,347.5,-29.5,0.8431876746 -780,326.5,-52.5,cat_780,326.5,-52.5,0.3550175616 -781,330.5,-46.5,cat_781,330.5,-46.5,0.01706686238 -782,290.5,-39.5,cat_782,290.5,-39.5,0.4475960667 -783,286.5,-42.5,cat_783,286.5,-42.5,0.1192026354 -784,338.5,-40.5,cat_784,338.5,-40.5,0.7837749925 -785,296.5,-44.5,cat_785,296.5,-44.5,0.5337388926 -786,336.5,-33.5,cat_786,336.5,-33.5,0.1637974224 -787,320.5,-47.5,cat_787,320.5,-47.5,0.1606707258 -788,283.5,-61.5,cat_788,283.5,-61.5,0.3226904481 -789,287.5,-45.5,cat_789,287.5,-45.5,0.871912064 -790,286.5,-35.5,cat_790,286.5,-35.5,0.4421276183 -791,312.5,-28.5,cat_791,312.5,-28.5,0.2629705535 -792,320.5,-69.5,cat_792,320.5,-69.5,0.4247698928 -793,289.5,-58.5,cat_793,289.5,-58.5,0.1293312261 -794,300.5,-66.5,cat_794,300.5,-66.5,0.9858600303 -795,306.5,-58.5,cat_795,306.5,-58.5,0.141651889 -796,320.5,-33.5,cat_796,320.5,-33.5,0.5471312733 -797,308.5,-62.5,cat_797,308.5,-62.5,0.4071495713 -798,316.5,-36.5,cat_798,316.5,-36.5,0.298389701 -799,313.5,-31.5,cat_799,313.5,-31.5,0.1003646965 -800,299.5,-37.5,cat_800,299.5,-37.5,0.1195110611 -801,309.5,-50.5,cat_801,309.5,-50.5,0.8172118166 -802,304.5,-49.5,cat_802,304.5,-49.5,0.53025181 -803,336.5,-25.5,cat_803,336.5,-25.5,0.2723708625 -804,322.5,-66.5,cat_804,322.5,-66.5,0.6105308707 -805,297.5,-52.5,cat_805,297.5,-52.5,0.02027159887 -806,312.5,-29.5,cat_806,312.5,-29.5,0.2661396854 -807,303.5,-60.5,cat_807,303.5,-60.5,0.9609377314 -808,320.5,-40.5,cat_808,320.5,-40.5,0.113751978 -809,283.5,-34.5,cat_809,283.5,-34.5,0.998665039 -810,301.5,-59.5,cat_810,301.5,-59.5,0.1373287203 -811,315.5,-68.5,cat_811,315.5,-68.5,0.6869200694 -812,346.5,-60.5,cat_812,346.5,-60.5,0.5913654966 -813,349.5,-37.5,cat_813,349.5,-37.5,0.4280948593 -814,312.5,-33.5,cat_814,312.5,-33.5,0.06385846256 -815,283.5,-68.5,cat_815,283.5,-68.5,0.5410954638 -816,288.5,-69.5,cat_816,288.5,-69.5,0.2854363325 -817,318.5,-48.5,cat_817,318.5,-48.5,0.5805624378 -818,300.5,-55.5,cat_818,300.5,-55.5,0.006019407311 -819,313.5,-35.5,cat_819,313.5,-35.5,0.8343255244 -820,286.5,-46.5,cat_820,286.5,-46.5,0.8349398583 -821,330.5,-52.5,cat_821,330.5,-52.5,0.164118026 -822,301.5,-54.5,cat_822,301.5,-54.5,0.3361374088 -823,338.5,-45.5,cat_823,338.5,-45.5,0.8403731808 -824,305.5,-28.5,cat_824,305.5,-28.5,0.3213994958 -825,315.5,-30.5,cat_825,315.5,-30.5,0.8849820777 -826,335.5,-69.5,cat_826,335.5,-69.5,0.3293432462 -827,310.5,-40.5,cat_827,310.5,-40.5,0.6342142199 -828,330.5,-26.5,cat_828,330.5,-26.5,0.8067550106 -829,314.5,-35.5,cat_829,314.5,-35.5,0.8037233427 -830,306.5,-50.5,cat_830,306.5,-50.5,0.8159603245 \ 
No newline at end of file diff --git a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml deleted file mode 100644 index 985d8973..00000000 --- a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml +++ /dev/null @@ -1,68 +0,0 @@ - - - - Gaia_designation - - - The Gaia DR3 object ID. - - - long - - - - - Gaia_RA - - - Right Ascension of the Gaia DR3 source. - - - float - - - - - Gaia_Dec - - - The Gaia DR3 declination. - - - float - - - - - CatWISE_Name - - - The object identifier from the CatWISE 2020 catalogue. - - - string - - - - - CatWISE_RA - - - Right Ascension of the object as quoted by the CatWISE 2020 catalogue. - - - float - - - - - CatWISE_Dec - - - CatWISE 2020 Declination. - - - float - - - \ No newline at end of file diff --git a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml deleted file mode 100644 index 5cba93f6..00000000 --- a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml +++ /dev/null @@ -1,81 +0,0 @@ ---- -name: macauff_GaiaDR3xCatWISE2020 -description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020. -tables: -- name: macauff_GaiaDR3xCatWISE2020_matches - "@id": "#macauff_GaiaDR3xCatWISE2020_matches" - description: Counterpart associations between Gaia and WISE, as well as derived values from the cross-match process such as match probability and contamination flux. - columns: - - name: gaia_source_id - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id" - datatype: long - description: The Gaia DR3 object ID. - - name: gaia_ra - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_ra" - datatype: double - description: Right Ascension of the Gaia DR3 source. - - name: gaia_dec - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_dec" - datatype: double - description: The Gaia DR3 declination. - - name: catwise_name - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_name" - datatype: string - description: The object identifier from the CatWISE 2020 catalogue. - - name: catwise_ra - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_ra" - datatype: double - description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue. - - name: catwise_dec - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_dec" - datatype: double - description: CatWISE 2020 Declination. - - name: match_p - "@id": "#macauff_GaiaDR3xCatWISE2020_matches.match_p" - datatype: double - description: Overall probability that the Gaia DR3 and CatWISE sources are detections of the same object, as given by equation 26 of Wilson & Naylor (2018a). - primaryKey: "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id" - -- name: macauff_GaiaDR3xCatWISE2020_gaia_nonmatches - "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches" - description: Objects in Gaia DR3 with no counterpart in the CatWISE2020 catalogue, with derived columns such as match probability and simulated flux contamination. - columns: - - name: gaia_source_id - "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id" - datatype: long - description: The Gaia DR3 object ID. - - name: gaia_ra - "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_ra" - datatype: double - description: Right Ascension of the Gaia DR3 source. 
- - name: gaia_dec - "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_dec" - datatype: double - description: The Gaia DR3 declination. - - name: match_p - "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.match_p" - datatype: double - description: Overall probability that the Gaia DR3 source does not have a corresponding CatWISE detection, as given by equation 26 of Wilson & Naylor (2018a). - primaryKey: "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id" - -- name: macauff_GaiaDR3xCatWISE2020_catwise_nonmatches - "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches" - description: Objects in CatWISE2020 with no counterpart in the Gaia DR3 catalogue, with derived columns such as match probability and simulated flux contamination. - columns: - - name: catwise_name - "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" - datatype: string - description: The object identifier from the CatWISE 2020 catalogue. - - name: catwise_ra - "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_ra" - datatype: double - description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue. - - name: catwise_dec - "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_dec" - datatype: double - description: CatWISE 2020 Declination. - - name: match_p - "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.match_p" - datatype: double - description: Overall probability that the CatWISE source does not have a corresponding Gaia DR3 detection, as given by equation 26 of Wilson & Naylor (2018a). - primaryKey: "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" \ No newline at end of file diff --git a/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv deleted file mode 100644 index 146877e2..00000000 --- a/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv +++ /dev/null @@ -1,34 +0,0 @@ -72008,320.8364113,-69.45376863,792,320.5,-69.5,0.996 -73091,320.9404216,-69.46498164,792,320.5,-69.5,0.998 -83813,335.5861031,-69.37807662,826,335.5,-69.5,0.994 -78312,335.5182331,-69.38325891,826,335.5,-69.5,0.991 -76201,288.9361436,-69.31626483,816,288.5,-69.5,0.993 -72926,288.9503144,-69.3115179,816,288.5,-69.5,0.99 -72813,310.5307814,-63.34133051,766,310.5,-63.5,0.994 -70048,310.5876212,-63.33485542,766,310.5,-63.5,0.999 -83424,283.7763878,-61.30283808,788,283.5,-61.5,1 -73626,283.864721,-61.29113052,788,283.5,-61.5,1 -84534,347.9612914,-29.13951069,779,347.5,-29.5,0.998 -87130,347.9655757,-29.1246194,779,347.5,-29.5,0.993 -79615,347.9345496,-29.10876863,779,347.5,-29.5,0.99 -73071,347.9463072,-29.08860161,779,347.5,-29.5,0.992 -78803,347.997414,-29.07112828,779,347.5,-29.5,0.999 -76988,348.0338029,-29.04750582,779,347.5,-29.5,0.999 -83444,348.0537862,-29.02085159,779,347.5,-29.5,0.996 -72480,320.0880522,-35.28432758,756,319.5,-35.5,0.997 -76134,320.0697349,-35.21411381,756,319.5,-35.5,0.99 -75313,319.7793679,-35.45350619,756,319.5,-35.5,0.999 -79351,319.7409873,-35.4177272,756,319.5,-35.5,0.993 -78766,319.8029046,-35.42603476,756,319.5,-35.5,0.992 -74689,319.7981819,-35.41676507,756,319.5,-35.5,0.997 -73928,319.7099797,-35.43311803,756,319.5,-35.5,0.99 -77882,319.689082,-35.43731031,756,319.5,-35.5,0.998 -85015,319.6872701,-35.43434368,756,319.5,-35.5,0.99 -75167,319.7008698,-35.43045134,756,319.5,-35.5,0.996 -75394,319.736227,-35.40559895,756,319.5,-35.5,0.999 -80736,319.7140687,-35.37583874,756,319.5,-35.5,0.99 
-86351,290.5372378,-39.34034881,782,290.5,-39.5,0.996 -84773,290.5185662,-39.3174862,782,290.5,-39.5,0.998 -75092,290.5865147,-39.30033282,782,290.5,-39.5,0.992 -78548,290.5404456,-39.31843165,782,290.5,-39.5,0.997 -79186,290.7615303,-39.38550864,782,290.5,-39.5,0.994 \ No newline at end of file diff --git a/tests/hipscat_import/index/test_run_index.py b/tests/hipscat_import/index/test_run_index.py index 6ebace01..011ee2ca 100644 --- a/tests/hipscat_import/index/test_run_index.py +++ b/tests/hipscat_import/index/test_run_index.py @@ -114,7 +114,7 @@ def test_run_index_on_source( @pytest.mark.dask def test_run_index_on_source_object_id( small_sky_source_catalog, - dask_client, # pylint: disable=unused-argument + dask_client, # pylint: disable=unused-argument tmp_path, assert_parquet_file_index, ): diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index 6c0afacc..5f150d57 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -1,8 +1,12 @@ +import os + import pandas as pd import pytest -from hipscat.io import file_io, paths +from hipscat.io import paths +from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat_import.margin_cache import margin_cache_map_reduce +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory keep_cols = ["weird_ra", "weird_dec"] @@ -39,9 +43,9 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df): dec_column="weird_dec", ) - path = file_io.append_paths_to_pointer(tmp_path, "Norder=1/Dir=0/Npix=21/Norder=1/Dir=0/Npix=0.parquet") + path = os.path.join(tmp_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet") - assert file_io.does_file_or_directory_exist(path) + assert os.path.exists(path) validate_result_dataframe(path, 2) @@ -56,25 +60,23 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): dec_column="weird_dec", ) - path = file_io.append_paths_to_pointer(tmp_path, "Norder=2/Dir=0/Npix=15/Norder=2/Dir=0/Npix=0.parquet") + path = os.path.join(tmp_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet") - assert file_io.does_file_or_directory_exist(path) + assert os.path.exists(path) validate_result_dataframe(path, 360) @pytest.mark.dask def test_reduce_margin_shards(tmp_path, basic_data_shard_df): - partition_dir = margin_cache_map_reduce._get_partition_directory(tmp_path, 1, 21) + partition_dir = get_pixel_cache_directory(tmp_path, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) - file_io.make_directory(shard_dir, exist_ok=True) + os.makedirs(shard_dir) first_shard_path = paths.pixel_catalog_file(partition_dir, 1, 0) second_shard_path = paths.pixel_catalog_file(partition_dir, 1, 1) - print(first_shard_path) - shard_df = basic_data_shard_df.drop(columns=["partition_order", "partition_pixel", "margin_pixel"]) shard_df.to_parquet(first_shard_path) @@ -85,3 +87,4 @@ def test_reduce_margin_shards(tmp_path, basic_data_shard_df): result_path = paths.pixel_catalog_file(tmp_path, 1, 21) validate_result_dataframe(result_path, 720) + assert not os.path.exists(shard_dir)
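
Note for reviewers: the path assertions in the updated margin-cache tests encode the new intermediate layout produced by `get_pixel_cache_directory` (an `order_*/dir_*/pixel_*` cache directory with the usual hipscat `Norder=*/Dir=*/Npix=*.parquet` leaf underneath). Below is a minimal sketch of that layout, not library code: the helper name `expected_margin_shard_path` and the hard-coded `dir_0`/`Dir=0` grouping are illustrative assumptions taken from the two test cases, and POSIX path separators are assumed.

```python
import os

# Hedged sketch (not hipscat-import code): compose the intermediate shard path that the
# updated margin-cache tests assert, e.g. order_1/dir_0/pixel_21/Norder=1/Dir=0/Npix=0.parquet.
# In the library itself the cache directory comes from
# get_pixel_cache_directory(tmp_path, HealpixPixel(order, pixel)).
def expected_margin_shard_path(base_dir, order, pixel, shard_order, shard_pixel):
    # Intermediate cache directory for the destination pixel: order_*/dir_*/pixel_*
    # ("dir_0" is hard-coded here because both test pixels fall in that group).
    cache_dir = os.path.join(base_dir, f"order_{order}", "dir_0", f"pixel_{pixel}")
    # Shard leaf follows the standard hipscat partition naming: Norder=*/Dir=*/Npix=*.parquet
    return os.path.join(cache_dir, f"Norder={shard_order}", "Dir=0", f"Npix={shard_pixel}.parquet")


# Mirrors the equatorial test case: the shard for HEALPix pixel (1, 21) is written as Npix=0.
print(expected_margin_shard_path("margin_tmp", 1, 21, 1, 0))
# -> margin_tmp/order_1/dir_0/pixel_21/Norder=1/Dir=0/Npix=0.parquet
```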