From 8be7886d00c81d20d3f64feeddc0eec021081885 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 1 Nov 2023 11:12:28 -0400 Subject: [PATCH 1/4] Use existing hipscat_index for mapping pixel --- src/hipscat_import/catalog/arguments.py | 3 + src/hipscat_import/catalog/map_reduce.py | 47 ++++++++++----- src/hipscat_import/catalog/run_import.py | 3 + src/hipscat_import/index/map_reduce.py | 3 +- .../catalog/test_run_round_trip.py | 56 ++++++++++++++++++ .../data/test_formats/hipscat_index.parquet | Bin 0 -> 4034 bytes 6 files changed, 95 insertions(+), 17 deletions(-) create mode 100644 tests/hipscat_import/data/test_formats/hipscat_index.parquet diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py index 6dda8ddd..31651255 100644 --- a/src/hipscat_import/catalog/arguments.py +++ b/src/hipscat_import/catalog/arguments.py @@ -40,6 +40,8 @@ class ImportArguments(RuntimeArguments): """column for right ascension""" dec_column: str = "dec" """column for declination""" + use_hipscat_index: bool = False + """use an existing hipscat spatial index as the position, instead of ra/dec""" id_column: str = "id" """column for survey identifier, or other sortable column""" add_hipscat_index: bool = True @@ -140,6 +142,7 @@ def additional_runtime_provenance_info(self) -> dict: "input_file_list": self.input_file_list, "ra_column": self.ra_column, "dec_column": self.dec_column, + "use_hipscat_index": self.use_hipscat_index, "id_column": self.id_column, "constant_healpix_order": self.constant_healpix_order, "highest_healpix_order": self.highest_healpix_order, diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 8ba4d8a3..38d99b1b 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -6,6 +6,7 @@ import pyarrow.parquet as pq from hipscat import pixel_math from hipscat.io import FilePointer, file_io, paths +from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix from hipscat_import.catalog.file_readers import InputReader from hipscat_import.catalog.resume_plan import ResumePlan @@ -53,6 +54,7 @@ def _iterate_input_file( highest_order, ra_column, dec_column, + use_hipscat_index = False, ): """Helper function to handle input file reading and healpix pixel calculation""" if not file_reader: @@ -61,18 +63,28 @@ def _iterate_input_file( required_columns = [ra_column, dec_column] for chunk_number, data in enumerate(file_reader.read(input_file)): - if not all(x in data.columns for x in required_columns): - raise ValueError( - f"Invalid column names in input file: {ra_column}, {dec_column} not in {input_file}" + if use_hipscat_index: + if data.index.name == HIPSCAT_ID_COLUMN: + mapped_pixels = hipscat_id_to_healpix(data.index, target_order=highest_order) + elif HIPSCAT_ID_COLUMN in data.columns: + mapped_pixels = hipscat_id_to_healpix(data[HIPSCAT_ID_COLUMN], target_order=highest_order) + else: + raise ValueError( + f"Invalid column names in input file: {HIPSCAT_ID_COLUMN} not in {input_file}" + ) + else: + if not all(x in data.columns for x in required_columns): + raise ValueError( + f"Invalid column names in input file: {', '.join(required_columns)} not in {input_file}" + ) + # Set up the pixel data + mapped_pixels = hp.ang2pix( + 2**highest_order, + data[ra_column].values, + data[dec_column].values, + lonlat=True, + nest=True, ) - # Set up the pixel data - mapped_pixels = hp.ang2pix( - 2**highest_order, - data[ra_column].values, - 
data[dec_column].values, - lonlat=True, - nest=True, - ) yield chunk_number, data, mapped_pixels @@ -84,6 +96,7 @@ def map_to_pixels( highest_order, ra_column, dec_column, + use_hipscat_index = False ): """Map a file of input objects to their healpix pixels. @@ -107,7 +120,7 @@ def map_to_pixels( """ histo = pixel_math.empty_histogram(highest_order) for _, _, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column + input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index ): mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) histo[mapped_pixel] += count_at_pixel.astype(np.int64) @@ -124,6 +137,7 @@ def split_pixels( cache_shard_path: FilePointer, resume_path: FilePointer, alignment=None, + use_hipscat_index = False, ): """Map a file of input objects to their healpix pixels and split into shards. @@ -144,7 +158,7 @@ def split_pixels( FileNotFoundError: if the file does not exist, or is a directory """ for chunk_number, data, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column + input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index ): aligned_pixels = alignment[mapped_pixels] unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True) @@ -180,6 +194,7 @@ def reduce_pixel_shards( ra_column, dec_column, id_column, + use_hipscat_index = False, add_hipscat_index=True, delete_input_files=True, use_schema_file="", @@ -259,8 +274,8 @@ def reduce_pixel_shards( dataframe = merged_table.to_pandas() if id_column: dataframe = dataframe.sort_values(id_column) - if add_hipscat_index: - dataframe["_hipscat_index"] = pixel_math.compute_hipscat_id( + if add_hipscat_index and not use_hipscat_index: + dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id( dataframe[ra_column].values, dataframe[dec_column].values, ) @@ -277,7 +292,7 @@ def reduce_pixel_shards( ## If we had a meaningful index before, preserve it as a column. 
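        ## (When use_hipscat_index is set, that column already arrived with the
        ## input rows, so the set_index below simply restores the spatial order.)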
if _has_named_index(dataframe): dataframe = dataframe.reset_index() - dataframe = dataframe.set_index("_hipscat_index").sort_index() + dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index() dataframe.to_parquet(destination_file) del dataframe, merged_table, tables diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py index 218bd539..36a7476c 100644 --- a/src/hipscat_import/catalog/run_import.py +++ b/src/hipscat_import/catalog/run_import.py @@ -35,6 +35,7 @@ def _map_pixels(args, client): highest_order=args.mapping_healpix_order, ra_column=args.ra_column, dec_column=args.dec_column, + use_hipscat_index=args.use_hipscat_index, ) ) args.resume_plan.wait_for_mapping(futures) @@ -62,6 +63,7 @@ def _split_pixels(args, alignment_future, client): cache_shard_path=args.tmp_path, resume_path=args.resume_plan.tmp_path, alignment=alignment_future, + use_hipscat_index=args.use_hipscat_index, ) ) @@ -96,6 +98,7 @@ def _reduce_pixels(args, destination_pixel_map, client): id_column=args.id_column, add_hipscat_index=args.add_hipscat_index, use_schema_file=args.use_schema_file, + use_hipscat_index=args.use_hipscat_index, ) ) diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py index a7cb5399..e446a2a9 100644 --- a/src/hipscat_import/index/map_reduce.py +++ b/src/hipscat_import/index/map_reduce.py @@ -4,6 +4,7 @@ import numpy as np from dask.distributed import progress, wait from hipscat.io import file_io +from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN def create_index(args): @@ -31,7 +32,7 @@ def create_index(args): data["Npix"] = data["Npix"].astype(np.int32) data = data.reset_index() if not args.include_hipscat_index: - data = data.drop(columns=["_hipscat_index"]) + data = data.drop(columns=[HIPSCAT_ID_COLUMN]) data = data.repartition(partition_size=args.compute_partition_size) data = data.set_index(args.indexing_column) result = data.to_parquet( diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 28ac954b..479b8bb4 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -353,3 +353,59 @@ def read(self, input_file): expected_ids = [*range(700, 831)] assert_parquet_file_ids(output_file, "id", expected_ids) + + +@pytest.mark.dask +def test_import_hipscat_index( + dask_client, + formats_dir, + assert_parquet_file_ids, + tmp_path, +): + """Test basic execution, using a previously-computed _hipscat_index column for spatial partitioning.""" + ## First, let's just check the assumptions we have about our input file: + ## - should have _hipscat_index as the indexed column + ## - should NOT have any columns like "ra" or "dec" + input_file = os.path.join(formats_dir, "hipscat_index.parquet") + + expected_ids = [*range(700, 831)] + assert_parquet_file_ids(input_file, "id", expected_ids) + + data_frame = pd.read_parquet(input_file, engine="pyarrow") + assert data_frame.index.name == "_hipscat_index" + npt.assert_array_equal(data_frame.columns, ["id"]) + + args = ImportArguments( + output_catalog_name="using_hipscat_index", + input_file_list=[input_file], + input_format="parquet", + output_path=tmp_path, + dask_tmp=tmp_path, + use_hipscat_index=True, + add_hipscat_index=False, + highest_healpix_order=2, + pixel_threshold=3_000, + progress_bar=False, + id_column="_hipscat_index", + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog 
= Catalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert catalog.catalog_info.total_rows == 131
+    assert len(catalog.get_healpix_pixels()) == 1
+
+    # Check that the catalog parquet file exists and contains correct object IDs
+    output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+
+    expected_ids = [*range(700, 831)]
+    assert_parquet_file_ids(output_file, "id", expected_ids)
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    assert data_frame.index.name == "_hipscat_index"
+    npt.assert_array_equal(
+        data_frame.columns,
+        ["id", "Norder", "Dir", "Npix"],
+    )
diff --git a/tests/hipscat_import/data/test_formats/hipscat_index.parquet b/tests/hipscat_import/data/test_formats/hipscat_index.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..44bdf66349bd1e7195e55f1c0ac1ea0be0dc2c47
GIT binary patch
literal 4034
[4034 bytes of base85-encoded parquet data omitted]
literal 0
HcmV?d00001

From ed73ce51fd3bfebaa8f7f4423612bfd9e0c6b7c3 Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Thu, 2 Nov 2023 21:53:22 -0400
Subject: [PATCH 2/4] Update required version.

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 5067110c..3cb6be0d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,7 +19,7 @@ dependencies = [
    "dask[distributed]",
    "deprecated",
    "healpy",
-    "hipscat >= 0.1.2",
+    "hipscat >= 0.1.5",
    "ipykernel", # Support for Jupyter notebooks
    "pandas < 2.1.0",
    "pyarrow",

From b4cfe4eb6c6cd3c8bab824dc081c85e5dbab914a Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Mon, 6 Nov 2023 08:08:05 -0500
Subject: [PATCH 3/4] Merge recent changes (#163)

* Set lower default import order. 
(#153) * Macauff pipeline boilerplate (#152) * checkpoint * checkpoint * testing for MacauffArguments * create boilerplate for macauff runner + tests * remove commented out test * add __future__.annotations * linter problems * add more tests for missing coverage * refactor MacauffArguments required parameter tests * address more comments from pr #152 * add dask pytest mark + black formatting * Add a partition_info on SOAP results. (#157) * Initial commit of metadata file conversion (#156) * Initial commit of metadata file conversion * Move shared logic for locating input files. * Explicitly depend on pyyaml --------- Co-authored-by: Max West <110124344+maxwest-uw@users.noreply.github.com> --- pyproject.toml | 2 + src/hipscat_import/catalog/arguments.py | 15 +- src/hipscat_import/cross_match/__init__.py | 0 .../cross_match/macauff_arguments.py | 121 ++++++++++ .../cross_match/macauff_metadata.py | 102 ++++++++ .../cross_match/run_macauff_import.py | 13 ++ src/hipscat_import/pipeline.py | 4 + src/hipscat_import/runtime_arguments.py | 24 ++ src/hipscat_import/soap/map_reduce.py | 7 + tests/hipscat_import/conftest.py | 10 + .../cross_match/test_macauff_arguments.py | 219 ++++++++++++++++++ .../cross_match/test_macauff_metadata.py | 101 ++++++++ .../cross_match/test_macauff_runner.py | 49 ++++ .../macauff/macauff_gaia_catwise_match.xml | 68 ++++++ ...uff_gaia_catwise_match_and_nonmatches.yaml | 81 +++++++ .../data/test_formats/macauff_metadata.yaml | 3 + 16 files changed, 808 insertions(+), 11 deletions(-) create mode 100644 src/hipscat_import/cross_match/__init__.py create mode 100644 src/hipscat_import/cross_match/macauff_arguments.py create mode 100644 src/hipscat_import/cross_match/macauff_metadata.py create mode 100644 src/hipscat_import/cross_match/run_macauff_import.py create mode 100644 tests/hipscat_import/cross_match/test_macauff_arguments.py create mode 100644 tests/hipscat_import/cross_match/test_macauff_metadata.py create mode 100644 tests/hipscat_import/cross_match/test_macauff_runner.py create mode 100644 tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml create mode 100644 tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml create mode 100644 tests/hipscat_import/data/test_formats/macauff_metadata.yaml diff --git a/pyproject.toml b/pyproject.toml index 3cb6be0d..058e151f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "ipykernel", # Support for Jupyter notebooks "pandas < 2.1.0", "pyarrow", + "pyyaml", "tqdm", "numpy < 1.25", "fsspec <= 2023.9.2", # Remove when pyarrow updates to reflect api changes @@ -50,6 +51,7 @@ dev = [ "ipython", # Also used in building notebooks into Sphinx "matplotlib", # Used in sample notebook intro_notebook.ipynb "ray", # Used for dask-on-ray testing. 
+    "types-PyYAML", # type stubs for pyyaml
 ]

[build-system]
diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py
index 31651255..81703bb3 100644
--- a/src/hipscat_import/catalog/arguments.py
+++ b/src/hipscat_import/catalog/arguments.py
@@ -6,12 +6,12 @@
 from typing import List

 from hipscat.catalog.catalog import CatalogInfo
-from hipscat.io import FilePointer, file_io
+from hipscat.io import FilePointer
 from hipscat.pixel_math import hipscat_id

 from hipscat_import.catalog.file_readers import InputReader, get_file_reader
 from hipscat_import.catalog.resume_plan import ResumePlan
-from hipscat_import.runtime_arguments import RuntimeArguments
+from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

 # pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods

@@ -56,7 +56,7 @@ class ImportArguments(RuntimeArguments):
    """healpix order to use when mapping. if this is
    a positive number, this will be the order of all final pixels and we
    will not combine pixels according to the threshold"""
-    highest_healpix_order: int = 10
+    highest_healpix_order: int = 7
    """healpix order to use when mapping. this will
    not necessarily be the order used in the final catalog,
    as we may combine pixels that don't meet the threshold"""
@@ -104,14 +104,7 @@ def _check_arguments(self):
        self.file_reader = get_file_reader(self.input_format)

        # Basic checks complete - make more checks and create directories where necessary
-        if self.input_path:
-            if not file_io.does_file_or_directory_exist(self.input_path):
-                raise FileNotFoundError("input_path not found on local storage")
-            self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
-        elif self.input_file_list:
-            self.input_paths = self.input_file_list
-        if len(self.input_paths) == 0:
-            raise FileNotFoundError("No input files found")
+        self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
        self.resume_plan = ResumePlan(
            resume=self.resume,
            progress_bar=self.progress_bar,
diff --git a/src/hipscat_import/cross_match/__init__.py b/src/hipscat_import/cross_match/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py
new file mode 100644
index 00000000..74283f18
--- /dev/null
+++ b/src/hipscat_import/cross_match/macauff_arguments.py
@@ -0,0 +1,121 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from os import path
+from typing import List
+
+from hipscat.io import FilePointer
+from hipscat.io.validation import is_valid_catalog
+
+from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths
+
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=unsupported-binary-operation
+
+
+@dataclass
+class MacauffArguments(RuntimeArguments):
+    """Data class for holding cross-match association arguments"""
+
+    ## Input - Cross-match data
+    input_path: FilePointer | None = None
+    """path to search for the input data"""
+    input_format: str = ""
+    """specifier of the input data format. 
this will be used to find an appropriate
+        InputReader type, and may be used to find input files, via a match like
+        ``<input_path>/*<input_format>`` """
+    input_file_list: List[FilePointer] = field(default_factory=list)
+    """can be used instead of `input_path` to import only specified files"""
+    input_paths: List[FilePointer] = field(default_factory=list)
+    """resolved list of all files that will be used in the importer"""
+    add_hipscat_index: bool = True
+    """add the hipscat spatial index field alongside the data"""
+
+    ## Input - Left catalog
+    left_catalog_dir: str = ""
+    left_id_column: str = ""
+    left_ra_column: str = ""
+    left_dec_column: str = ""
+
+    ## Input - Right catalog
+    right_catalog_dir: str = ""
+    right_id_column: str = ""
+    right_ra_column: str = ""
+    right_dec_column: str = ""
+
+    ## `macauff` specific attributes
+    metadata_file_path: str = ""
+    match_probability_columns: List[str] = field(default_factory=list)
+    column_names: List[str] = field(default_factory=list)
+
+    def __post_init__(self):
+        self._check_arguments()
+
+    def _check_arguments(self):
+        super()._check_arguments()
+
+        if not self.input_path and not self.input_file_list:
+            raise ValueError("neither input_path nor input_file_list provided")
+        if not self.input_format:
+            raise ValueError("input_format is required")
+
+        if not self.left_catalog_dir:
+            raise ValueError("left_catalog_dir is required")
+        if not self.left_id_column:
+            raise ValueError("left_id_column is required")
+        if not self.left_ra_column:
+            raise ValueError("left_ra_column is required")
+        if not self.left_dec_column:
+            raise ValueError("left_dec_column is required")
+        if not is_valid_catalog(self.left_catalog_dir):
+            raise ValueError("left_catalog_dir not a valid catalog")
+
+        if not self.right_catalog_dir:
+            raise ValueError("right_catalog_dir is required")
+        if not self.right_id_column:
+            raise ValueError("right_id_column is required")
+        if not self.right_ra_column:
+            raise ValueError("right_ra_column is required")
+        if not self.right_dec_column:
+            raise ValueError("right_dec_column is required")
+        if not is_valid_catalog(self.right_catalog_dir):
+            raise ValueError("right_catalog_dir not a valid catalog")
+
+        if not self.metadata_file_path:
+            raise ValueError("metadata_file_path required for macauff crossmatch")
+        if not path.isfile(self.metadata_file_path):
+            raise ValueError("Macauff column metadata file must point to valid file path.")
+
+        # Basic checks complete - make more checks and create directories where necessary
+        self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
+
+        self.column_names = self.get_column_names()
+
+    def get_column_names(self):
+        """Grab the macauff column names."""
+        # TODO: Actually read in the metadata file once we get the example file from Tom. 
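+        # Until the reader exists, return the hard-coded Gaia x CatWISE column
+        # list below, so the rest of the pipeline and its tests can exercise
+        # the expected match schema end-to-end.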
+
+        return [
+            "Gaia_designation",
+            "Gaia_RA",
+            "Gaia_Dec",
+            "BP",
+            "G",
+            "RP",
+            "CatWISE_Name",
+            "CatWISE_RA",
+            "CatWISE_Dec",
+            "W1",
+            "W2",
+            "match_p",
+            "Separation",
+            "eta",
+            "xi",
+            "Gaia_avg_cont",
+            "CatWISE_avg_cont",
+            "Gaia_cont_f1",
+            "Gaia_cont_f10",
+            "CatWISE_cont_f1",
+            "CatWISE_cont_f10",
+            "CatWISE_fit_sig",
+        ]
diff --git a/src/hipscat_import/cross_match/macauff_metadata.py b/src/hipscat_import/cross_match/macauff_metadata.py
new file mode 100644
index 00000000..0e20caf0
--- /dev/null
+++ b/src/hipscat_import/cross_match/macauff_metadata.py
@@ -0,0 +1,102 @@
+"""Utilities for generating parquet metadata from macauff-generated metadata files."""
+
+import xml.etree.ElementTree as ET
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import yaml
+from hipscat.io import file_io
+
+
+def _get_inner_xml_value(parent_el, node_type, default_value):
+    child_el = parent_el.findall(node_type)
+    if len(child_el) == 0:
+        return default_value
+    if len(child_el) > 1:
+        raise ValueError(f"found too many {node_type} XML elements")
+    return child_el[0].text.strip()
+
+
+def _construct_field(name, units, metadata_dict):
+    """Helper method to construct a pyarrow field from macauff metadata strings."""
+    if units == "string":
+        pa_type = pa.string()
+    elif units in ("float", "double"):
+        pa_type = pa.float64()
+    elif units in ("integer", "long"):
+        pa_type = pa.int64()
+    else:
+        raise ValueError(f"unhandled units {units}")
+    return pa.field(name, pa_type, metadata=metadata_dict)
+
+
+def from_xml(input_file, output_file):
+    """Read XML file with column metadata for a cross-match file from macauff.
+
+    Expects XML with the format::
+
+        <columns>
+            <column>
+                <name>$COLUMN_NAME</name>
+                <description>$COLUMN_DESCRIPTION</description>
+                <units>$COLUMN_UNIT_DESCRIPTOR</units>
+            </column>
+        </columns>
+
+    Args:
+        input_file (str): file to read for match metadata
+        output_file (str): desired location for output parquet metadata file
+
+    Raises:
+        ValueError: if the XML is malformed
+    """
+    fields = []
+    root_el = ET.parse(input_file).getroot()
+    columns = root_el.findall("column")
+
+    for column in columns:
+        name = _get_inner_xml_value(column, "name", "foo")
+        description = _get_inner_xml_value(column, "description", "")
+        units = _get_inner_xml_value(column, "units", "string")
+
+        fields.append(_construct_field(name, units, metadata_dict={"macauff_description": description}))
+
+    schema = pa.schema(fields)
+    pq.write_table(schema.empty_table(), where=output_file)
+
+
+def from_yaml(input_file, output_directory):
+    """Read YAML file with column metadata for the various cross-match files from macauff.
+
+    Expects YAML with the format::
+
+        name: macauff_GaiaDR3xCatWISE2020
+        description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020.
+        tables:
+        - name: macauff_GaiaDR3xCatWISE2020_matches
+          "@id": "#macauff_GaiaDR3xCatWISE2020_matches"
+          description: Counterpart associations between Gaia and WISE
+          columns:
+          - name: gaia_source_id
+            datatype: long
+            description: The Gaia DR3 object ID.
+
+    Args:
+        input_file (str): file to read for match metadata
+        output_directory (str): desired location for output parquet metadata files.
+            We will write one file per table in the "tables" element. 
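+
+    Example, with a hypothetical output location::
+
+        from_yaml("macauff_gaia_catwise_match_and_nonmatches.yaml", "/tmp/parquet_metadata")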
+ """ + with open(input_file, "r", encoding="utf-8") as file_handle: + metadata = yaml.safe_load(file_handle) + tables = metadata.get("tables", []) + for index, table in enumerate(tables): + fields = [] + table_name = table.get("name", f"metadata_table_{index}") + for col_index, column in enumerate(table.get("columns", [])): + name = column.get("name", f"column_{col_index}") + units = column.get("units", "string") + fields.append(_construct_field(name, units, metadata_dict=column)) + + schema = pa.schema(fields) + output_file = file_io.append_paths_to_pointer(output_directory, f"{table_name}.parquet") + pq.write_table(schema.empty_table(), where=str(output_file)) diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py new file mode 100644 index 00000000..aad49b51 --- /dev/null +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -0,0 +1,13 @@ +from hipscat_import.cross_match.macauff_arguments import MacauffArguments + +# pylint: disable=unused-argument + + +def run(args, client): + """run macauff cross-match import pipeline""" + if not args: + raise TypeError("args is required and should be type MacauffArguments") + if not isinstance(args, MacauffArguments): + raise TypeError("args must be type MacauffArguments") + + raise NotImplementedError("macauff pipeline not implemented yet.") diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index d0482e44..ec253a9d 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -5,11 +5,13 @@ from dask.distributed import Client import hipscat_import.catalog.run_import as catalog_runner +import hipscat_import.cross_match.run_macauff_import as macauff_runner import hipscat_import.index.run_index as index_runner import hipscat_import.margin_cache.margin_cache as margin_runner import hipscat_import.soap.run_soap as soap_runner import hipscat_import.verification.run_verification as verification_runner from hipscat_import.catalog.arguments import ImportArguments +from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.index.arguments import IndexArguments from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.runtime_arguments import RuntimeArguments @@ -49,6 +51,8 @@ def pipeline_with_client(args: RuntimeArguments, client: Client): soap_runner.run(args, client) elif isinstance(args, VerificationArguments): verification_runner.run(args) + elif isinstance(args, MacauffArguments): + macauff_runner.run(args, client) else: raise ValueError("unknown args type") except Exception as exception: # pylint: disable=broad-exception-caught diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py index f4e227e1..cd924888 100644 --- a/src/hipscat_import/runtime_arguments.py +++ b/src/hipscat_import/runtime_arguments.py @@ -124,3 +124,27 @@ def provenance_info(self) -> dict: def additional_runtime_provenance_info(self): """Any additional runtime args to be included in provenance info from subclasses""" return {} + + +def find_input_paths(input_path="", file_matcher="", input_file_list=None): + """Helper method to find input paths, given either a prefix and format, or an + explicit list of paths. 
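+
+    For example, with hypothetical paths::
+
+        find_input_paths("/data/inputs", "*csv")                # search beneath a prefix
+        find_input_paths(input_file_list=["a.csv", "b.csv"])    # pass through a list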
+ + Args: + input_path (str): prefix to search for + file_matcher (str): matcher to use when searching for files + input_file_list (List[str]): list of input paths + Returns: + matching files, if input_path is provided, otherwise, input_file_list + Raises: + FileNotFoundError if no files are found at the input_path and the provided list is empty. + """ + if input_path: + if not file_io.does_file_or_directory_exist(input_path): + raise FileNotFoundError("input_path not found on local storage") + input_paths = file_io.find_files_matching_path(input_path, file_matcher) + elif input_file_list: + input_paths = input_file_list + if len(input_paths) == 0: + raise FileNotFoundError("No input files found") + return input_paths diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index c63c667b..53db9305 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -125,3 +125,10 @@ def combine_partial_results(input_path, output_path): file_pointer=file_io.append_paths_to_pointer(output_path, "unmatched_sources.csv"), index=False, ) + + primary_only = matched.groupby(["Norder", "Dir", "Npix"])["num_rows"].sum().reset_index() + file_io.write_dataframe_to_csv( + dataframe=primary_only, + file_pointer=file_io.append_paths_to_pointer(output_path, "partition_info.csv"), + index=False, + ) diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index dea9ea78..3bc386f8 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -94,6 +94,11 @@ def empty_data_dir(test_data_dir): return os.path.join(test_data_dir, "empty") +@pytest.fixture +def macauff_data_dir(test_data_dir): + return os.path.join(test_data_dir, "macauff") + + @pytest.fixture def formats_dir(test_data_dir): return os.path.join(test_data_dir, "test_formats") @@ -124,6 +129,11 @@ def formats_multiindex(test_data_dir): return os.path.join(test_data_dir, "test_formats", "multiindex.parquet") +@pytest.fixture +def formats_yaml(test_data_dir): + return os.path.join(test_data_dir, "test_formats", "macauff_metadata.yaml") + + @pytest.fixture def small_sky_parts_dir(test_data_dir): return os.path.join(test_data_dir, "small_sky_parts") diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py new file mode 100644 index 00000000..ab981ae0 --- /dev/null +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -0,0 +1,219 @@ +"""Tests of macauff arguments""" + + +from os import path + +import pytest + +from hipscat_import.cross_match.macauff_arguments import MacauffArguments + +# pylint: disable=duplicate-code + + +def test_macauff_arguments( + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + assert len(args.input_paths) > 0 + + +def test_empty_required( + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path +): + """All 
non-runtime arguments are required.""" + + ## List of required args: + ## - match expression that should be found when missing + ## - default value + required_args = [ + ["output_path", tmp_path], + ["output_catalog_name", "object_to_source"], + ["left_catalog_dir", small_sky_object_catalog], + ["left_ra_column", "ra"], + ["left_dec_column", "dec"], + ["left_id_column", "id"], + ["right_catalog_dir", small_sky_source_catalog], + ["right_ra_column", "source_ra"], + ["right_dec_column", "source_dec"], + ["right_id_column", "source_id"], + ["input_path", small_sky_dir], + ["input_format", "csv"], + ["metadata_file_path", formats_yaml], + ] + + ## For each required argument, check that a ValueError is raised that matches the + ## expected name of the missing param. + for index, args in enumerate(required_args): + test_args = [ + list_args[1] if list_index != index else None + for list_index, list_args in enumerate(required_args) + ] + + print(f"testing required arg #{index}") + + with pytest.raises(ValueError, match=args[0]): + MacauffArguments( + output_path=test_args[0], + output_catalog_name=test_args[1], + tmp_dir=tmp_path, + left_catalog_dir=test_args[2], + left_ra_column=test_args[3], + left_dec_column=test_args[4], + left_id_column=test_args[5], + right_catalog_dir=test_args[6], + right_ra_column=test_args[7], + right_dec_column=test_args[8], + right_id_column=test_args[9], + input_path=test_args[10], + input_format=test_args[11], + metadata_file_path=test_args[12], + overwrite=True, + ) + + +def test_macauff_arguments_file_list( + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + files = [path.join(small_sky_dir, "catalog.csv")] + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_file_list=files, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + assert len(args.input_paths) > 0 + + +def test_macauff_args_invalid_catalog(small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path): + with pytest.raises(ValueError, match="left_catalog_dir"): + MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_dir, # valid path, but not a catalog + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + +def test_macauff_args_right_invalid_catalog(small_sky_object_catalog, small_sky_dir, formats_yaml, tmp_path): + with pytest.raises(ValueError, match="right_catalog_dir"): + MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_dir, # valid directory with files, not a catalog + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + 
metadata_file_path=formats_yaml,
+        )
+
+
+def test_macauff_args_invalid_metadata_file(
+    small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, tmp_path
+):
+    with pytest.raises(ValueError, match="column metadata file must"):
+        MacauffArguments(
+            output_path=tmp_path,
+            output_catalog_name="object_to_source",
+            tmp_dir=tmp_path,
+            left_catalog_dir=small_sky_object_catalog,
+            left_ra_column="ra",
+            left_dec_column="dec",
+            left_id_column="id",
+            right_catalog_dir=small_sky_source_catalog,
+            right_ra_column="source_ra",
+            right_dec_column="source_dec",
+            right_id_column="source_id",
+            input_path=small_sky_dir,
+            input_format="csv",
+            metadata_file_path="ceci_n_est_pas_un_fichier.xml",
+        )
+
+
+def test_macauff_args_invalid_input_directory(
+    small_sky_object_catalog, small_sky_source_catalog, formats_yaml, tmp_path
+):
+    with pytest.raises(FileNotFoundError, match="input_path not found"):
+        MacauffArguments(
+            output_path=tmp_path,
+            output_catalog_name="object_to_source",
+            tmp_dir=tmp_path,
+            left_catalog_dir=small_sky_object_catalog,
+            left_ra_column="ra",
+            left_dec_column="dec",
+            left_id_column="id",
+            right_catalog_dir=small_sky_source_catalog,
+            right_ra_column="source_ra",
+            right_dec_column="source_dec",
+            right_id_column="source_id",
+            input_path="ceci_n_est_pas_un_directoire/",
+            input_format="csv",
+            metadata_file_path=formats_yaml,
+        )
+
+
+def test_macauff_args_no_files(
+    small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path
+):
+    with pytest.raises(FileNotFoundError, match="No input files found"):
+        MacauffArguments(
+            output_path=tmp_path,
+            output_catalog_name="object_to_source",
+            tmp_dir=tmp_path,
+            left_catalog_dir=small_sky_object_catalog,
+            left_ra_column="ra",
+            left_dec_column="dec",
+            left_id_column="id",
+            right_catalog_dir=small_sky_source_catalog,
+            right_ra_column="source_ra",
+            right_dec_column="source_dec",
+            right_id_column="source_id",
+            input_path=small_sky_dir,
+            input_format="parquet",  # no files of this format will be found
+            metadata_file_path=formats_yaml,
+        )
diff --git a/tests/hipscat_import/cross_match/test_macauff_metadata.py b/tests/hipscat_import/cross_match/test_macauff_metadata.py
new file mode 100644
index 00000000..5db39084
--- /dev/null
+++ b/tests/hipscat_import/cross_match/test_macauff_metadata.py
@@ -0,0 +1,101 @@
+import os
+from xml.etree.ElementTree import ParseError
+
+import pytest
+from hipscat.io import file_io
+
+from hipscat_import.cross_match.macauff_metadata import from_xml, from_yaml
+
+
+def test_from_xml(macauff_data_dir, tmp_path):
+    """Test XML file reading and parquet metadata generation."""
+    xml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match.xml")
+    output_file = os.path.join(tmp_path, "output.parquet")
+
+    from_xml(xml_input_file, output_file)
+
+    single_metadata = file_io.read_parquet_metadata(output_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 6
+
+
+def test_from_xml_malformed(tmp_path):
+    """Test some invalid XML file inputs."""
+    input_file = os.path.join(tmp_path, "input.parquet")
+    output_file = os.path.join(tmp_path, "output.parquet")
+
+    ## No "columns" found at all
+    with open(input_file, "w", encoding="utf-8") as file_handle:
+        file_handle.write("")
+
+    with pytest.raises(ParseError, match="no element found"):
+        from_xml(input_file, output_file)
+
+    ## Some columns, too many required fields
+    with open(input_file, "w", encoding="utf-8") as file_handle:
+        file_handle.write(
+            """<columns>
+                <column>
+                    <name>Gaia_designation</name>
+                    <name>Gaia_designation</name>
+                    <description>The Gaia DR3 object ID.</description>
+                    <units>long</units>
+                </column>
+            </columns>"""
+        )
+
+    with pytest.raises(ValueError, match="too many name XML elements"):
+        from_xml(input_file, output_file)
+
+    ## Unhandled types
+    with open(input_file, "w", encoding="utf-8") as file_handle:
+        file_handle.write(
+            """<columns>
+                <column>
+                    <name>Gaia_designation</name>
+                    <description>The Gaia DR3 object ID.</description>
+                    <units>blob</units>
+                </column>
+            </columns>"""
+        )
+
+    with pytest.raises(ValueError, match="unhandled units blob"):
+        from_xml(input_file, output_file)
+
+    ## Some empty fields are ok!
+    with open(input_file, "w", encoding="utf-8") as file_handle:
+        file_handle.write(
+            """<columns>
+                <column>
+                    <units>long</units>
+                </column>
+            </columns>"""
+        )
+
+    from_xml(input_file, output_file)
+
+
+def test_from_yaml(macauff_data_dir, tmp_path):
+    """Test YAML file reading and parquet metadata generation."""
+    yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")
+
+    from_yaml(yaml_input_file, tmp_path)
+
+    output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
+    single_metadata = file_io.read_parquet_metadata(output_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 7
+
+    output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.parquet")
+    single_metadata = file_io.read_parquet_metadata(output_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 4
+
+    output_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.parquet")
+    single_metadata = file_io.read_parquet_metadata(output_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 4
diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py
new file mode 100644
index 00000000..1431b60d
--- /dev/null
+++ b/tests/hipscat_import/cross_match/test_macauff_runner.py
@@ -0,0 +1,49 @@
+import pytest
+
+import hipscat_import.cross_match.run_macauff_import as runner
+from hipscat_import.cross_match.macauff_arguments import MacauffArguments
+
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=duplicate-code
+
+
+@pytest.mark.dask
+def test_bad_args(dask_client):
+    """Runner should fail with empty or mis-typed arguments"""
+    with pytest.raises(TypeError, match="MacauffArguments"):
+        runner.run(None, dask_client)
+
+    args = {"output_catalog_name": "bad_arg_type"}
+    with pytest.raises(TypeError, match="MacauffArguments"):
+        runner.run(args, dask_client)
+
+
+@pytest.mark.dask
+def test_no_implementation(
+    small_sky_object_catalog,
+    small_sky_source_catalog,
+    small_sky_dir,
+    formats_yaml,
+    tmp_path,
+    dask_client,
+):
+    """Test that we can create a MacauffArguments instance with two valid catalogs."""
+    args = MacauffArguments(
+        output_path=tmp_path,
+        output_catalog_name="object_to_source",
+        tmp_dir=tmp_path,
+        left_catalog_dir=small_sky_object_catalog,
+        left_ra_column="ra",
+        left_dec_column="dec",
+        left_id_column="id",
+        right_catalog_dir=small_sky_source_catalog,
+        right_ra_column="source_ra",
+        right_dec_column="source_dec",
+        right_id_column="source_id",
+        input_path=small_sky_dir,
+        input_format="csv",
+        metadata_file_path=formats_yaml,
+    )
+
+    with pytest.raises(NotImplementedError, match="not implemented yet."):
+        runner.run(args, dask_client)
diff --git a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml
new file mode 100644
index 00000000..985d8973
--- /dev/null
+++ 
b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match.xml
@@ -0,0 +1,68 @@
+<columns>
+    <column>
+        <name>
+            Gaia_designation
+        </name>
+        <description>
+            The Gaia DR3 object ID.
+        </description>
+        <units>
+            long
+        </units>
+    </column>
+    <column>
+        <name>
+            Gaia_RA
+        </name>
+        <description>
+            Right Ascension of the Gaia DR3 source.
+        </description>
+        <units>
+            float
+        </units>
+    </column>
+    <column>
+        <name>
+            Gaia_Dec
+        </name>
+        <description>
+            The Gaia DR3 declination.
+        </description>
+        <units>
+            float
+        </units>
+    </column>
+    <column>
+        <name>
+            CatWISE_Name
+        </name>
+        <description>
+            The object identifier from the CatWISE 2020 catalogue.
+        </description>
+        <units>
+            string
+        </units>
+    </column>
+    <column>
+        <name>
+            CatWISE_RA
+        </name>
+        <description>
+            Right Ascension of the object as quoted by the CatWISE 2020 catalogue.
+        </description>
+        <units>
+            float
+        </units>
+    </column>
+    <column>
+        <name>
+            CatWISE_Dec
+        </name>
+        <description>
+            CatWISE 2020 Declination.
+        </description>
+        <units>
+            float
+        </units>
+    </column>
+</columns>
\ No newline at end of file
diff --git a/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml
new file mode 100644
index 00000000..5cba93f6
--- /dev/null
+++ b/tests/hipscat_import/data/macauff/macauff_gaia_catwise_match_and_nonmatches.yaml
@@ -0,0 +1,81 @@
+---
+name: macauff_GaiaDR3xCatWISE2020
+description: Match and non-match table for macauff cross-matches of Gaia DR3 and CatWISE 2020.
+tables:
+- name: macauff_GaiaDR3xCatWISE2020_matches
+  "@id": "#macauff_GaiaDR3xCatWISE2020_matches"
+  description: Counterpart associations between Gaia and WISE, as well as derived values from the cross-match process such as match probability and contamination flux.
+  columns:
+  - name: gaia_source_id
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id"
+    datatype: long
+    description: The Gaia DR3 object ID.
+  - name: gaia_ra
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_ra"
+    datatype: double
+    description: Right Ascension of the Gaia DR3 source.
+  - name: gaia_dec
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.gaia_dec"
+    datatype: double
+    description: The Gaia DR3 declination.
+  - name: catwise_name
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_name"
+    datatype: string
+    description: The object identifier from the CatWISE 2020 catalogue.
+  - name: catwise_ra
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_ra"
+    datatype: double
+    description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue.
+  - name: catwise_dec
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.catwise_dec"
+    datatype: double
+    description: CatWISE 2020 Declination.
+  - name: match_p
+    "@id": "#macauff_GaiaDR3xCatWISE2020_matches.match_p"
+    datatype: double
+    description: Overall probability that the Gaia DR3 and CatWISE sources are detections of the same object, as given by equation 26 of Wilson & Naylor (2018a).
+  primaryKey: "#macauff_GaiaDR3xCatWISE2020_matches.gaia_source_id"
+
+- name: macauff_GaiaDR3xCatWISE2020_gaia_nonmatches
+  "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches"
+  description: Objects in Gaia DR3 with no counterpart in the CatWISE2020 catalogue, with derived columns such as match probability and simulated flux contamination.
+  columns:
+  - name: gaia_source_id
+    "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id"
+    datatype: long
+    description: The Gaia DR3 object ID.
+  - name: gaia_ra
+    "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_ra"
+    datatype: double
+    description: Right Ascension of the Gaia DR3 source.
+  - name: gaia_dec
+    "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_dec"
+    datatype: double
+    description: The Gaia DR3 declination. 
+ - name: match_p + "@id": "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.match_p" + datatype: double + description: Overall probability that the Gaia DR3 source does not have a corresponding CatWISE detection, as given by equation 26 of Wilson & Naylor (2018a). + primaryKey: "#macauff_GaiaDR3xCatWISE2020_gaia_nonmatches.gaia_source_id" + +- name: macauff_GaiaDR3xCatWISE2020_catwise_nonmatches + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches" + description: Objects in CatWISE2020 with no counterpart in the Gaia DR3 catalogue, with derived columns such as match probability and simulated flux contamination. + columns: + - name: catwise_name + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" + datatype: string + description: The object identifier from the CatWISE 2020 catalogue. + - name: catwise_ra + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_ra" + datatype: double + description: Right Ascension of the object as quoted by the CatWISE 2020 catalogue. + - name: catwise_dec + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_dec" + datatype: double + description: CatWISE 2020 Declination. + - name: match_p + "@id": "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.match_p" + datatype: double + description: Overall probability that the CatWISE source does not have a corresponding Gaia DR3 detection, as given by equation 26 of Wilson & Naylor (2018a). + primaryKey: "#macauff_GaiaDR3xCatWISE2020_catwise_nonmatches.catwise_name" \ No newline at end of file diff --git a/tests/hipscat_import/data/test_formats/macauff_metadata.yaml b/tests/hipscat_import/data/test_formats/macauff_metadata.yaml new file mode 100644 index 00000000..e41a8357 --- /dev/null +++ b/tests/hipscat_import/data/test_formats/macauff_metadata.yaml @@ -0,0 +1,3 @@ +#placeholder file while we wait for the full metadata example file. +left_catalog_name: small_sky_object_catalog +right_catalog_name: small_sky_source_catalog \ No newline at end of file From 170f512baf900b0ec1dae5ad1e4a1bb903346be0 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Mon, 6 Nov 2023 08:40:58 -0500 Subject: [PATCH 4/4] Exercise more hipscat_index behavior. --- .../hipscat_import/catalog/test_map_reduce.py | 33 +++++ .../data/test_formats/hipscat_index.csv | 132 ++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 tests/hipscat_import/data/test_formats/hipscat_index.csv diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index eb91e967..98c9b9f6 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -134,6 +134,39 @@ def test_map_headers(tmp_path, formats_headers_csv): assert (result == expected).all() +def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file): + os.makedirs(os.path.join(tmp_path, "histograms")) + input_file = os.path.join(formats_dir, "hipscat_index.csv") + mr.map_to_pixels( + input_file=input_file, + file_reader=get_file_reader("csv"), + highest_order=0, + ra_column="NOPE", + dec_column="NOPE", + use_hipscat_index=True, # radec don't matter. 
just use existing index + resume_path=tmp_path, + mapping_key="map_0", + ) + + expected = hist.empty_histogram(0) + expected[11] = 131 + + result = read_partial_histogram(tmp_path, "map_0") + npt.assert_array_equal(result, expected) + + with pytest.raises(ValueError, match="_hipscat_index not in"): + mr.map_to_pixels( + input_file=small_sky_single_file, + file_reader=get_file_reader("csv"), + highest_order=0, + ra_column="NOPE", + dec_column="NOPE", + use_hipscat_index=True, # no pre-existing index! expect failure. + resume_path=tmp_path, + mapping_key="map_0", + ) + + def test_map_small_sky_order0(tmp_path, small_sky_single_file): """Test loading the small sky catalog and partitioning each object into the same large bucket""" os.makedirs(os.path.join(tmp_path, "histograms")) diff --git a/tests/hipscat_import/data/test_formats/hipscat_index.csv b/tests/hipscat_import/data/test_formats/hipscat_index.csv new file mode 100644 index 00000000..153091e5 --- /dev/null +++ b/tests/hipscat_import/data/test_formats/hipscat_index.csv @@ -0,0 +1,132 @@ +_hipscat_index,id,ra,dec,ra_error,dec_error,Norder,Dir,Npix +12749688880727326720,707,308.5,-69.5,0,0,0,0,11 +12751184493818150912,792,320.5,-69.5,0,0,0,0,11 +12753202806647685120,811,315.5,-68.5,0,0,0,0,11 +12753202806647685121,723,315.5,-68.5,0,0,0,0,11 +12770681119980912640,826,335.5,-69.5,0,0,0,0,11 +12771980657148559360,750,338.5,-67.5,0,0,0,0,11 +12776409575968473088,771,348.5,-67.5,0,0,0,0,11 +12782714789977653248,734,348.5,-66.5,0,0,0,0,11 +12786706826733289472,738,345.5,-64.5,0,0,0,0,11 +12786894563780329472,772,348.5,-64.5,0,0,0,0,11 +12788339839317573632,776,344.5,-63.5,0,0,0,0,11 +12797951905556856832,733,329.5,-65.5,0,0,0,0,11 +12801026705158307840,804,322.5,-66.5,0,0,0,0,11 +12818067795442925568,747,327.5,-61.5,0,0,0,0,11 +12823504327528153088,739,332.5,-57.5,0,0,0,0,11 +12842381331509805056,816,288.5,-69.5,0,0,0,0,11 +12842473731565551616,703,286.5,-69.5,0,0,0,0,11 +12855054043935932416,794,300.5,-66.5,0,0,0,0,11 +12856781556059996160,735,299.5,-65.5,0,0,0,0,11 +12859878138972209152,797,308.5,-62.5,0,0,0,0,11 +12866984851890241536,815,283.5,-68.5,0,0,0,0,11 +12882093266048122880,748,296.5,-63.5,0,0,0,0,11 +12886291525662670848,716,305.5,-60.5,0,0,0,0,11 +12886577464536465408,807,303.5,-60.5,0,0,0,0,11 +12887770713741590528,768,297.5,-60.5,0,0,0,0,11 +12888117478487490560,729,299.5,-59.5,0,0,0,0,11 +12888375204127965184,810,301.5,-59.5,0,0,0,0,11 +12890425758039670784,718,292.5,-60.5,0,0,0,0,11 +12897705201133158400,818,300.5,-55.5,0,0,0,0,11 +12901304742075957248,766,310.5,-63.5,0,0,0,0,11 +12904011555938500608,730,322.5,-61.5,0,0,0,0,11 +12924400840801779712,758,325.5,-53.5,0,0,0,0,11 +12924737222707511296,780,326.5,-52.5,0,0,0,0,11 +12926803467124604928,775,321.5,-54.5,0,0,0,0,11 +12927513300782022656,760,320.5,-53.5,0,0,0,0,11 +12935235931912273920,795,306.5,-58.5,0,0,0,0,11 +12946238438616596480,822,301.5,-54.5,0,0,0,0,11 +12947523513744359424,736,303.5,-52.5,0,0,0,0,11 +12949977409238597632,801,309.5,-50.5,0,0,0,0,11 +12951015418364952576,830,306.5,-50.5,0,0,0,0,11 +12957936896993918976,817,318.5,-48.5,0,0,0,0,11 +12958541318065225728,787,320.5,-47.5,0,0,0,0,11 +12980498864409673728,812,346.5,-60.5,0,0,0,0,11 +12985050869937471488,722,350.5,-58.5,0,0,0,0,11 +13025270726448381952,731,343.5,-52.5,0,0,0,0,11 +13031060802264629248,720,344.5,-47.5,0,0,0,0,11 +13040468461170458624,823,338.5,-45.5,0,0,0,0,11 +13055884976753475584,742,348.5,-45.5,0,0,0,0,11 +13093160001097170944,719,344.5,-39.5,0,0,0,0,11 
+13094378277252890624,710,341.5,-39.5,0,0,0,0,11 +13095317624672223232,726,341.5,-37.5,0,0,0,0,11 +13097779065304121344,744,349.5,-39.5,0,0,0,0,11 +13100157308065808384,813,349.5,-37.5,0,0,0,0,11 +13109184215138697216,757,346.5,-34.5,0,0,0,0,11 +13114993892334239744,821,330.5,-52.5,0,0,0,0,11 +13117165557772189696,762,327.5,-51.5,0,0,0,0,11 +13122077940282032128,728,328.5,-47.5,0,0,0,0,11 +13123208770404483072,781,330.5,-46.5,0,0,0,0,11 +13130546552927944704,704,326.5,-45.5,0,0,0,0,11 +13135578070553460736,751,330.5,-44.5,0,0,0,0,11 +13158407025211736064,724,323.5,-41.5,0,0,0,0,11 +13164283224702058496,808,320.5,-40.5,0,0,0,0,11 +13186894729939255296,784,338.5,-40.5,0,0,0,0,11 +13187453677775880192,732,337.5,-39.5,0,0,0,0,11 +13189921792761790464,745,337.5,-38.5,0,0,0,0,11 +13202401744484564992,786,336.5,-33.5,0,0,0,0,11 +13203103043639312384,705,335.5,-32.5,0,0,0,0,11 +13211086588563423232,779,347.5,-29.5,0,0,0,0,11 +13235029212974284800,761,329.5,-29.5,0,0,0,0,11 +13239388759557931008,828,330.5,-26.5,0,0,0,0,11 +13250788433850269696,803,336.5,-25.5,0,0,0,0,11 +13263647230914461696,788,283.5,-61.5,0,0,0,0,11 +13272631885323829248,700,282.5,-58.5,0,0,0,0,11 +13277499429092327424,793,289.5,-58.5,0,0,0,0,11 +13283409463257071616,749,293.5,-55.5,0,0,0,0,11 +13284984179453329408,805,297.5,-52.5,0,0,0,0,11 +13293316792777703424,773,293.5,-50.5,0,0,0,0,11 +13300970211545972736,774,281.5,-54.5,0,0,0,0,11 +13316869903572008960,712,288.5,-49.5,0,0,0,0,11 +13319655515505033216,759,290.5,-48.5,0,0,0,0,11 +13325709382806142976,820,286.5,-46.5,0,0,0,0,11 +13326118614579806208,789,287.5,-45.5,0,0,0,0,11 +13335640766354030592,711,305.5,-49.5,0,0,0,0,11 +13335856080517857280,802,304.5,-49.5,0,0,0,0,11 +13341394068685455360,701,299.5,-48.5,0,0,0,0,11 +13347311673342427136,727,301.5,-44.5,0,0,0,0,11 +13348003826582421504,717,303.5,-43.5,0,0,0,0,11 +13351146793404989440,753,307.5,-45.5,0,0,0,0,11 +13358998609274601472,769,307.5,-42.5,0,0,0,0,11 +13359333484913491968,725,308.5,-41.5,0,0,0,0,11 +13362536511002640384,827,310.5,-40.5,0,0,0,0,11 +13364612928339181568,777,307.5,-39.5,0,0,0,0,11 +13368388511275679744,764,297.5,-45.5,0,0,0,0,11 +13369482380335644672,785,296.5,-44.5,0,0,0,0,11 +13369514156621824000,709,294.5,-45.5,0,0,0,0,11 +13374210622061805568,713,298.5,-41.5,0,0,0,0,11 +13382429402164363264,800,299.5,-37.5,0,0,0,0,11 +13384601479449411584,706,297.5,-36.5,0,0,0,0,11 +13387360701694083072,755,303.5,-38.5,0,0,0,0,11 +13387360701694083073,741,303.5,-38.5,0,0,0,0,11 +13388334615593222144,714,303.5,-37.5,0,0,0,0,11 +13389212170495983616,763,306.5,-38.5,0,0,0,0,11 +13389509163101454336,708,307.5,-37.5,0,0,0,0,11 +13392589952663945216,765,306.5,-35.5,0,0,0,0,11 +13393588426222075904,740,306.5,-33.5,0,0,0,0,11 +13425161974698737664,783,286.5,-42.5,0,0,0,0,11 +13462800185222496256,790,286.5,-35.5,0,0,0,0,11 +13465233373970563072,809,283.5,-34.5,0,0,0,0,11 +13467391906581315584,715,280.5,-35.5,0,0,0,0,11 +13477206946360590336,782,290.5,-39.5,0,0,0,0,11 +13488986123334057984,752,291.5,-34.5,0,0,0,0,11 +13520476867982786560,746,283.5,-31.5,0,0,0,0,11 +13521835979425447936,770,285.5,-29.5,0,0,0,0,11 +13552942781667737600,756,319.5,-35.5,0,0,0,0,11 +13553697461939208192,798,316.5,-36.5,0,0,0,0,11 +13557123557418336256,778,313.5,-36.5,0,0,0,0,11 +13557377060258709504,829,314.5,-35.5,0,0,0,0,11 +13557816572940124160,819,313.5,-35.5,0,0,0,0,11 +13560168899495854080,814,312.5,-33.5,0,0,0,0,11 +13560933976658411520,721,314.5,-34.5,0,0,0,0,11 +13561582046530240512,737,316.5,-33.5,0,0,0,0,11 
+13563711661973438464,799,313.5,-31.5,0,0,0,0,11 +13564690156971098112,825,315.5,-30.5,0,0,0,0,11 +13565852277582856192,796,320.5,-33.5,0,0,0,0,11 +13588709332114997248,754,313.5,-30.5,0,0,0,0,11 +13590818251897569280,806,312.5,-29.5,0,0,0,0,11 +13591216801265483776,791,312.5,-28.5,0,0,0,0,11 +13596001812279721984,824,305.5,-28.5,0,0,0,0,11 +13598131468743213056,702,310.5,-27.5,0,0,0,0,11 +13601023174257934336,767,314.5,-29.5,0,0,0,0,11 +13696722494273093632,743,307.5,-25.5,0,0,0,0,11