From f63ac5bf13d2dc5ab6b9680c0637df35fb7c606c Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 6 Dec 2023 13:28:23 -0500 Subject: [PATCH 1/6] All but dask --- .../cross_match/macauff_arguments.py | 69 ++++----- .../cross_match/macauff_map_reduce.py | 63 +++++++++ .../cross_match/macauff_metadata.py | 2 +- .../cross_match/run_macauff_import.py | 88 +++++++++++- .../cross_match/test_macauff_runner.py | 55 ++++++-- .../data/macauff/gaia_small_sky_matches.csv | 131 ++++++++++++++++++ 6 files changed, 357 insertions(+), 51 deletions(-) create mode 100644 src/hipscat_import/cross_match/macauff_map_reduce.py create mode 100644 tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 74283f18..24faebc6 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -4,9 +4,12 @@ from os import path from typing import List +from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo +from hipscat.catalog.catalog_type import CatalogType from hipscat.io import FilePointer from hipscat.io.validation import is_valid_catalog +from hipscat_import.catalog.file_readers import InputReader, get_file_reader from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths # pylint: disable=too-many-instance-attributes @@ -28,8 +31,6 @@ class MacauffArguments(RuntimeArguments): """can be used instead of `input_format` to import only specified files""" input_paths: List[FilePointer] = field(default_factory=list) """resolved list of all files that will be used in the importer""" - add_hipscat_index: bool = True - """add the hipscat spatial index field alongside the data""" ## Input - Left catalog left_catalog_dir: str = "" @@ -45,8 +46,8 @@ class MacauffArguments(RuntimeArguments): ## `macauff` specific attributes metadata_file_path: str = "" - match_probability_columns: List[str] = field(default_factory=list) - column_names: List[str] = field(default_factory=list) + + file_reader: InputReader | None = None def __post_init__(self): self._check_arguments() @@ -89,33 +90,33 @@ def _check_arguments(self): # Basic checks complete - make more checks and create directories where necessary self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list) - self.column_names = self.get_column_names() - - def get_column_names(self): - """Grab the macauff column names.""" - # TODO: Actually read in the metadata file once we get the example file from Tom. 
- - return [ - "Gaia_designation", - "Gaia_RA", - "Gaia_Dec", - "BP", - "G", - "RP", - "CatWISE_Name", - "CatWISE_RA", - "CatWISE_Dec", - "W1", - "W2", - "match_p", - "Separation", - "eta", - "xi", - "Gaia_avg_cont", - "CatWISE_avg_cont", - "Gaia_cont_f1", - "Gaia_cont_f10", - "CatWISE_cont_f1", - "CatWISE_cont_f10", - "CatWISE_fit_sig", - ] + if not self.file_reader: + self.file_reader = get_file_reader(file_format=self.input_format) + + def to_catalog_info(self, total_rows) -> AssociationCatalogInfo: + """Catalog-type-specific dataset info.""" + info = { + "catalog_name": self.output_artifact_name, + "catalog_type": CatalogType.ASSOCIATION, + "total_rows": total_rows, + "primary_column": self.left_id_column, + "primary_catalog": str(self.left_catalog_dir), + "join_column": self.right_id_column, + "join_catalog": str(self.right_catalog_dir), + } + return AssociationCatalogInfo(**info) + + def additional_runtime_provenance_info(self) -> dict: + return { + "input_path": self.input_path, + "input_format": self.input_format, + "left_catalog_dir": self.left_catalog_dir, + "left_id_column": self.left_id_column, + "left_ra_column": self.left_ra_column, + "left_dec_column": self.left_dec_column, + "right_catalog_dir": self.right_catalog_dir, + "right_id_column": self.right_id_column, + "right_ra_column": self.right_ra_column, + "right_dec_column": self.right_dec_column, + "metadata_file_path": self.metadata_file_path, + } diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py new file mode 100644 index 00000000..d1b0b4e6 --- /dev/null +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -0,0 +1,63 @@ +import healpy as hp +import numpy as np +import pyarrow as pa +import pyarrow.parquet as pq +from hipscat.io import FilePointer, file_io, paths + + +def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.int64): + """Create a path for intermediate pixel data. + + This will take the form: + + /dir_/pixel_ + + where the directory separator is calculated using integer division: + + (pixel/10000)*10000 + + and exists to mitigate problems on file systems that don't support + more than 10_000 children nodes. 
+ """ + dir_number = int(pixel / 10_000) * 10_000 + return file_io.append_paths_to_pointer( + cache_path, f"order_{order}", f"dir_{dir_number}", f"pixel_{pixel}" + ) + + +def reduce_associations(args, left_pixel, highest_right_order, regenerated_right_alignment): + """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel + and aggregate into a single parquet file.""" + inputs = _get_pixel_directory(args.tmp_path, left_pixel.order, left_pixel.pixel) + + destination_dir = paths.pixel_directory(args.catalog_path, left_pixel.order, left_pixel.pixel) + file_io.make_directory(destination_dir, exist_ok=True) + + destination_file = paths.pixel_catalog_file(args.catalog_path, left_pixel.order, left_pixel.pixel) + + tables = [] + tables.append(pq.read_table(inputs)) + + merged_table = pa.concat_tables(tables) + dataframe = merged_table.to_pandas() + rows_written = len(dataframe) + + dataframe["Norder"] = np.full(rows_written, fill_value=left_pixel.order, dtype=np.uint8) + dataframe["Dir"] = np.full(rows_written, fill_value=left_pixel.dir, dtype=np.uint32) + dataframe["Npix"] = np.full(rows_written, fill_value=left_pixel.pixel, dtype=np.uint32) + + mapped_pixels = hp.ang2pix( + 2**highest_right_order, + dataframe[args.right_ra_column].values, + dataframe[args.right_dec_column].values, + lonlat=True, + nest=True, + ) + aligned_pixels = regenerated_right_alignment[mapped_pixels] + + dataframe["join_Norder"] = [pixel[0] for pixel in aligned_pixels] + dataframe["join_Dir"] = [int(pixel[1] / 10_000) * 10_000 for pixel in aligned_pixels] + dataframe["join_Npix"] = [pixel[1] for pixel in aligned_pixels] + + ## TODO - row groups per join_Norder/join_Npix + dataframe.to_parquet(destination_file) diff --git a/src/hipscat_import/cross_match/macauff_metadata.py b/src/hipscat_import/cross_match/macauff_metadata.py index 0e20caf0..65c8bf99 100644 --- a/src/hipscat_import/cross_match/macauff_metadata.py +++ b/src/hipscat_import/cross_match/macauff_metadata.py @@ -94,7 +94,7 @@ def from_yaml(input_file, output_directory): table_name = table.get("name", f"metadata_table_{index}") for col_index, column in enumerate(table.get("columns", [])): name = column.get("name", f"column_{col_index}") - units = column.get("units", "string") + units = column.get("datatype", "string") fields.append(_construct_field(name, units, metadata_dict=column)) schema = pa.schema(fields) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index aad49b51..b7296d19 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -1,8 +1,67 @@ +import healpy as hp +import numpy as np +from hipscat.catalog import Catalog +from hipscat.io import file_io, parquet_metadata, paths, write_metadata +from tqdm import tqdm + +import hipscat_import.catalog.map_reduce as catalog_mr from hipscat_import.cross_match.macauff_arguments import MacauffArguments +from hipscat_import.cross_match.macauff_map_reduce import reduce_associations # pylint: disable=unused-argument +def split_associations(args, left_catalog): + """Split association rows by their aligned left pixel.""" + left_pixels = left_catalog.partition_info.get_healpix_pixels() + highest_order = left_catalog.partition_info.get_highest_order() + + regenerated_alignment = np.full(hp.order2npix(highest_order), None) + for pixel in left_pixels: + explosion_factor = 4 ** (highest_order - pixel.order) + exploded_pixels = np.arange( + 
pixel.pixel * explosion_factor, + (pixel.pixel + 1) * explosion_factor, + ) + for explody in exploded_pixels: + regenerated_alignment[explody] = (pixel.order, pixel.pixel, 0) + + for i, file in enumerate(args.input_paths): + catalog_mr.split_pixels( + input_file=file, + file_reader=args.file_reader, + splitting_key=i, + highest_order=highest_order, + ra_column=args.left_ra_column, + dec_column=args.left_dec_column, + cache_shard_path=args.tmp_path, + resume_path=args.tmp_path, + alignment=regenerated_alignment, + use_hipscat_index=False, + ) + + +def reduce(args, left_catalog, right_catalog): + """Reduce left pixel files into a single parquet file per.""" + highest_right_order = right_catalog.partition_info.get_highest_order() + + left_pixels = left_catalog.partition_info.get_healpix_pixels() + right_pixels = right_catalog.partition_info.get_healpix_pixels() + + regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) + for pixel in right_pixels: + explosion_factor = 4 ** (highest_right_order - pixel.order) + exploded_pixels = np.arange( + pixel.pixel * explosion_factor, + (pixel.pixel + 1) * explosion_factor, + ) + for explody in exploded_pixels: + regenerated_right_alignment[explody] = (pixel.order, pixel.pixel, 0) + + for left_pixel in left_pixels: + reduce_associations(args, left_pixel, highest_right_order, regenerated_right_alignment) + + def run(args, client): """run macauff cross-match import pipeline""" if not args: @@ -10,4 +69,31 @@ def run(args, client): if not isinstance(args, MacauffArguments): raise TypeError("args must be type MacauffArguments") - raise NotImplementedError("macauff pipeline not implemented yet.") + left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir) + right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir) + + split_associations(args, left_catalog) + reduce(args, left_catalog, right_catalog) + + # All done - write out the metadata + with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: + parquet_metadata.write_parquet_metadata(args.catalog_path) + total_rows = 0 + metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) + for row_group in parquet_metadata.read_row_group_fragments(metadata_path): + total_rows += row_group.num_rows + # pylint: disable=duplicate-code + # Very similar to /index/run_index.py + step_progress.update(1) + total_rows = int(total_rows) + catalog_info = args.to_catalog_info(total_rows) + write_metadata.write_provenance_info( + catalog_base_dir=args.catalog_path, + dataset_info=catalog_info, + tool_args=args.provenance_info(), + ) + step_progress.update(1) + write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path) + step_progress.update(1) + file_io.remove_directory(args.tmp_path, ignore_errors=True) + step_progress.update(1) diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py index 144e7954..7238f63e 100644 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -1,7 +1,13 @@ +import os + import pytest +from hipscat.catalog.association_catalog.association_catalog import AssociationCatalog +from hipscat.io import file_io import hipscat_import.cross_match.run_macauff_import as runner +from hipscat_import.catalog.file_readers import CsvReader from hipscat_import.cross_match.macauff_arguments import MacauffArguments +from hipscat_import.cross_match.macauff_metadata 
import from_yaml # pylint: disable=too-many-instance-attributes # pylint: disable=duplicate-code @@ -21,29 +27,48 @@ def test_bad_args(dask_client): @pytest.mark.dask def test_no_implementation( small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, tmp_path, + macauff_data_dir, dask_client, ): """Test that we can create a MacauffArguments instance with two valid catalogs.""" + + # os.makedirs(os.path.join(tmp_path, "object_to_object")) + + yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") + from_yaml(yaml_input_file, tmp_path) + matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") + single_metadata = file_io.read_parquet_metadata(matches_schema_file) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == 7 + args = MacauffArguments( output_path=tmp_path, - output_artifact_name="object_to_source", + output_artifact_name="object_to_object", tmp_dir=tmp_path, left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, + left_ra_column="gaia_ra", + left_dec_column="gaia_dec", + left_id_column="gaia_source_id", + right_catalog_dir=small_sky_object_catalog, + right_ra_column="catwise_ra", + right_dec_column="catwise_dec", + right_id_column="catwise_name", + input_file_list=[os.path.join(macauff_data_dir, "gaia_small_sky_matches.csv")], input_format="csv", - metadata_file_path=formats_yaml, + overwrite=True, + file_reader=CsvReader(schema_file=matches_schema_file, header=None), + metadata_file_path=matches_schema_file, + progress_bar=False, ) + os.makedirs(os.path.join(args.tmp_path, "splitting")) - with pytest.raises(NotImplementedError, match="not implemented yet."): - runner.run(args, dask_client) + runner.run(args, dask_client) + + ## Check that the association data can be parsed as a valid association catalog. 
+ catalog = AssociationCatalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert len(catalog.get_join_pixels()) == 1 + assert catalog.catalog_info.total_rows == 131 diff --git a/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv b/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv new file mode 100644 index 00000000..fddaf044 --- /dev/null +++ b/tests/hipscat_import/data/macauff/gaia_small_sky_matches.csv @@ -0,0 +1,131 @@ +700,282.5,-58.5,cat_700,282.5,-58.5,0.5354709584 +701,299.5,-48.5,cat_701,299.5,-48.5,0.4724403517 +702,310.5,-27.5,cat_702,310.5,-27.5,0.4614836636 +703,286.5,-69.5,cat_703,286.5,-69.5,0.6277813704 +704,326.5,-45.5,cat_704,326.5,-45.5,0.1882040269 +705,335.5,-32.5,cat_705,335.5,-32.5,0.9985247394 +706,297.5,-36.5,cat_706,297.5,-36.5,0.02301896624 +707,308.5,-69.5,cat_707,308.5,-69.5,0.5904110262 +708,307.5,-37.5,cat_708,307.5,-37.5,0.5878325892 +709,294.5,-45.5,cat_709,294.5,-45.5,0.1980368365 +710,341.5,-39.5,cat_710,341.5,-39.5,0.04196035272 +711,305.5,-49.5,cat_711,305.5,-49.5,0.3062621527 +712,288.5,-49.5,cat_712,288.5,-49.5,0.6435039744 +713,298.5,-41.5,cat_713,298.5,-41.5,0.7619788882 +714,303.5,-37.5,cat_714,303.5,-37.5,0.7915978441 +715,280.5,-35.5,cat_715,280.5,-35.5,0.1868950003 +716,305.5,-60.5,cat_716,305.5,-60.5,0.8443717349 +717,303.5,-43.5,cat_717,303.5,-43.5,0.3929019182 +718,292.5,-60.5,cat_718,292.5,-60.5,0.2896284887 +719,344.5,-39.5,cat_719,344.5,-39.5,0.00222404657 +720,344.5,-47.5,cat_720,344.5,-47.5,0.6291603128 +721,314.5,-34.5,cat_721,314.5,-34.5,0.6322839802 +722,350.5,-58.5,cat_722,350.5,-58.5,0.6985155513 +723,315.5,-68.5,cat_723,315.5,-68.5,0.7755598471 +724,323.5,-41.5,cat_724,323.5,-41.5,0.1102038902 +725,308.5,-41.5,cat_725,308.5,-41.5,0.3612986851 +726,341.5,-37.5,cat_726,341.5,-37.5,0.3112333674 +727,301.5,-44.5,cat_727,301.5,-44.5,0.7305355582 +728,328.5,-47.5,cat_728,328.5,-47.5,0.07008040529 +729,299.5,-59.5,cat_729,299.5,-59.5,0.7683396723 +730,322.5,-61.5,cat_730,322.5,-61.5,0.4413972816 +731,343.5,-52.5,cat_731,343.5,-52.5,0.4255687054 +732,337.5,-39.5,cat_732,337.5,-39.5,0.7397828751 +733,329.5,-65.5,cat_733,329.5,-65.5,0.1127992088 +734,348.5,-66.5,cat_734,348.5,-66.5,0.3307251864 +735,299.5,-65.5,cat_735,299.5,-65.5,0.07576674192 +736,303.5,-52.5,cat_736,303.5,-52.5,0.1144396065 +737,316.5,-33.5,cat_737,316.5,-33.5,0.05698199793 +738,345.5,-64.5,cat_738,345.5,-64.5,0.1369706484 +739,332.5,-57.5,cat_739,332.5,-57.5,0.3326924736 +740,306.5,-33.5,cat_740,306.5,-33.5,0.4659156208 +741,303.5,-38.5,cat_741,303.5,-38.5,0.6866683066 +742,348.5,-45.5,cat_742,348.5,-45.5,0.1719145118 +743,307.5,-25.5,cat_743,307.5,-25.5,0.469163747 +744,349.5,-39.5,cat_744,349.5,-39.5,0.2711982345 +745,337.5,-38.5,cat_745,337.5,-38.5,0.5957227712 +746,283.5,-31.5,cat_746,283.5,-31.5,0.8216960109 +747,327.5,-61.5,cat_747,327.5,-61.5,0.9992690083 +748,296.5,-63.5,cat_748,296.5,-63.5,0.9912890703 +749,293.5,-55.5,cat_749,293.5,-55.5,0.5712789508 +750,338.5,-67.5,cat_750,338.5,-67.5,0.0007103625003 +751,330.5,-44.5,cat_751,330.5,-44.5,0.2530383004 +752,291.5,-34.5,cat_752,291.5,-34.5,0.007396371453 +753,307.5,-45.5,cat_753,307.5,-45.5,0.4834114007 +754,313.5,-30.5,cat_754,313.5,-30.5,0.565495019 +755,303.5,-38.5,cat_755,303.5,-38.5,0.3207177632 +756,319.5,-35.5,cat_756,319.5,-35.5,0.5495406617 +757,346.5,-34.5,cat_757,346.5,-34.5,0.9568408496 +758,325.5,-53.5,cat_758,325.5,-53.5,0.8464719616 +759,290.5,-48.5,cat_759,290.5,-48.5,0.08335239057 
+760,320.5,-53.5,cat_760,320.5,-53.5,0.6372577995 +761,329.5,-29.5,cat_761,329.5,-29.5,0.2821003251 +762,327.5,-51.5,cat_762,327.5,-51.5,0.3431085653 +763,306.5,-38.5,cat_763,306.5,-38.5,0.7543905106 +764,297.5,-45.5,cat_764,297.5,-45.5,0.8929035356 +765,306.5,-35.5,cat_765,306.5,-35.5,0.8097229976 +766,310.5,-63.5,cat_766,310.5,-63.5,0.6720056414 +767,314.5,-29.5,cat_767,314.5,-29.5,0.4712806906 +768,297.5,-60.5,cat_768,297.5,-60.5,0.5851340965 +769,307.5,-42.5,cat_769,307.5,-42.5,0.7633953196 +770,285.5,-29.5,cat_770,285.5,-29.5,0.05761527425 +771,348.5,-67.5,cat_771,348.5,-67.5,0.9965400568 +772,348.5,-64.5,cat_772,348.5,-64.5,0.4134575263 +773,293.5,-50.5,cat_773,293.5,-50.5,0.006651984057 +774,281.5,-54.5,cat_774,281.5,-54.5,0.01173790109 +775,321.5,-54.5,cat_775,321.5,-54.5,0.5145306498 +776,344.5,-63.5,cat_776,344.5,-63.5,0.3543206217 +777,307.5,-39.5,cat_777,307.5,-39.5,0.8025718501 +778,313.5,-36.5,cat_778,313.5,-36.5,0.3606419908 +779,347.5,-29.5,cat_779,347.5,-29.5,0.8431876746 +780,326.5,-52.5,cat_780,326.5,-52.5,0.3550175616 +781,330.5,-46.5,cat_781,330.5,-46.5,0.01706686238 +782,290.5,-39.5,cat_782,290.5,-39.5,0.4475960667 +783,286.5,-42.5,cat_783,286.5,-42.5,0.1192026354 +784,338.5,-40.5,cat_784,338.5,-40.5,0.7837749925 +785,296.5,-44.5,cat_785,296.5,-44.5,0.5337388926 +786,336.5,-33.5,cat_786,336.5,-33.5,0.1637974224 +787,320.5,-47.5,cat_787,320.5,-47.5,0.1606707258 +788,283.5,-61.5,cat_788,283.5,-61.5,0.3226904481 +789,287.5,-45.5,cat_789,287.5,-45.5,0.871912064 +790,286.5,-35.5,cat_790,286.5,-35.5,0.4421276183 +791,312.5,-28.5,cat_791,312.5,-28.5,0.2629705535 +792,320.5,-69.5,cat_792,320.5,-69.5,0.4247698928 +793,289.5,-58.5,cat_793,289.5,-58.5,0.1293312261 +794,300.5,-66.5,cat_794,300.5,-66.5,0.9858600303 +795,306.5,-58.5,cat_795,306.5,-58.5,0.141651889 +796,320.5,-33.5,cat_796,320.5,-33.5,0.5471312733 +797,308.5,-62.5,cat_797,308.5,-62.5,0.4071495713 +798,316.5,-36.5,cat_798,316.5,-36.5,0.298389701 +799,313.5,-31.5,cat_799,313.5,-31.5,0.1003646965 +800,299.5,-37.5,cat_800,299.5,-37.5,0.1195110611 +801,309.5,-50.5,cat_801,309.5,-50.5,0.8172118166 +802,304.5,-49.5,cat_802,304.5,-49.5,0.53025181 +803,336.5,-25.5,cat_803,336.5,-25.5,0.2723708625 +804,322.5,-66.5,cat_804,322.5,-66.5,0.6105308707 +805,297.5,-52.5,cat_805,297.5,-52.5,0.02027159887 +806,312.5,-29.5,cat_806,312.5,-29.5,0.2661396854 +807,303.5,-60.5,cat_807,303.5,-60.5,0.9609377314 +808,320.5,-40.5,cat_808,320.5,-40.5,0.113751978 +809,283.5,-34.5,cat_809,283.5,-34.5,0.998665039 +810,301.5,-59.5,cat_810,301.5,-59.5,0.1373287203 +811,315.5,-68.5,cat_811,315.5,-68.5,0.6869200694 +812,346.5,-60.5,cat_812,346.5,-60.5,0.5913654966 +813,349.5,-37.5,cat_813,349.5,-37.5,0.4280948593 +814,312.5,-33.5,cat_814,312.5,-33.5,0.06385846256 +815,283.5,-68.5,cat_815,283.5,-68.5,0.5410954638 +816,288.5,-69.5,cat_816,288.5,-69.5,0.2854363325 +817,318.5,-48.5,cat_817,318.5,-48.5,0.5805624378 +818,300.5,-55.5,cat_818,300.5,-55.5,0.006019407311 +819,313.5,-35.5,cat_819,313.5,-35.5,0.8343255244 +820,286.5,-46.5,cat_820,286.5,-46.5,0.8349398583 +821,330.5,-52.5,cat_821,330.5,-52.5,0.164118026 +822,301.5,-54.5,cat_822,301.5,-54.5,0.3361374088 +823,338.5,-45.5,cat_823,338.5,-45.5,0.8403731808 +824,305.5,-28.5,cat_824,305.5,-28.5,0.3213994958 +825,315.5,-30.5,cat_825,315.5,-30.5,0.8849820777 +826,335.5,-69.5,cat_826,335.5,-69.5,0.3293432462 +827,310.5,-40.5,cat_827,310.5,-40.5,0.6342142199 +828,330.5,-26.5,cat_828,330.5,-26.5,0.8067550106 +829,314.5,-35.5,cat_829,314.5,-35.5,0.8037233427 +830,306.5,-50.5,cat_830,306.5,-50.5,0.8159603245 \ 
No newline at end of file From ea75765069173a9b4310e6bdba07a33a5d6965ce Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 6 Dec 2023 14:58:16 -0500 Subject: [PATCH 2/6] row group per right pixel --- .../cross_match/macauff_map_reduce.py | 134 ++++++++++++------ .../cross_match/run_macauff_import.py | 61 ++++---- 2 files changed, 116 insertions(+), 79 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index d1b0b4e6..2daed416 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -2,30 +2,83 @@ import numpy as np import pyarrow as pa import pyarrow.parquet as pq -from hipscat.io import FilePointer, file_io, paths - - -def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.int64): - """Create a path for intermediate pixel data. - - This will take the form: - - /dir_/pixel_ - - where the directory separator is calculated using integer division: - - (pixel/10000)*10000 - - and exists to mitigate problems on file systems that don't support - more than 10_000 children nodes. +from hipscat.io import file_io, paths +from hipscat.pixel_math.healpix_pixel import HealpixPixel +from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort + +from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file +from hipscat_import.catalog.resume_plan import ResumePlan + + +def split_associations( + input_file, + file_reader, + splitting_key, + args, + highest_left_order, + highest_right_order, + left_alignment, + right_alignment, +): + """Map a file of links to their healpix pixels and split into shards. + + Args: + input_file (FilePointer): file to read for catalog data. + file_reader (hipscat_import.catalog.file_readers.InputReader): instance + of input reader that specifies arguments necessary for reading from the input file. + splitting_key (str): unique counter for this input file, used + when creating intermediate files + highest_order (int): healpix order to use when mapping + ra_column (str): where to find right ascension data in the dataframe + dec_column (str): where to find declation in the dataframe + cache_shard_path (FilePointer): where to write intermediate parquet files. + resume_path (FilePointer): where to write resume files. + + Raises: + ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. 
+ FileNotFoundError: if the file does not exist, or is a directory """ - dir_number = int(pixel / 10_000) * 10_000 - return file_io.append_paths_to_pointer( - cache_path, f"order_{order}", f"dir_{dir_number}", f"pixel_{pixel}" - ) - - -def reduce_associations(args, left_pixel, highest_right_order, regenerated_right_alignment): + for chunk_number, data, mapped_left_pixels in _iterate_input_file( + input_file, file_reader, highest_left_order, args.left_ra_column, args.left_dec_column, False + ): + aligned_left_pixels = left_alignment[mapped_left_pixels] + unique_pixels, unique_inverse = np.unique(aligned_left_pixels, return_inverse=True) + + mapped_right_pixels = hp.ang2pix( + 2**highest_right_order, + data[args.right_ra_column].values, + data[args.right_dec_column].values, + lonlat=True, + nest=True, + ) + aligned_right_pixels = right_alignment[mapped_right_pixels] + + data["Norder"] = [pix.order for pix in aligned_left_pixels] + data["Dir"] = [pix.dir for pix in aligned_left_pixels] + data["Npix"] = [pix.pixel for pix in aligned_left_pixels] + + data["join_Norder"] = [pix.order for pix in aligned_right_pixels] + data["join_Dir"] = [pix.dir for pix in aligned_right_pixels] + data["join_Npix"] = [pix.pixel for pix in aligned_right_pixels] + + for unique_index, pixel in enumerate(unique_pixels): + mapped_indexes = np.where(unique_inverse == unique_index) + data_indexes = data.index[mapped_indexes[0].tolist()] + + filtered_data = data.filter(items=data_indexes, axis=0) + + pixel_dir = _get_pixel_directory(args.tmp_path, pixel.order, pixel.pixel) + file_io.make_directory(pixel_dir, exist_ok=True) + output_file = file_io.append_paths_to_pointer( + pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" + ) + filtered_data.to_parquet(output_file, index=False) + del filtered_data, data_indexes + + ResumePlan.splitting_key_done(tmp_path=args.tmp_path, splitting_key=splitting_key) + + +def reduce_associations(args, left_pixel): """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel and aggregate into a single parquet file.""" inputs = _get_pixel_directory(args.tmp_path, left_pixel.order, left_pixel.pixel) @@ -35,29 +88,16 @@ def reduce_associations(args, left_pixel, highest_right_order, regenerated_right destination_file = paths.pixel_catalog_file(args.catalog_path, left_pixel.order, left_pixel.pixel) - tables = [] - tables.append(pq.read_table(inputs)) - - merged_table = pa.concat_tables(tables) - dataframe = merged_table.to_pandas() - rows_written = len(dataframe) - - dataframe["Norder"] = np.full(rows_written, fill_value=left_pixel.order, dtype=np.uint8) - dataframe["Dir"] = np.full(rows_written, fill_value=left_pixel.dir, dtype=np.uint32) - dataframe["Npix"] = np.full(rows_written, fill_value=left_pixel.pixel, dtype=np.uint32) - - mapped_pixels = hp.ang2pix( - 2**highest_right_order, - dataframe[args.right_ra_column].values, - dataframe[args.right_dec_column].values, - lonlat=True, - nest=True, - ) - aligned_pixels = regenerated_right_alignment[mapped_pixels] + merged_table = pq.read_table(inputs) + dataframe = merged_table.to_pandas().reset_index() - dataframe["join_Norder"] = [pixel[0] for pixel in aligned_pixels] - dataframe["join_Dir"] = [int(pixel[1] / 10_000) * 10_000 for pixel in aligned_pixels] - dataframe["join_Npix"] = [pixel[1] for pixel in aligned_pixels] + ## One row group per join_Norder/join_Npix - ## TODO - row groups per join_Norder/join_Npix - dataframe.to_parquet(destination_file) + join_pixel_frames = 
dataframe.groupby(["join_Norder", "join_Npix"], group_keys=True) + join_pixels = [HealpixPixel(pixel[0], pixel[1]) for pixel, _ in join_pixel_frames] + pixel_argsort = get_pixel_argsort(join_pixels) + with pq.ParquetWriter(destination_file, merged_table.schema) as writer: + for pixel_index in pixel_argsort: + join_pixel = join_pixels[pixel_index] + join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() + writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index b7296d19..19c02e9b 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -4,62 +4,59 @@ from hipscat.io import file_io, parquet_metadata, paths, write_metadata from tqdm import tqdm -import hipscat_import.catalog.map_reduce as catalog_mr from hipscat_import.cross_match.macauff_arguments import MacauffArguments -from hipscat_import.cross_match.macauff_map_reduce import reduce_associations +from hipscat_import.cross_match.macauff_map_reduce import reduce_associations, split_associations # pylint: disable=unused-argument -def split_associations(args, left_catalog): +def split(args, left_catalog, right_catalog): """Split association rows by their aligned left pixel.""" left_pixels = left_catalog.partition_info.get_healpix_pixels() - highest_order = left_catalog.partition_info.get_highest_order() + highest_left_order = left_catalog.partition_info.get_highest_order() + highest_right_order = right_catalog.partition_info.get_highest_order() - regenerated_alignment = np.full(hp.order2npix(highest_order), None) + left_pixels = left_catalog.partition_info.get_healpix_pixels() + right_pixels = right_catalog.partition_info.get_healpix_pixels() + regenerated_left_alignment = np.full(hp.order2npix(highest_left_order), None) for pixel in left_pixels: - explosion_factor = 4 ** (highest_order - pixel.order) + explosion_factor = 4 ** (highest_left_order - pixel.order) + exploded_pixels = np.arange( + pixel.pixel * explosion_factor, + (pixel.pixel + 1) * explosion_factor, + ) + for explody in exploded_pixels: + regenerated_left_alignment[explody] = pixel + + regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) + for pixel in right_pixels: + explosion_factor = 4 ** (highest_right_order - pixel.order) exploded_pixels = np.arange( pixel.pixel * explosion_factor, (pixel.pixel + 1) * explosion_factor, ) for explody in exploded_pixels: - regenerated_alignment[explody] = (pixel.order, pixel.pixel, 0) + regenerated_right_alignment[explody] = pixel for i, file in enumerate(args.input_paths): - catalog_mr.split_pixels( + split_associations( input_file=file, file_reader=args.file_reader, splitting_key=i, - highest_order=highest_order, - ra_column=args.left_ra_column, - dec_column=args.left_dec_column, - cache_shard_path=args.tmp_path, - resume_path=args.tmp_path, - alignment=regenerated_alignment, - use_hipscat_index=False, + args=args, + highest_left_order=highest_left_order, + highest_right_order=highest_right_order, + left_alignment=regenerated_left_alignment, + right_alignment=regenerated_right_alignment, ) -def reduce(args, left_catalog, right_catalog): +def reduce(args, left_catalog): """Reduce left pixel files into a single parquet file per.""" - highest_right_order = right_catalog.partition_info.get_highest_order() - left_pixels = 
left_catalog.partition_info.get_healpix_pixels() - right_pixels = right_catalog.partition_info.get_healpix_pixels() - - regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) - for pixel in right_pixels: - explosion_factor = 4 ** (highest_right_order - pixel.order) - exploded_pixels = np.arange( - pixel.pixel * explosion_factor, - (pixel.pixel + 1) * explosion_factor, - ) - for explody in exploded_pixels: - regenerated_right_alignment[explody] = (pixel.order, pixel.pixel, 0) for left_pixel in left_pixels: - reduce_associations(args, left_pixel, highest_right_order, regenerated_right_alignment) + reduce_associations(args, left_pixel) def run(args, client): @@ -72,8 +69,8 @@ def run(args, client): left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir) right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir) - split_associations(args, left_catalog) - reduce(args, left_catalog, right_catalog) + split(args, left_catalog, right_catalog) + reduce(args, left_catalog) # All done - write out the metadata with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: From d95048883337af071a1e273e22074b011f3b54ac Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 6 Dec 2023 15:35:45 -0500 Subject: [PATCH 3/6] Add another test --- .../cross_match/macauff_map_reduce.py | 14 ++--- .../cross_match/run_macauff_import.py | 6 +-- .../cross_match/test_macauff_runner.py | 54 +++++++++++++++++-- .../macauff/small_sky_and_source_matches.csv | 34 ++++++++++++ 4 files changed, 90 insertions(+), 18 deletions(-) create mode 100644 tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index 2daed416..375a2661 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -22,17 +22,6 @@ def split_associations( ): """Map a file of links to their healpix pixels and split into shards. - Args: - input_file (FilePointer): file to read for catalog data. - file_reader (hipscat_import.catalog.file_readers.InputReader): instance - of input reader that specifies arguments necessary for reading from the input file. - splitting_key (str): unique counter for this input file, used - when creating intermediate files - highest_order (int): healpix order to use when mapping - ra_column (str): where to find right ascension data in the dataframe - dec_column (str): where to find declation in the dataframe - cache_shard_path (FilePointer): where to write intermediate parquet files. - resume_path (FilePointer): where to write resume files. Raises: ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. 
@@ -83,6 +72,9 @@ def reduce_associations(args, left_pixel): and aggregate into a single parquet file.""" inputs = _get_pixel_directory(args.tmp_path, left_pixel.order, left_pixel.pixel) + if not file_io.directory_has_contents(inputs): + print(f"Warning: no input data for pixel {left_pixel}") + return destination_dir = paths.pixel_directory(args.catalog_path, left_pixel.order, left_pixel.pixel) file_io.make_directory(destination_dir, exist_ok=True) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index 19c02e9b..5f9f97a2 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -25,8 +25,7 @@ def split(args, left_catalog, right_catalog): pixel.pixel * explosion_factor, (pixel.pixel + 1) * explosion_factor, ) - for explody in exploded_pixels: - regenerated_left_alignment[explody] = pixel + regenerated_left_alignment[exploded_pixels] = pixel regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) for pixel in right_pixels: @@ -35,8 +34,7 @@ def split(args, left_catalog, right_catalog): pixel.pixel * explosion_factor, (pixel.pixel + 1) * explosion_factor, ) - for explody in exploded_pixels: - regenerated_right_alignment[explody] = pixel + regenerated_right_alignment[exploded_pixels] = pixel for i, file in enumerate(args.input_paths): split_associations( diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py index 7238f63e..ee1d47d1 100644 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -25,7 +25,7 @@ def test_bad_args(dask_client): @pytest.mark.dask -def test_no_implementation( +def test_object_to_object( small_sky_object_catalog, tmp_path, macauff_data_dir, @@ -33,8 +33,6 @@ def test_no_implementation( ): """Test that we can create a MacauffArguments instance with two valid catalogs.""" - # os.makedirs(os.path.join(tmp_path, "object_to_object")) - yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") from_yaml(yaml_input_file, tmp_path) matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") @@ -72,3 +70,53 @@ def test_no_implementation( assert catalog.catalog_path == args.catalog_path assert len(catalog.get_join_pixels()) == 1 assert catalog.catalog_info.total_rows == 131 + + + +@pytest.mark.dask +def test_source_to_object( + small_sky_object_catalog, + small_sky_source_catalog, + tmp_path, + macauff_data_dir, + dask_client, +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + + yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml") + from_yaml(yaml_input_file, tmp_path) + matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet") + single_metadata = file_io.read_parquet_metadata(matches_schema_file) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == 7 + + args = MacauffArguments( + output_path=tmp_path, + output_artifact_name="object_to_object", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_source_catalog, + left_ra_column="gaia_ra", + left_dec_column="gaia_dec", + left_id_column="gaia_source_id", + right_catalog_dir=small_sky_object_catalog, + right_ra_column="catwise_ra", + right_dec_column="catwise_dec", + right_id_column="catwise_name", + 
input_file_list=[os.path.join(macauff_data_dir, "small_sky_and_source_matches.csv")], + input_format="csv", + overwrite=True, + file_reader=CsvReader(schema_file=matches_schema_file, header=None), + metadata_file_path=matches_schema_file, + progress_bar=False, + ) + os.makedirs(os.path.join(args.tmp_path, "splitting")) + + runner.run(args, dask_client) + + ## Check that the association data can be parsed as a valid association catalog. + catalog = AssociationCatalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert len(catalog.get_join_pixels()) == 8 + assert catalog.catalog_info.total_rows == 34 \ No newline at end of file diff --git a/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv new file mode 100644 index 00000000..146877e2 --- /dev/null +++ b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv @@ -0,0 +1,34 @@ +72008,320.8364113,-69.45376863,792,320.5,-69.5,0.996 +73091,320.9404216,-69.46498164,792,320.5,-69.5,0.998 +83813,335.5861031,-69.37807662,826,335.5,-69.5,0.994 +78312,335.5182331,-69.38325891,826,335.5,-69.5,0.991 +76201,288.9361436,-69.31626483,816,288.5,-69.5,0.993 +72926,288.9503144,-69.3115179,816,288.5,-69.5,0.99 +72813,310.5307814,-63.34133051,766,310.5,-63.5,0.994 +70048,310.5876212,-63.33485542,766,310.5,-63.5,0.999 +83424,283.7763878,-61.30283808,788,283.5,-61.5,1 +73626,283.864721,-61.29113052,788,283.5,-61.5,1 +84534,347.9612914,-29.13951069,779,347.5,-29.5,0.998 +87130,347.9655757,-29.1246194,779,347.5,-29.5,0.993 +79615,347.9345496,-29.10876863,779,347.5,-29.5,0.99 +73071,347.9463072,-29.08860161,779,347.5,-29.5,0.992 +78803,347.997414,-29.07112828,779,347.5,-29.5,0.999 +76988,348.0338029,-29.04750582,779,347.5,-29.5,0.999 +83444,348.0537862,-29.02085159,779,347.5,-29.5,0.996 +72480,320.0880522,-35.28432758,756,319.5,-35.5,0.997 +76134,320.0697349,-35.21411381,756,319.5,-35.5,0.99 +75313,319.7793679,-35.45350619,756,319.5,-35.5,0.999 +79351,319.7409873,-35.4177272,756,319.5,-35.5,0.993 +78766,319.8029046,-35.42603476,756,319.5,-35.5,0.992 +74689,319.7981819,-35.41676507,756,319.5,-35.5,0.997 +73928,319.7099797,-35.43311803,756,319.5,-35.5,0.99 +77882,319.689082,-35.43731031,756,319.5,-35.5,0.998 +85015,319.6872701,-35.43434368,756,319.5,-35.5,0.99 +75167,319.7008698,-35.43045134,756,319.5,-35.5,0.996 +75394,319.736227,-35.40559895,756,319.5,-35.5,0.999 +80736,319.7140687,-35.37583874,756,319.5,-35.5,0.99 +86351,290.5372378,-39.34034881,782,290.5,-39.5,0.996 +84773,290.5185662,-39.3174862,782,290.5,-39.5,0.998 +75092,290.5865147,-39.30033282,782,290.5,-39.5,0.992 +78548,290.5404456,-39.31843165,782,290.5,-39.5,0.997 +79186,290.7615303,-39.38550864,782,290.5,-39.5,0.994 \ No newline at end of file From 1c8216e14b23fbe2f96d8718dba0775a26736de9 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Fri, 8 Dec 2023 10:49:02 -0500 Subject: [PATCH 4/6] Add dask, resume, progress, scattering. 
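This commit moves the split and reduce stages onto the dask client: large shared
inputs (the file reader and the regenerated pixel alignments) are scattered to the
cluster once, each input file or left pixel becomes a submitted future, and the new
MacauffResumePlan waits on those futures per stage. A minimal sketch of that
scatter/submit/wait pattern follows; it assumes a local dask.distributed cluster,
and `do_split` is a hypothetical stand-in for `split_associations`, not the real
task function.

    import numpy as np
    from dask.distributed import Client, wait

    def do_split(input_file, alignment, splitting_key):
        # Placeholder body; the real task shards one input file by healpix pixel.
        return f"{splitting_key}: {input_file} ({len(alignment)} pixels)"

    if __name__ == "__main__":
        client = Client(n_workers=2, threads_per_worker=1)
        alignment = np.arange(12)  # stand-in for a regenerated alignment array
        # Ship the shared array to the workers once and reuse the future in every task.
        alignment_future = client.scatter(alignment, broadcast=True)
        futures = [
            client.submit(do_split, input_file=path, alignment=alignment_future,
                          splitting_key=f"split_{i}")
            for i, path in enumerate(["matches_0.csv", "matches_1.csv"])
        ]
        wait(futures)  # the pipeline instead waits via resume_plan.wait_for_splitting
        print(client.gather(futures))
        client.close()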
--- .../cross_match/macauff_arguments.py | 3 + .../cross_match/macauff_map_reduce.py | 27 ++- .../cross_match/macauff_resume_plan.py | 175 ++++++++++++++++++ .../cross_match/run_macauff_import.py | 143 +++++++++----- .../cross_match/test_macauff_runner.py | 5 +- 5 files changed, 297 insertions(+), 56 deletions(-) create mode 100644 src/hipscat_import/cross_match/macauff_resume_plan.py diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 24faebc6..26079c01 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -46,6 +46,9 @@ class MacauffArguments(RuntimeArguments): ## `macauff` specific attributes metadata_file_path: str = "" + resume: bool = True + """if there are existing intermediate resume files, should we + read those and continue to create a new catalog where we left off""" file_reader: InputReader | None = None diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index 375a2661..35d54d45 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -14,11 +14,15 @@ def split_associations( input_file, file_reader, splitting_key, - args, highest_left_order, highest_right_order, left_alignment, right_alignment, + left_ra_column, + left_dec_column, + right_ra_column, + right_dec_column, + tmp_path, ): """Map a file of links to their healpix pixels and split into shards. @@ -28,15 +32,15 @@ def split_associations( FileNotFoundError: if the file does not exist, or is a directory """ for chunk_number, data, mapped_left_pixels in _iterate_input_file( - input_file, file_reader, highest_left_order, args.left_ra_column, args.left_dec_column, False + input_file, file_reader, highest_left_order, left_ra_column, left_dec_column, False ): aligned_left_pixels = left_alignment[mapped_left_pixels] unique_pixels, unique_inverse = np.unique(aligned_left_pixels, return_inverse=True) mapped_right_pixels = hp.ang2pix( 2**highest_right_order, - data[args.right_ra_column].values, - data[args.right_dec_column].values, + data[right_ra_column].values, + data[right_dec_column].values, lonlat=True, nest=True, ) @@ -56,7 +60,7 @@ def split_associations( filtered_data = data.filter(items=data_indexes, axis=0) - pixel_dir = _get_pixel_directory(args.tmp_path, pixel.order, pixel.pixel) + pixel_dir = _get_pixel_directory(tmp_path, pixel.order, pixel.pixel) file_io.make_directory(pixel_dir, exist_ok=True) output_file = file_io.append_paths_to_pointer( pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" @@ -64,21 +68,22 @@ def split_associations( filtered_data.to_parquet(output_file, index=False) del filtered_data, data_indexes - ResumePlan.splitting_key_done(tmp_path=args.tmp_path, splitting_key=splitting_key) + ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) -def reduce_associations(args, left_pixel): +def reduce_associations(left_pixel, tmp_path, catalog_path): """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel and aggregate into a single parquet file.""" - inputs = _get_pixel_directory(args.tmp_path, left_pixel.order, left_pixel.pixel) + inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel) if not file_io.directory_has_contents(inputs): + ResumePlan.reducing_key_done(tmp_path=tmp_path, 
reducing_key=f"{left_pixel.order}_{left_pixel.pixel}") print(f"Warning: no input data for pixel {left_pixel}") return - destination_dir = paths.pixel_directory(args.catalog_path, left_pixel.order, left_pixel.pixel) + destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel) file_io.make_directory(destination_dir, exist_ok=True) - destination_file = paths.pixel_catalog_file(args.catalog_path, left_pixel.order, left_pixel.pixel) + destination_file = paths.pixel_catalog_file(catalog_path, left_pixel.order, left_pixel.pixel) merged_table = pq.read_table(inputs) dataframe = merged_table.to_pandas().reset_index() @@ -93,3 +98,5 @@ def reduce_associations(args, left_pixel): join_pixel = join_pixels[pixel_index] join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) + + ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}") diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py new file mode 100644 index 00000000..ce020c8d --- /dev/null +++ b/src/hipscat_import/cross_match/macauff_resume_plan.py @@ -0,0 +1,175 @@ +"""Utility to hold the file-level pipeline execution plan.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List, Optional, Tuple + +import healpy as hp +import numpy as np +from hipscat import pixel_math +from hipscat.catalog import Catalog +from hipscat.io import FilePointer, file_io +from hipscat.pixel_math.healpix_pixel import HealpixPixel +from hipscat.pixel_tree import PixelAlignment, align_trees +from numpy import frombuffer +from tqdm import tqdm + +from hipscat_import.cross_match.macauff_arguments import MacauffArguments +from hipscat_import.pipeline_resume_plan import PipelineResumePlan + + +@dataclass +class MacauffResumePlan(PipelineResumePlan): + """Container class for holding the state of each file in the pipeline plan.""" + + input_paths: List[FilePointer] = field(default_factory=list) + """resolved list of all files that will be used in the importer""" + split_keys: List[Tuple[str, str]] = field(default_factory=list) + """set of files (and job keys) that have yet to be split""" + reduce_keys: List[Tuple[HealpixPixel, str]] = field(default_factory=list) + """set of left side catalog pixels (and job keys) that have yet to be reduced/combined""" + + SPLITTING_STAGE = "splitting" + REDUCING_STAGE = "reducing" + + ORIGINAL_INPUT_PATHS = "input_paths.txt" + + def __init__(self, args: MacauffArguments, left_pixels): + if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy) + raise ValueError("tmp_path is required") + super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path) + self.input_paths = args.input_paths + self.gather_plan(args, left_pixels) + + def gather_plan(self, args, left_pixels): + """Initialize the plan.""" + ## Make sure it's safe to use existing resume state. + super().safe_to_resume() + + ## Validate existing resume state. + ## - if a later stage is complete, the earlier stages should be complete too. 
+ splitting_done = self.is_splitting_done() + reducing_done = self.is_reducing_done() + + if reducing_done and not splitting_done: + raise ValueError("splitting must be complete before reducing") + + ## Validate that we're operating on the same file set as the previous instance. + unique_file_paths = set(self.input_paths) + self.input_paths = list(unique_file_paths) + self.input_paths.sort() + original_input_paths = self.get_original_paths() + if not original_input_paths: + self.save_original_paths() + else: + if original_input_paths != unique_file_paths: + raise ValueError("Different file set from resumed pipeline execution.") + + ## Gather keys for execution. + if not splitting_done: + self.split_keys = self.get_remaining_split_keys() + if not reducing_done: + self.reduce_keys = self.get_reduce_keys(left_pixels) + ## Go ahead and create our directories for storing resume files. + file_io.make_directory( + file_io.append_paths_to_pointer(self.tmp_path, self.SPLITTING_STAGE), + exist_ok=True, + ) + file_io.make_directory( + file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE), + exist_ok=True, + ) + + def get_original_paths(self): + """Get all input file paths from the first pipeline attempt.""" + file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) + try: + with open(file_path, "r", encoding="utf-8") as file_handle: + contents = file_handle.readlines() + contents = [path.strip() for path in contents] + original_input_paths = set(contents) + return original_input_paths + except FileNotFoundError: + return [] + + def save_original_paths(self): + """Save input file paths from the first pipeline attempt.""" + file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) + with open(file_path, "w", encoding="utf-8") as file_handle: + for path in self.input_paths: + file_handle.write(f"{path}\n") + + def get_remaining_split_keys(self): + """Gather remaining keys, dropping successful split tasks from done file names. + + Returns: + list of splitting keys *not* found in files like /resume/path/split_key.done + """ + split_keys = set(self.read_done_keys(self.SPLITTING_STAGE)) + return [ + (f"split_{i}", file_path) + for i, file_path in enumerate(self.input_paths) + if f"split_{i}" not in split_keys + ] + + @classmethod + def splitting_key_done(cls, tmp_path, splitting_key: str): + """Mark a single splitting task as done + + Args: + tmp_path (str): where to write intermediate resume files. + splitting_key (str): unique string for each splitting task (e.g. "split_57") + """ + cls.touch_key_done_file(tmp_path, cls.SPLITTING_STAGE, splitting_key) + + @classmethod + def reducing_key_done(cls, tmp_path, reducing_key: str): + """Mark a single reducing task as done + + Args: + tmp_path (str): where to write intermediate resume files. + reducing_key (str): unique string for each reducing task (e.g. 
"3_57") + """ + cls.touch_key_done_file(tmp_path, cls.REDUCING_STAGE, reducing_key) + + def wait_for_splitting(self, futures): + """Wait for splitting futures to complete.""" + self.wait_for_futures(futures, self.SPLITTING_STAGE) + remaining_split_items = self.get_remaining_split_keys() + if len(remaining_split_items) > 0: + raise RuntimeError(f"{len(remaining_split_items)} split stages did not complete successfully.") + self.touch_stage_done_file(self.SPLITTING_STAGE) + + def is_splitting_done(self) -> bool: + """Are there files left to split?""" + return self.done_file_exists(self.SPLITTING_STAGE) + + def get_reduce_keys(self, left_pixels): + """Fetch a Tuple for each partition to reduce. + + Tuple contains: + + - left pixel (healpix pixel with both order and pixel) + - reduce key (string of left order+pixel) + + """ + reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE)) + reduce_items = [ + (hp_pixel, f"{hp_pixel.order}_{hp_pixel.pixel}") + for hp_pixel in left_pixels + if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys + ] + return reduce_items + + def is_reducing_done(self) -> bool: + """Are there partitions left to reduce?""" + return self.done_file_exists(self.REDUCING_STAGE) + + def wait_for_reducing(self, futures, left_pixels): + """Wait for reducing futures to complete.""" + self.wait_for_futures(futures, self.REDUCING_STAGE) + remaining_reduce_items = self.get_reduce_keys(left_pixels) + if len(remaining_reduce_items) > 0: + raise RuntimeError(f"{len(remaining_reduce_items)} reduce stages did not complete successfully.") + self.touch_stage_done_file(self.REDUCING_STAGE) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index 5f9f97a2..ecc503a7 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -6,55 +6,71 @@ from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.cross_match.macauff_map_reduce import reduce_associations, split_associations +from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan # pylint: disable=unused-argument -def split(args, left_catalog, right_catalog): +def split( + args, + highest_left_order, + highest_right_order, + left_alignment, + right_alignment, + resume_plan, + client, +): """Split association rows by their aligned left pixel.""" - left_pixels = left_catalog.partition_info.get_healpix_pixels() - highest_left_order = left_catalog.partition_info.get_highest_order() - highest_right_order = right_catalog.partition_info.get_highest_order() - - left_pixels = left_catalog.partition_info.get_healpix_pixels() - right_pixels = right_catalog.partition_info.get_healpix_pixels() - regenerated_left_alignment = np.full(hp.order2npix(highest_left_order), None) - for pixel in left_pixels: - explosion_factor = 4 ** (highest_left_order - pixel.order) - exploded_pixels = np.arange( - pixel.pixel * explosion_factor, - (pixel.pixel + 1) * explosion_factor, - ) - regenerated_left_alignment[exploded_pixels] = pixel - - regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) - for pixel in right_pixels: - explosion_factor = 4 ** (highest_right_order - pixel.order) - exploded_pixels = np.arange( - pixel.pixel * explosion_factor, - (pixel.pixel + 1) * explosion_factor, - ) - regenerated_right_alignment[exploded_pixels] = pixel - - for i, file in enumerate(args.input_paths): - split_associations( - input_file=file, - 
file_reader=args.file_reader, - splitting_key=i, - args=args, - highest_left_order=highest_left_order, - highest_right_order=highest_right_order, - left_alignment=regenerated_left_alignment, - right_alignment=regenerated_right_alignment, + + if resume_plan.is_splitting_done(): + return + + reader_future = client.scatter(args.file_reader) + left_alignment_future = client.scatter(left_alignment) + right_alignment_future = client.scatter(right_alignment) + futures = [] + for key, file_path in resume_plan.split_keys: + futures.append( + client.submit( + split_associations, + input_file=file_path, + file_reader=reader_future, + splitting_key=key, + highest_left_order=highest_left_order, + highest_right_order=highest_right_order, + left_alignment=left_alignment_future, + right_alignment=right_alignment_future, + left_ra_column=args.left_ra_column, + left_dec_column=args.left_dec_column, + right_ra_column=args.right_ra_column, + right_dec_column=args.right_dec_column, + tmp_path=args.tmp_path, + ) ) + resume_plan.wait_for_splitting(futures) -def reduce(args, left_catalog): +def reduce(args, left_pixels, resume_plan, client): """Reduce left pixel files into a single parquet file per.""" - left_pixels = left_catalog.partition_info.get_healpix_pixels() - for left_pixel in left_pixels: - reduce_associations(args, left_pixel) + if resume_plan.is_reducing_done(): + return + + futures = [] + for ( + left_pixel, + pixel_key, + ) in resume_plan.reduce_keys: + futures.append( + client.submit( + reduce_associations, + left_pixel=left_pixel, + tmp_path=args.tmp_path, + catalog_path=args.catalog_path, + ) + ) + + resume_plan.wait_for_reducing(futures, left_pixels) def run(args, client): @@ -64,11 +80,52 @@ def run(args, client): if not isinstance(args, MacauffArguments): raise TypeError("args must be type MacauffArguments") - left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir) - right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir) + ## Lump all of the catalog reading stuff together under a single block, + ## since it can take a while to load large catalogs and some feedback is nice. 
+ with tqdm(total=5, desc="Planning", disable=not args.progress_bar) as step_progress: + left_catalog = Catalog.read_from_hipscat(args.left_catalog_dir) + step_progress.update(1) + right_catalog = Catalog.read_from_hipscat(args.right_catalog_dir) + step_progress.update(1) + highest_left_order = left_catalog.partition_info.get_highest_order() + highest_right_order = right_catalog.partition_info.get_highest_order() + + left_pixels = left_catalog.partition_info.get_healpix_pixels() + right_pixels = right_catalog.partition_info.get_healpix_pixels() + + regenerated_left_alignment = np.full(hp.order2npix(highest_left_order), None) + for pixel in left_pixels: + explosion_factor = 4 ** (highest_left_order - pixel.order) + exploded_pixels = np.arange( + pixel.pixel * explosion_factor, + (pixel.pixel + 1) * explosion_factor, + ) + regenerated_left_alignment[exploded_pixels] = pixel + step_progress.update(1) + + regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None) + for pixel in right_pixels: + explosion_factor = 4 ** (highest_right_order - pixel.order) + exploded_pixels = np.arange( + pixel.pixel * explosion_factor, + (pixel.pixel + 1) * explosion_factor, + ) + regenerated_right_alignment[exploded_pixels] = pixel + step_progress.update(1) + + resume_plan = MacauffResumePlan(args, left_pixels) + step_progress.update(1) - split(args, left_catalog, right_catalog) - reduce(args, left_catalog) + split( + args, + highest_left_order=highest_left_order, + highest_right_order=highest_right_order, + left_alignment=regenerated_left_alignment, + right_alignment=regenerated_right_alignment, + resume_plan=resume_plan, + client=client, + ) + reduce(args, left_pixels, resume_plan, client) # All done - write out the metadata with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py index ee1d47d1..f6839175 100644 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -58,7 +58,7 @@ def test_object_to_object( overwrite=True, file_reader=CsvReader(schema_file=matches_schema_file, header=None), metadata_file_path=matches_schema_file, - progress_bar=False, + # progress_bar=False, ) os.makedirs(os.path.join(args.tmp_path, "splitting")) @@ -72,7 +72,6 @@ def test_object_to_object( assert catalog.catalog_info.total_rows == 131 - @pytest.mark.dask def test_source_to_object( small_sky_object_catalog, @@ -119,4 +118,4 @@ def test_source_to_object( assert catalog.on_disk assert catalog.catalog_path == args.catalog_path assert len(catalog.get_join_pixels()) == 8 - assert catalog.catalog_info.total_rows == 34 \ No newline at end of file + assert catalog.catalog_info.total_rows == 34 From 7eadbc5fc5a15b5f2f78b06a4662604fc4101e61 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Thu, 14 Dec 2023 10:00:06 -0500 Subject: [PATCH 5/6] Address pylint findings --- .../cross_match/macauff_map_reduce.py | 7 ++++--- .../cross_match/macauff_resume_plan.py | 14 ++++---------- .../cross_match/run_macauff_import.py | 1 + 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index 35d54d45..a83b7c07 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -9,6 +9,7 @@ from hipscat_import.catalog.map_reduce 
import _get_pixel_directory, _iterate_input_file from hipscat_import.catalog.resume_plan import ResumePlan +# pylint: disable=too-many-arguments,too-many-locals def split_associations( input_file, @@ -66,12 +67,12 @@ def split_associations( pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" ) filtered_data.to_parquet(output_file, index=False) - del filtered_data, data_indexes + del data, filtered_data, data_indexes ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) -def reduce_associations(left_pixel, tmp_path, catalog_path): +def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel and aggregate into a single parquet file.""" inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel) @@ -99,4 +100,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path): join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) - ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}") + ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key) diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py index ce020c8d..cf481cbd 100644 --- a/src/hipscat_import/cross_match/macauff_resume_plan.py +++ b/src/hipscat_import/cross_match/macauff_resume_plan.py @@ -3,21 +3,15 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import List, Optional, Tuple +from typing import List, Tuple -import healpy as hp -import numpy as np -from hipscat import pixel_math -from hipscat.catalog import Catalog from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from hipscat.pixel_tree import PixelAlignment, align_trees -from numpy import frombuffer -from tqdm import tqdm from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.pipeline_resume_plan import PipelineResumePlan +# pylint: disable=duplicate-code @dataclass class MacauffResumePlan(PipelineResumePlan): @@ -40,9 +34,9 @@ def __init__(self, args: MacauffArguments, left_pixels): raise ValueError("tmp_path is required") super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path) self.input_paths = args.input_paths - self.gather_plan(args, left_pixels) + self.gather_plan(left_pixels) - def gather_plan(self, args, left_pixels): + def gather_plan(self, left_pixels): """Initialize the plan.""" ## Make sure it's safe to use existing resume state. 
super().safe_to_resume() diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index ecc503a7..35424058 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -67,6 +67,7 @@ def reduce(args, left_pixels, resume_plan, client): left_pixel=left_pixel, tmp_path=args.tmp_path, catalog_path=args.catalog_path, + reduce_key=pixel_key, ) ) From ff39e53364cdfcc396dbd27b1c2649772c541a62 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Thu, 14 Dec 2023 14:53:58 -0500 Subject: [PATCH 6/6] Improve code coverage --- src/hipscat_import/catalog/resume_plan.py | 31 +-------------- .../cross_match/macauff_map_reduce.py | 11 ++++-- .../cross_match/macauff_resume_plan.py | 32 +--------------- src/hipscat_import/pipeline_resume_plan.py | 38 +++++++++++++++++++ 4 files changed, 48 insertions(+), 64 deletions(-) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index e73d6a4f..c554f32b 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -32,8 +32,6 @@ class ResumePlan(PipelineResumePlan): SPLITTING_STAGE = "splitting" REDUCING_STAGE = "reducing" - ORIGINAL_INPUT_PATHS = "input_paths.txt" - HISTOGRAM_BINARY_FILE = "mapping_histogram.binary" HISTOGRAMS_DIR = "histograms" @@ -63,15 +61,7 @@ def gather_plan(self): step_progress.update(1) ## Validate that we're operating on the same file set as the previous instance. - unique_file_paths = set(self.input_paths) - self.input_paths = list(unique_file_paths) - self.input_paths.sort() - original_input_paths = self.get_original_paths() - if not original_input_paths: - self.save_original_paths() - else: - if original_input_paths != unique_file_paths: - raise ValueError("Different file set from resumed pipeline execution.") + self.input_paths = self.check_original_input_paths(self.input_paths) step_progress.update(1) ## Gather keys for execution. @@ -97,25 +87,6 @@ def gather_plan(self): ) step_progress.update(1) - def get_original_paths(self): - """Get all input file paths from the first pipeline attempt.""" - file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) - try: - with open(file_path, "r", encoding="utf-8") as file_handle: - contents = file_handle.readlines() - contents = [path.strip() for path in contents] - original_input_paths = set(contents) - return original_input_paths - except FileNotFoundError: - return [] - - def save_original_paths(self): - """Save input file paths from the first pipeline attempt.""" - file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) - with open(file_path, "w", encoding="utf-8") as file_handle: - for path in self.input_paths: - file_handle.write(f"{path}\n") - def get_remaining_map_keys(self): """Gather remaining keys, dropping successful mapping tasks from histogram names. 
diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index a83b7c07..27a4b157 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -7,10 +7,11 @@ from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file -from hipscat_import.catalog.resume_plan import ResumePlan +from hipscat_import.cross_match.macauff_resume_plan import MacauffResumePlan # pylint: disable=too-many-arguments,too-many-locals + def split_associations( input_file, file_reader, @@ -69,7 +70,7 @@ def split_associations( filtered_data.to_parquet(output_file, index=False) del data, filtered_data, data_indexes - ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) + MacauffResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): @@ -78,7 +79,9 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel) if not file_io.directory_has_contents(inputs): - ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}") + MacauffResumePlan.reducing_key_done( + tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}" + ) print(f"Warning: no input data for pixel {left_pixel}") return destination_dir = paths.pixel_directory(catalog_path, left_pixel.order, left_pixel.pixel) @@ -100,4 +103,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) - ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key) + MacauffResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key) diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py index cf481cbd..dfc3d871 100644 --- a/src/hipscat_import/cross_match/macauff_resume_plan.py +++ b/src/hipscat_import/cross_match/macauff_resume_plan.py @@ -13,6 +13,7 @@ # pylint: disable=duplicate-code + @dataclass class MacauffResumePlan(PipelineResumePlan): """Container class for holding the state of each file in the pipeline plan.""" @@ -27,8 +28,6 @@ class MacauffResumePlan(PipelineResumePlan): SPLITTING_STAGE = "splitting" REDUCING_STAGE = "reducing" - ORIGINAL_INPUT_PATHS = "input_paths.txt" - def __init__(self, args: MacauffArguments, left_pixels): if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy) raise ValueError("tmp_path is required") @@ -50,15 +49,7 @@ def gather_plan(self, left_pixels): raise ValueError("splitting must be complete before reducing") ## Validate that we're operating on the same file set as the previous instance. - unique_file_paths = set(self.input_paths) - self.input_paths = list(unique_file_paths) - self.input_paths.sort() - original_input_paths = self.get_original_paths() - if not original_input_paths: - self.save_original_paths() - else: - if original_input_paths != unique_file_paths: - raise ValueError("Different file set from resumed pipeline execution.") + self.input_paths = self.check_original_input_paths(self.input_paths) ## Gather keys for execution. 
if not splitting_done: @@ -75,25 +66,6 @@ def gather_plan(self, left_pixels): exist_ok=True, ) - def get_original_paths(self): - """Get all input file paths from the first pipeline attempt.""" - file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) - try: - with open(file_path, "r", encoding="utf-8") as file_handle: - contents = file_handle.readlines() - contents = [path.strip() for path in contents] - original_input_paths = set(contents) - return original_input_paths - except FileNotFoundError: - return [] - - def save_original_paths(self): - """Save input file paths from the first pipeline attempt.""" - file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) - with open(file_path, "w", encoding="utf-8") as file_handle: - for path in self.input_paths: - file_handle.write(f"{path}\n") - def get_remaining_split_keys(self): """Gather remaining keys, dropping successful split tasks from done file names. diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 38d727f7..91759f7c 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -24,6 +24,8 @@ class PipelineResumePlan: """if true, a tqdm progress bar will be displayed for user feedback of planning progress""" + ORIGINAL_INPUT_PATHS = "input_paths.txt" + def safe_to_resume(self): """Check that we are ok to resume an in-progress pipeline, if one exists. @@ -148,3 +150,39 @@ def get_formatted_stage_name(stage_name) -> str: stage_name = "progress" return f"{stage_name.capitalize(): <10}" + + def check_original_input_paths(self, input_paths): + """Validate that we're operating on the same file set as the original pipeline, + or save the inputs as the originals if not found. + + Args: + input_paths (list[str]): input paths that will be processed by a pipeline. + + Raises: + ValueError if the retrieved file set differs from `input_paths`. + """ + unique_file_paths = set(input_paths) + + original_input_paths = [] + + file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) + try: + with open(file_path, "r", encoding="utf-8") as file_handle: + contents = file_handle.readlines() + contents = [path.strip() for path in contents] + original_input_paths = set(contents) + except FileNotFoundError: + pass + + if len(original_input_paths) == 0: + file_path = file_io.append_paths_to_pointer(self.tmp_path, self.ORIGINAL_INPUT_PATHS) + with open(file_path, "w", encoding="utf-8") as file_handle: + for path in input_paths: + file_handle.write(f"{path}\n") + else: + if original_input_paths != unique_file_paths: + raise ValueError("Different file set from resumed pipeline execution.") + + input_paths = list(unique_file_paths) + input_paths.sort() + return input_paths