diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py
index 2daed416..375a2661 100644
--- a/src/hipscat_import/cross_match/macauff_map_reduce.py
+++ b/src/hipscat_import/cross_match/macauff_map_reduce.py
@@ -22,17 +22,6 @@ def split_associations(
 ):
     """Map a file of links to their healpix pixels and split into shards.
 
-    Args:
-        input_file (FilePointer): file to read for catalog data.
-        file_reader (hipscat_import.catalog.file_readers.InputReader): instance
-            of input reader that specifies arguments necessary for reading from the input file.
-        splitting_key (str): unique counter for this input file, used
-            when creating intermediate files
-        highest_order (int): healpix order to use when mapping
-        ra_column (str): where to find right ascension data in the dataframe
-        dec_column (str): where to find declation in the dataframe
-        cache_shard_path (FilePointer): where to write intermediate parquet files.
-        resume_path (FilePointer): where to write resume files.
     Raises:
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
 
@@ -83,6 +72,9 @@ def reduce_associations(args, left_pixel):
     and aggregate into a single parquet file."""
     inputs = _get_pixel_directory(args.tmp_path, left_pixel.order, left_pixel.pixel)
 
+    if not file_io.directory_has_contents(inputs):
+        print(f"Warning: no input data for pixel {left_pixel}")
+        return
     destination_dir = paths.pixel_directory(args.catalog_path, left_pixel.order, left_pixel.pixel)
     file_io.make_directory(destination_dir, exist_ok=True)
 
diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py
index 19c02e9b..5f9f97a2 100644
--- a/src/hipscat_import/cross_match/run_macauff_import.py
+++ b/src/hipscat_import/cross_match/run_macauff_import.py
@@ -25,8 +25,7 @@ def split(args, left_catalog, right_catalog):
             pixel.pixel * explosion_factor,
             (pixel.pixel + 1) * explosion_factor,
         )
-        for explody in exploded_pixels:
-            regenerated_left_alignment[explody] = pixel
+        regenerated_left_alignment[exploded_pixels] = pixel
 
     regenerated_right_alignment = np.full(hp.order2npix(highest_right_order), None)
     for pixel in right_pixels:
@@ -35,8 +34,7 @@
             pixel.pixel * explosion_factor,
             (pixel.pixel + 1) * explosion_factor,
        )
-        for explody in exploded_pixels:
-            regenerated_right_alignment[explody] = pixel
+        regenerated_right_alignment[exploded_pixels] = pixel
 
     for i, file in enumerate(args.input_paths):
         split_associations(
diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py
index 7238f63e..ee1d47d1 100644
--- a/tests/hipscat_import/cross_match/test_macauff_runner.py
+++ b/tests/hipscat_import/cross_match/test_macauff_runner.py
@@ -25,7 +25,7 @@ def test_bad_args(dask_client):
 
 
 @pytest.mark.dask
-def test_no_implementation(
+def test_object_to_object(
     small_sky_object_catalog,
     tmp_path,
     macauff_data_dir,
@@ -33,8 +33,6 @@
 ):
     """Test that we can create a MacauffArguments instance with two valid catalogs."""
 
-    # os.makedirs(os.path.join(tmp_path, "object_to_object"))
-
     yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")
     from_yaml(yaml_input_file, tmp_path)
     matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
@@ -72,3 +70,53 @@
     assert catalog.catalog_path == args.catalog_path
     assert len(catalog.get_join_pixels()) == 1
     assert catalog.catalog_info.total_rows == 131
+
+
+
+@pytest.mark.dask
+def test_source_to_object(
+    small_sky_object_catalog,
+    small_sky_source_catalog,
+    tmp_path,
+    macauff_data_dir,
+    dask_client,
+):
+    """Test that we can create a MacauffArguments instance with two valid catalogs."""
+
+    yaml_input_file = os.path.join(macauff_data_dir, "macauff_gaia_catwise_match_and_nonmatches.yaml")
+    from_yaml(yaml_input_file, tmp_path)
+    matches_schema_file = os.path.join(tmp_path, "macauff_GaiaDR3xCatWISE2020_matches.parquet")
+    single_metadata = file_io.read_parquet_metadata(matches_schema_file)
+    schema = single_metadata.schema.to_arrow_schema()
+
+    assert len(schema) == 7
+
+    args = MacauffArguments(
+        output_path=tmp_path,
+        output_artifact_name="object_to_object",
+        tmp_dir=tmp_path,
+        left_catalog_dir=small_sky_source_catalog,
+        left_ra_column="gaia_ra",
+        left_dec_column="gaia_dec",
+        left_id_column="gaia_source_id",
+        right_catalog_dir=small_sky_object_catalog,
+        right_ra_column="catwise_ra",
+        right_dec_column="catwise_dec",
+        right_id_column="catwise_name",
+        input_file_list=[os.path.join(macauff_data_dir, "small_sky_and_source_matches.csv")],
+        input_format="csv",
+        overwrite=True,
+        file_reader=CsvReader(schema_file=matches_schema_file, header=None),
+        metadata_file_path=matches_schema_file,
+        progress_bar=False,
+    )
+    os.makedirs(os.path.join(args.tmp_path, "splitting"))
+
+    runner.run(args, dask_client)
+
+    ## Check that the association data can be parsed as a valid association catalog.
+    catalog = AssociationCatalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert len(catalog.get_join_pixels()) == 8
+    assert catalog.catalog_info.total_rows == 34
\ No newline at end of file
diff --git a/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv
new file mode 100644
index 00000000..146877e2
--- /dev/null
+++ b/tests/hipscat_import/data/macauff/small_sky_and_source_matches.csv
@@ -0,0 +1,34 @@
+72008,320.8364113,-69.45376863,792,320.5,-69.5,0.996
+73091,320.9404216,-69.46498164,792,320.5,-69.5,0.998
+83813,335.5861031,-69.37807662,826,335.5,-69.5,0.994
+78312,335.5182331,-69.38325891,826,335.5,-69.5,0.991
+76201,288.9361436,-69.31626483,816,288.5,-69.5,0.993
+72926,288.9503144,-69.3115179,816,288.5,-69.5,0.99
+72813,310.5307814,-63.34133051,766,310.5,-63.5,0.994
+70048,310.5876212,-63.33485542,766,310.5,-63.5,0.999
+83424,283.7763878,-61.30283808,788,283.5,-61.5,1
+73626,283.864721,-61.29113052,788,283.5,-61.5,1
+84534,347.9612914,-29.13951069,779,347.5,-29.5,0.998
+87130,347.9655757,-29.1246194,779,347.5,-29.5,0.993
+79615,347.9345496,-29.10876863,779,347.5,-29.5,0.99
+73071,347.9463072,-29.08860161,779,347.5,-29.5,0.992
+78803,347.997414,-29.07112828,779,347.5,-29.5,0.999
+76988,348.0338029,-29.04750582,779,347.5,-29.5,0.999
+83444,348.0537862,-29.02085159,779,347.5,-29.5,0.996
+72480,320.0880522,-35.28432758,756,319.5,-35.5,0.997
+76134,320.0697349,-35.21411381,756,319.5,-35.5,0.99
+75313,319.7793679,-35.45350619,756,319.5,-35.5,0.999
+79351,319.7409873,-35.4177272,756,319.5,-35.5,0.993
+78766,319.8029046,-35.42603476,756,319.5,-35.5,0.992
+74689,319.7981819,-35.41676507,756,319.5,-35.5,0.997
+73928,319.7099797,-35.43311803,756,319.5,-35.5,0.99
+77882,319.689082,-35.43731031,756,319.5,-35.5,0.998
+85015,319.6872701,-35.43434368,756,319.5,-35.5,0.99
+75167,319.7008698,-35.43045134,756,319.5,-35.5,0.996
+75394,319.736227,-35.40559895,756,319.5,-35.5,0.999
+80736,319.7140687,-35.37583874,756,319.5,-35.5,0.99
+86351,290.5372378,-39.34034881,782,290.5,-39.5,0.996
+84773,290.5185662,-39.3174862,782,290.5,-39.5,0.998
+75092,290.5865147,-39.30033282,782,290.5,-39.5,0.992
+78548,290.5404456,-39.31843165,782,290.5,-39.5,0.997
+79186,290.7615303,-39.38550864,782,290.5,-39.5,0.994
\ No newline at end of file
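
Reviewer note (not part of the patch): the change in `run_macauff_import.py` replaces a per-element Python loop with a single NumPy fancy-indexed assignment. The sketch below is a minimal illustration of why the two are equivalent; it uses a hypothetical `FakePixel` class as a stand-in for hipscat's `HealpixPixel`, and `12 * 4**order` in place of `hp.order2npix`.

```python
import numpy as np


class FakePixel:
    """Hypothetical stand-in for hipscat's HealpixPixel."""

    def __init__(self, order, pixel):
        self.order = order
        self.pixel = pixel


highest_order = 2                    # hypothetical target healpix order
pixel = FakePixel(order=1, pixel=3)  # one coarse pixel to explode to the higher order
explosion_factor = 4 ** (highest_order - pixel.order)

# hp.order2npix(order) == 12 * 4**order; np.full(..., None) gives an object-dtype array
regenerated_alignment = np.full(12 * 4**highest_order, None)
exploded_pixels = np.arange(
    pixel.pixel * explosion_factor,
    (pixel.pixel + 1) * explosion_factor,
)

# One fancy-indexed assignment replaces:
#   for explody in exploded_pixels:
#       regenerated_alignment[explody] = pixel
regenerated_alignment[exploded_pixels] = pixel

assert all(regenerated_alignment[p] is pixel for p in exploded_pixels)
```

Because the assigned object is not array-like, NumPy treats it as a scalar and writes the same reference into every index listed in `exploded_pixels`, which is exactly what the removed loop did.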