diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 8611393f..8d0a512c 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -35,6 +35,7 @@ def generate_margin_cache(args, client): margin_order=args.margin_order, ra_column=args.catalog.catalog_info.ra_column, dec_column=args.catalog.catalog_info.dec_column, + fine_filtering=args.fine_filtering, ) ) resume_plan.wait_for_mapping(futures) @@ -59,16 +60,11 @@ def generate_margin_cache(args, client): resume_plan.wait_for_reducing(futures) with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress: - parquet_metadata.write_parquet_metadata( + total_rows = parquet_metadata.write_parquet_metadata( args.catalog_path, storage_options=args.output_storage_options ) step_progress.update(1) - total_rows = 0 metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) - for row_group in parquet_metadata.read_row_group_fragments( - metadata_path, storage_options=args.output_storage_options - ): - total_rows += row_group.num_rows partition_info = PartitionInfo.read_from_file( metadata_path, storage_options=args.output_storage_options ) diff --git a/src/hipscat_import/margin_cache/margin_cache_arguments.py b/src/hipscat_import/margin_cache/margin_cache_arguments.py index fa689a91..780d1b4c 100644 --- a/src/hipscat_import/margin_cache/margin_cache_arguments.py +++ b/src/hipscat_import/margin_cache/margin_cache_arguments.py @@ -25,6 +25,9 @@ class MarginCacheArguments(RuntimeArguments): order of healpix partitioning in the source catalog. if `margin_order` is left default or set to -1, then the `margin_order` will be set dynamically to the highest partition order plus 1.""" + fine_filtering: bool = True + """should we perform the precise boundary checking? 
if false, some results may be
+    greater than `margin_threshold` away from the border (but within `margin_order`)."""
     delete_intermediate_parquet_files: bool = True
     """should we delete the smaller intermediate parquet files generated in the
     splitting stage, once the relevant reducing stage is complete?"""
diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
index 95ae00c3..e30ae9e7 100644
--- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -7,12 +7,12 @@
 from hipscat.catalog.partition_info import PartitionInfo
 from hipscat.io import file_io, paths
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
-from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
 
 from hipscat_import.margin_cache.margin_cache_resume_plan import MarginCachePlan
 from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure
 
 
+# pylint: disable=too-many-arguments
 def map_pixel_shards(
     partition_file,
     mapping_key,
@@ -24,6 +24,7 @@
     margin_order,
     ra_column,
     dec_column,
+    fine_filtering,
 ):
     """Creates margin cache shards from a source partition file."""
     try:
@@ -33,25 +34,47 @@
         data = file_io.read_parquet_file_to_pandas(
             partition_file, schema=schema, storage_options=input_storage_options
         )
+        source_pixel = HealpixPixel(data["Norder"].iloc[0], data["Npix"].iloc[0])
 
-        data["margin_pixel"] = hp.ang2pix(
+        # Constrain the possible margin pairs: first to only those `margin_order` pixels
+        # that **can** be contained in the source pixel, then to the `margin_order` pixels
+        # of the rows in the source data.
+        margin_pairs = pd.read_csv(margin_pair_file)
+        explosion_factor = 4 ** (margin_order - source_pixel.order)
+        margin_pixel_range_start = source_pixel.pixel * explosion_factor
+        margin_pixel_range_end = (source_pixel.pixel + 1) * explosion_factor
+        margin_pairs = margin_pairs.query(
+            f"margin_pixel >= {margin_pixel_range_start} and margin_pixel < {margin_pixel_range_end}"
+        )
+
+        margin_pixel_list = hp.ang2pix(
             2**margin_order,
             data[ra_column].values,
             data[dec_column].values,
             lonlat=True,
             nest=True,
         )
-
-        margin_pairs = pd.read_csv(margin_pair_file)
-        constrained_data = data.reset_index().merge(margin_pairs, on="margin_pixel")
-
-        if len(constrained_data):
-            constrained_data.groupby(["partition_order", "partition_pixel"]).apply(
-                _to_pixel_shard,
+        margin_pixel_filter = pd.DataFrame(
+            {"margin_pixel": margin_pixel_list, "filter_value": np.arange(0, len(margin_pixel_list))}
+        ).merge(margin_pairs, on="margin_pixel")
+
+        # For every possible output pixel, find the full `margin_order` pixel filter list,
+        # apply the filter, and pass the result along to the helper method to compute the
+        # fine filter and write out the shard file.
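+        # (Worked example, based on the small_sky_source_pairs.csv test data below:
+        # margin pixel 753 pairs with both output partitions (2, 182) and (2, 183),
+        # so a source row landing in that margin_order pixel is sent to both shards.)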
+ for partition_key, data_filter in margin_pixel_filter.groupby(["partition_order", "partition_pixel"]): + data_filter = np.unique(data_filter["filter_value"]).tolist() + pixel = HealpixPixel(partition_key[0], partition_key[1]) + + filtered_data = data.iloc[data_filter] + _to_pixel_shard( + filtered_data=filtered_data, + pixel=pixel, margin_threshold=margin_threshold, output_path=output_path, ra_column=ra_column, dec_column=dec_column, + source_pixel=source_pixel, + fine_filtering=fine_filtering, ) MarginCachePlan.mapping_key_done(output_path, mapping_key) @@ -60,60 +83,61 @@ def map_pixel_shards( raise exception -def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): +def _to_pixel_shard( + filtered_data, + pixel, + margin_threshold, + output_path, + ra_column, + dec_column, + source_pixel, + fine_filtering, +): """Do boundary checking for the cached partition and then output remaining data.""" - order, pix = data["partition_order"].iloc[0], data["partition_pixel"].iloc[0] - source_order, source_pix = data["Norder"].iloc[0], data["Npix"].iloc[0] - - data["margin_check"] = pixel_math.check_margin_bounds( - data[ra_column].values, data[dec_column].values, order, pix, margin_threshold - ) + if fine_filtering: + margin_check = pixel_math.check_margin_bounds( + filtered_data[ra_column].values, + filtered_data[dec_column].values, + pixel.order, + pixel.pixel, + margin_threshold, + ) - # pylint: disable-next=singleton-comparison - margin_data = data.loc[data["margin_check"] == True] + margin_data = filtered_data.iloc[margin_check] + else: + margin_data = filtered_data if len(margin_data): # generate a file name for our margin shard, that uses both sets of Norder/Npix - partition_dir = get_pixel_cache_directory(output_path, HealpixPixel(order, pix)) - shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix) + partition_dir = get_pixel_cache_directory(output_path, pixel) + shard_dir = paths.pixel_directory(partition_dir, source_pixel.order, source_pixel.pixel) file_io.make_directory(shard_dir, exist_ok=True) - shard_path = paths.pixel_catalog_file(partition_dir, source_order, source_pix) - - final_df = margin_data.drop( - columns=[ - "margin_check", - "margin_pixel", - ] - ) + shard_path = paths.pixel_catalog_file(partition_dir, source_pixel.order, source_pixel.pixel) rename_columns = { PartitionInfo.METADATA_ORDER_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_ORDER_COLUMN_NAME}", PartitionInfo.METADATA_DIR_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_DIR_COLUMN_NAME}", PartitionInfo.METADATA_PIXEL_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_PIXEL_COLUMN_NAME}", - "partition_order": PartitionInfo.METADATA_ORDER_COLUMN_NAME, - "partition_pixel": PartitionInfo.METADATA_PIXEL_COLUMN_NAME, } - final_df.rename(columns=rename_columns, inplace=True) + margin_data = margin_data.rename(columns=rename_columns) - dir_column = np.floor_divide(final_df[PartitionInfo.METADATA_PIXEL_COLUMN_NAME].values, 10000) * 10000 + margin_data[PartitionInfo.METADATA_ORDER_COLUMN_NAME] = pixel.order + margin_data[PartitionInfo.METADATA_DIR_COLUMN_NAME] = pixel.dir + margin_data[PartitionInfo.METADATA_PIXEL_COLUMN_NAME] = pixel.pixel - final_df[PartitionInfo.METADATA_DIR_COLUMN_NAME] = dir_column - - final_df = final_df.astype( + margin_data = margin_data.astype( { PartitionInfo.METADATA_ORDER_COLUMN_NAME: np.uint8, PartitionInfo.METADATA_DIR_COLUMN_NAME: np.uint64, PartitionInfo.METADATA_PIXEL_COLUMN_NAME: np.uint64, } ) - final_df = 
final_df.set_index(HIPSCAT_ID_COLUMN).sort_index() - - final_df.to_parquet(shard_path) + margin_data = margin_data.sort_index() - del data, margin_data, final_df + margin_data.to_parquet(shard_path) def reduce_margin_shards( diff --git a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py index 9a6274df..aa0fc683 100644 --- a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py +++ b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py @@ -55,7 +55,7 @@ def _gather_plan(self, args): self.margin_pair_file = file_io.append_paths_to_pointer(self.tmp_path, self.MARGIN_PAIR_FILE) if not file_io.does_file_or_directory_exist(self.margin_pair_file): margin_pairs = _find_partition_margin_pixel_pairs(self.combined_pixels, args.margin_order) - margin_pairs.to_csv(self.margin_pair_file) + margin_pairs.to_csv(self.margin_pair_file, index=False) step_progress.update(1) file_io.make_directory( @@ -167,5 +167,5 @@ def _find_partition_margin_pixel_pairs(combined_pixels, margin_order): margin_pairs_df = pd.DataFrame( zip(norders, part_pix, margin_pix), columns=["partition_order", "partition_pixel", "margin_pixel"], - ) + ).sort_values("margin_pixel") return margin_pairs_df diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index f1e714f3..1cd8cbf2 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -4,7 +4,6 @@ import re from pathlib import Path -import hipscat.pixel_math.healpix_shim as hp import numpy as np import numpy.testing as npt import pandas as pd @@ -170,33 +169,20 @@ def resume_dir(test_data_dir): def basic_data_shard_df(): ras = np.arange(0.0, 360.0) dec = np.full(360, 0.0) - ppix = np.full(360, 21) - porder = np.full(360, 1) norder = np.full(360, 1) npix = np.full(360, 0) hipscat_indexes = pixel_math.compute_hipscat_id(ras, dec) test_df = pd.DataFrame( - data=zip(hipscat_indexes, ras, dec, ppix, porder, norder, npix), + data=zip(hipscat_indexes, ras, dec, norder, npix), columns=[ "_hipscat_index", "weird_ra", "weird_dec", - "partition_pixel", - "partition_order", "Norder", "Npix", ], ) - - test_df["margin_pixel"] = hp.ang2pix( - 2**3, - test_df["weird_ra"].values, - test_df["weird_dec"].values, - lonlat=True, - nest=True, - ) - return test_df @@ -204,33 +190,21 @@ def basic_data_shard_df(): def polar_data_shard_df(): ras = np.arange(0.0, 360.0) dec = np.full(360, 89.9) - ppix = np.full(360, 15) - porder = np.full(360, 2) norder = np.full(360, 2) npix = np.full(360, 0) hipscat_indexes = pixel_math.compute_hipscat_id(ras, dec) test_df = pd.DataFrame( - data=zip(hipscat_indexes, ras, dec, ppix, porder, norder, npix), + data=zip(hipscat_indexes, ras, dec, norder, npix), columns=[ "_hipscat_index", "weird_ra", "weird_dec", - "partition_pixel", - "partition_order", "Norder", "Npix", ], ) - test_df["margin_pixel"] = hp.ang2pix( - 2**3, - test_df["weird_ra"].values, - test_df["weird_dec"].values, - lonlat=True, - nest=True, - ) - return test_df diff --git a/tests/hipscat_import/data/margin_pairs/negative_pairs.csv b/tests/hipscat_import/data/margin_pairs/negative_pairs.csv new file mode 100644 index 00000000..94590616 --- /dev/null +++ b/tests/hipscat_import/data/margin_pairs/negative_pairs.csv @@ -0,0 +1,537 @@ +partition_order,partition_pixel,margin_pixel +0,5,0 +0,8,0 +0,4,0 +0,5,1 +0,4,2 +0,5,4 +0,5,5 +0,4,8 +0,4,10 +0,5,16 +0,5,17 +0,5,20 +0,5,21 +0,1,21 +0,1,23 +0,1,29 +0,1,31 +0,4,32 +0,4,34 +0,4,40 +0,3,42 +0,4,42 +0,3,43 +0,3,46 +0,3,47 +0,1,53 
+0,1,55 +0,3,58 +0,3,59 +0,1,61 +0,3,62 +0,2,63 +0,3,63 +0,1,63 +0,5,64 +0,9,64 +0,6,64 +0,6,65 +0,5,66 +0,6,68 +0,6,69 +0,5,72 +0,5,74 +0,6,80 +0,6,81 +0,6,84 +0,6,85 +0,2,85 +0,2,87 +0,2,93 +0,2,95 +0,5,96 +0,5,98 +0,5,104 +0,0,106 +0,5,106 +0,0,107 +0,0,110 +0,0,111 +0,2,117 +0,2,119 +0,0,122 +0,0,123 +0,2,125 +0,0,126 +0,0,127 +0,3,127 +0,2,127 +0,7,128 +0,10,128 +0,6,128 +0,7,129 +0,6,130 +0,7,132 +0,7,133 +0,6,136 +0,6,138 +0,7,144 +0,7,145 +0,7,148 +0,3,149 +0,7,149 +0,3,151 +0,3,157 +0,3,159 +0,6,160 +0,6,162 +0,6,168 +0,6,170 +0,1,170 +0,1,171 +0,1,174 +0,1,175 +0,3,181 +0,3,183 +0,1,186 +0,1,187 +0,3,189 +0,1,190 +0,3,191 +0,0,191 +0,1,191 +0,7,192 +0,4,192 +1,47,192 +0,4,193 +0,7,194 +0,4,196 +0,4,197 +0,7,200 +0,7,202 +0,4,208 +0,4,209 +0,4,212 +0,0,213 +0,4,213 +0,0,215 +0,0,221 +0,0,223 +0,7,224 +0,7,226 +0,7,232 +0,2,234 +0,7,234 +0,2,235 +0,2,238 +0,2,239 +0,0,245 +0,0,247 +0,2,250 +0,2,251 +0,0,253 +0,2,254 +0,2,255 +0,1,255 +0,0,255 +2,181,256 +0,8,256 +0,8,257 +2,183,258 +2,181,258 +0,8,260 +0,8,261 +2,181,264 +2,183,264 +1,47,266 +2,183,266 +0,8,272 +0,8,273 +0,8,276 +0,0,277 +0,5,277 +0,8,277 +0,0,279 +0,0,285 +0,0,287 +1,47,288 +2,183,288 +1,47,290 +1,47,296 +0,3,298 +0,7,298 +1,47,298 +0,3,299 +0,3,302 +0,3,303 +0,0,309 +0,0,311 +0,3,314 +0,3,315 +0,0,317 +0,3,318 +0,0,319 +0,3,319 +0,9,320 +0,8,320 +0,9,321 +0,8,322 +0,9,324 +0,9,325 +0,8,328 +0,8,330 +0,9,336 +0,9,337 +0,9,340 +0,9,341 +0,1,341 +0,6,341 +0,1,343 +0,1,349 +0,1,351 +0,8,352 +0,8,354 +0,8,360 +0,0,362 +0,4,362 +0,8,362 +0,0,363 +0,0,366 +0,0,367 +0,1,373 +0,1,375 +0,0,378 +0,0,379 +0,1,381 +0,0,382 +0,1,383 +0,0,383 +0,9,384 +0,10,384 +0,10,385 +0,9,386 +0,10,388 +0,10,389 +0,9,392 +0,9,394 +0,10,400 +0,10,401 +0,10,404 +0,10,405 +0,7,405 +0,2,405 +0,2,407 +0,2,413 +0,2,415 +0,9,416 +0,9,418 +0,9,424 +0,5,426 +0,1,426 +0,9,426 +0,1,427 +0,1,430 +0,1,431 +0,2,437 +0,2,439 +0,1,442 +0,1,443 +0,2,445 +0,1,446 +0,2,447 +0,1,447 +0,10,448 +2,186,448 +2,186,449 +2,187,449 +0,10,450 +2,186,452 +2,187,452 +2,187,453 +1,47,453 +0,10,456 +0,10,458 +2,187,464 +1,47,464 +1,47,465 +1,47,468 +1,47,469 +0,4,469 +0,3,469 +0,3,471 +0,3,477 +0,3,479 +0,10,480 +0,10,482 +0,10,488 +0,2,490 +0,10,490 +0,6,490 +0,2,491 +0,2,494 +0,2,495 +0,3,501 +0,3,503 +0,2,506 +0,2,507 +0,3,509 +0,2,510 +0,2,511 +0,3,511 +0,9,512 +0,10,512 +2,176,512 +0,9,513 +2,177,514 +2,176,514 +0,9,516 +0,9,517 +2,177,520 +2,176,520 +2,177,522 +2,180,522 +0,9,528 +0,9,529 +0,9,532 +0,5,533 +0,9,533 +0,5,535 +0,5,541 +0,5,543 +2,180,544 +2,177,544 +2,181,546 +2,180,546 +2,180,552 +2,181,552 +2,181,554 +0,4,554 +0,4,555 +0,4,558 +0,4,559 +0,5,565 +0,5,567 +0,4,570 +0,4,571 +0,5,573 +0,4,574 +0,4,575 +0,5,575 +0,0,575 +2,176,576 +0,8,576 +0,10,576 +0,10,577 +0,8,578 +0,10,580 +0,10,581 +0,8,584 +0,8,586 +0,10,592 +0,10,593 +0,10,596 +0,10,597 +0,6,597 +0,6,599 +0,6,605 +0,6,607 +0,8,608 +0,8,610 +0,8,616 +0,5,618 +0,8,618 +0,5,619 +0,5,622 +0,5,623 +0,6,629 +0,6,631 +0,5,634 +0,5,635 +0,6,637 +0,5,638 +0,5,639 +0,6,639 +0,1,639 +0,8,640 +2,176,640 +0,9,640 +2,178,641 +2,176,641 +0,9,642 +2,178,644 +2,176,644 +2,178,645 +2,184,645 +0,9,648 +0,9,650 +2,178,656 +2,184,656 +2,184,657 +2,186,657 +2,186,660 +2,184,660 +0,7,661 +2,186,661 +0,7,663 +0,7,669 +0,7,671 +0,9,672 +0,9,674 +0,9,680 +0,9,682 +0,6,682 +0,6,683 +0,6,686 +0,6,687 +0,7,693 +0,7,695 +0,6,698 +0,6,699 +0,7,701 +0,6,702 +0,2,703 +0,6,703 +0,7,703 +0,8,704 +0,9,704 +0,10,704 +2,177,705 +0,8,705 +2,178,706 +0,10,706 +2,179,707 +2,177,707 +2,178,707 +0,8,708 +2,176,708 +2,180,709 +0,8,709 
+2,178,710 +2,176,710 +2,179,710 +2,182,711 +2,179,711 +2,180,711 +0,10,712 +2,176,712 +2,177,713 +2,179,713 +2,176,713 +0,10,714 +2,184,714 +2,185,715 +2,179,715 +2,184,715 +2,177,716 +2,176,716 +2,178,716 +2,180,717 +2,177,717 +2,182,717 +2,184,718 +2,185,718 +2,178,718 +2,182,719 +1,47,719 +2,185,719 +0,8,720 +2,177,720 +0,8,721 +2,181,721 +2,177,722 +2,182,722 +2,179,722 +2,181,723 +2,183,723 +2,182,723 +2,180,724 +0,8,724 +0,8,725 +0,4,725 +2,182,726 +2,183,726 +2,180,726 +2,183,727 +0,4,727 +2,179,728 +2,180,728 +2,177,728 +2,180,729 +2,181,729 +2,183,729 +1,47,730 +2,179,730 +2,185,730 +1,47,731 +2,183,731 +2,181,732 +2,182,732 +2,180,732 +2,181,733 +0,4,733 +2,182,734 +1,47,734 +0,4,735 +1,47,735 +2,178,736 +0,10,736 +2,178,737 +2,179,737 +2,185,737 +2,186,738 +0,10,738 +2,187,739 +2,186,739 +2,185,739 +2,179,740 +2,184,740 +2,178,740 +2,179,741 +2,182,741 +1,47,741 +2,186,742 +2,187,742 +2,184,742 +2,187,743 +1,47,743 +2,184,744 +0,10,744 +2,184,745 +2,185,745 +2,187,745 +0,7,746 +0,10,746 +0,7,747 +2,187,747 +2,185,748 +2,184,748 +2,186,748 +2,185,749 +1,47,749 +2,186,750 +0,7,750 +0,7,751 +1,47,751 +2,185,752 +2,182,752 +2,179,752 +2,182,753 +2,183,753 +2,185,754 +2,187,754 +2,183,756 +2,182,756 +2,183,757 +0,4,757 +0,4,759 +2,185,760 +2,187,760 +2,187,762 +0,7,762 +0,7,763 +0,4,765 +0,7,766 +0,4,767 +0,7,767 +0,3,767 diff --git a/tests/hipscat_import/data/margin_pairs/small_sky_source_pairs.csv b/tests/hipscat_import/data/margin_pairs/small_sky_source_pairs.csv new file mode 100644 index 00000000..e85ea4f7 --- /dev/null +++ b/tests/hipscat_import/data/margin_pairs/small_sky_source_pairs.csv @@ -0,0 +1,197 @@ +partition_order,partition_pixel,margin_pixel +0,4,0 +0,4,2 +0,4,8 +0,4,10 +0,4,32 +0,4,34 +0,4,40 +0,4,42 +1,47,192 +0,4,192 +0,4,193 +0,4,196 +0,4,197 +0,4,208 +0,4,209 +0,4,212 +0,4,213 +2,181,256 +2,181,258 +2,183,258 +2,181,264 +2,183,264 +1,47,266 +2,183,266 +2,183,288 +1,47,288 +1,47,290 +1,47,296 +1,47,298 +0,4,362 +2,186,448 +2,187,449 +2,186,449 +2,186,452 +2,187,452 +1,47,453 +2,187,453 +1,47,464 +2,187,464 +1,47,465 +1,47,468 +1,47,469 +0,4,469 +2,176,512 +2,176,514 +2,177,514 +2,177,520 +2,176,520 +2,180,522 +2,177,522 +2,180,544 +2,177,544 +2,180,546 +2,181,546 +2,181,552 +2,180,552 +0,4,554 +2,181,554 +0,4,555 +0,4,558 +0,4,559 +0,4,570 +0,4,571 +0,4,574 +0,4,575 +2,176,576 +2,176,640 +2,178,641 +2,176,641 +2,178,644 +2,176,644 +2,184,645 +2,178,645 +2,178,656 +2,184,656 +2,186,657 +2,184,657 +2,184,660 +2,186,660 +2,186,661 +2,177,705 +2,178,706 +2,178,707 +2,179,707 +2,177,707 +2,176,708 +2,180,709 +2,176,710 +2,179,710 +2,178,710 +2,180,711 +2,179,711 +2,182,711 +2,176,712 +2,179,713 +2,176,713 +2,177,713 +2,184,714 +2,179,715 +2,185,715 +2,184,715 +2,178,716 +2,176,716 +2,177,716 +2,180,717 +2,177,717 +2,182,717 +2,178,718 +2,185,718 +2,184,718 +2,182,719 +1,47,719 +2,185,719 +2,177,720 +2,181,721 +2,182,722 +2,179,722 +2,177,722 +2,182,723 +2,183,723 +2,181,723 +2,180,724 +0,4,725 +2,183,726 +2,180,726 +2,182,726 +2,183,727 +0,4,727 +2,179,728 +2,180,728 +2,177,728 +2,183,729 +2,180,729 +2,181,729 +2,185,730 +2,179,730 +1,47,730 +2,183,731 +1,47,731 +2,182,732 +2,180,732 +2,181,732 +2,181,733 +0,4,733 +1,47,734 +2,182,734 +0,4,735 +1,47,735 +2,178,736 +2,178,737 +2,185,737 +2,179,737 +2,186,738 +2,186,739 +2,185,739 +2,187,739 +2,178,740 +2,179,740 +2,184,740 +1,47,741 +2,179,741 +2,182,741 +2,184,742 +2,187,742 +2,186,742 +2,187,743 +1,47,743 +2,184,744 +2,187,745 +2,184,745 +2,185,745 +2,187,747 +2,184,748 +2,185,748 +2,186,748 +2,185,749 +1,47,749 
+2,186,750 +1,47,751 +2,185,752 +2,182,752 +2,179,752 +2,182,753 +2,183,753 +2,187,754 +2,185,754 +2,183,756 +2,182,756 +0,4,757 +2,183,757 +0,4,759 +2,185,760 +2,187,760 +2,187,762 +0,4,765 +0,4,767 diff --git a/tests/hipscat_import/margin_cache/test_margin_cache.py b/tests/hipscat_import/margin_cache/test_margin_cache.py index 9ac5745d..d4582250 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache.py @@ -83,6 +83,7 @@ def test_margin_cache_gen_negative_pixels(small_sky_source_catalog, tmp_path, da output_artifact_name="catalog_cache", margin_order=4, progress_bar=False, + fine_filtering=False, ) assert args.catalog.catalog_info.ra_column == "source_ra" diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index 67b426d2..ab234f34 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -40,10 +40,13 @@ def validate_result_dataframe(df_path, expected_len): def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df): margin_cache_map_reduce._to_pixel_shard( basic_data_shard_df, + pixel=HealpixPixel(1, 21), margin_threshold=360.0, output_path=tmp_path, ra_column="weird_ra", dec_column="weird_dec", + source_pixel=HealpixPixel(1, 0), + fine_filtering=True, ) path = tmp_path / "order_1" / "dir_0" / "pixel_21" / "Norder=1" / "Dir=0" / "Npix=0.parquet" @@ -57,10 +60,13 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df): def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): margin_cache_map_reduce._to_pixel_shard( polar_data_shard_df, + pixel=HealpixPixel(2, 15), margin_threshold=360.0, output_path=tmp_path, ra_column="weird_ra", dec_column="weird_dec", + source_pixel=HealpixPixel(2, 0), + fine_filtering=True, ) path = tmp_path / "order_2" / "dir_0" / "pixel_15" / "Norder=2" / "Dir=0" / "Npix=0.parquet" @@ -85,12 +91,73 @@ def test_map_pixel_shards_error(tmp_path, capsys): margin_order=4, ra_column="ra", dec_column="dec", + fine_filtering=True, ) captured = capsys.readouterr() assert "No such file or directory" in captured.out +@pytest.mark.timeout(15) +def test_map_pixel_shards_fine(tmp_path, test_data_dir, small_sky_source_catalog): + """Test basic mapping behavior, with fine filtering enabled.""" + intermediate_dir = tmp_path / "intermediate" + os.makedirs(intermediate_dir / "mapping") + margin_cache_map_reduce.map_pixel_shards( + small_sky_source_catalog / "Norder=1" / "Dir=0" / "Npix=47.parquet", + mapping_key="1_47", + input_storage_options=None, + original_catalog_metadata=small_sky_source_catalog / "_common_metadata", + margin_pair_file=test_data_dir / "margin_pairs" / "small_sky_source_pairs.csv", + margin_threshold=3600, + output_path=intermediate_dir, + margin_order=3, + ra_column="source_ra", + dec_column="source_dec", + fine_filtering=True, + ) + + path = intermediate_dir / "order_2" / "dir_0" / "pixel_182" / "Norder=1" / "Dir=0" / "Npix=47.parquet" + assert os.path.exists(path) + res_df = pd.read_parquet(path) + assert len(res_df) == 107 + + path = intermediate_dir / "order_2" / "dir_0" / "pixel_185" / "Norder=1" / "Dir=0" / "Npix=47.parquet" + assert os.path.exists(path) + res_df = pd.read_parquet(path) + assert len(res_df) == 37 + + +@pytest.mark.timeout(15) +def test_map_pixel_shards_coarse(tmp_path, test_data_dir, small_sky_source_catalog): + """Test basic mapping behavior, without fine 
filtering enabled.""" + intermediate_dir = tmp_path / "intermediate" + os.makedirs(intermediate_dir / "mapping") + margin_cache_map_reduce.map_pixel_shards( + small_sky_source_catalog / "Norder=1" / "Dir=0" / "Npix=47.parquet", + mapping_key="1_47", + input_storage_options=None, + original_catalog_metadata=small_sky_source_catalog / "_common_metadata", + margin_pair_file=test_data_dir / "margin_pairs" / "small_sky_source_pairs.csv", + margin_threshold=3600, + output_path=intermediate_dir, + margin_order=3, + ra_column="source_ra", + dec_column="source_dec", + fine_filtering=False, + ) + + path = intermediate_dir / "order_2" / "dir_0" / "pixel_182" / "Norder=1" / "Dir=0" / "Npix=47.parquet" + assert os.path.exists(path) + res_df = pd.read_parquet(path) + assert len(res_df) == 1386 + + path = intermediate_dir / "order_2" / "dir_0" / "pixel_185" / "Norder=1" / "Dir=0" / "Npix=47.parquet" + assert os.path.exists(path) + res_df = pd.read_parquet(path) + assert len(res_df) == 1978 + + def test_reduce_margin_shards(tmp_path): intermediate_dir = tmp_path / "intermediate" partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21)) diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py b/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py index baeffba1..e25e781a 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py @@ -103,7 +103,7 @@ def test_partition_margin_pixel_pairs(small_sky_source_catalog): source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog) margin_pairs = _find_partition_margin_pixel_pairs(source_catalog.get_healpix_pixels(), 3) - expected = np.array([725, 733, 757, 765, 727, 735, 759, 767, 469, 192]) + expected = np.array([0, 2, 8, 10, 32, 34, 40, 42, 192, 192]) npt.assert_array_equal(margin_pairs.iloc[:10]["margin_pixel"], expected) assert len(margin_pairs) == 196 @@ -121,9 +121,9 @@ def test_partition_margin_pixel_pairs_negative(small_sky_source_catalog): expected_order = 0 expected_pixel = 10 - expected = np.array([490, 704, 712, 736, 744, 706, 714, 738, 746, 512]) + expected = np.array([760, 760, 762, 762, 763, 765, 766, 767, 767, 767]) - assert margin_pairs.iloc[-1]["partition_order"] == expected_order - assert margin_pairs.iloc[-1]["partition_pixel"] == expected_pixel + assert margin_pairs.iloc[218]["partition_order"] == expected_order + assert margin_pairs.iloc[218]["partition_pixel"] == expected_pixel npt.assert_array_equal(margin_pairs.iloc[-10:]["margin_pixel"], expected) assert len(margin_pairs) == 536
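
Reviewer note on the coarse-filter arithmetic above (a standalone sketch, not part of
the patch; the only facts taken from the change are `explosion_factor` and the NESTED
numbering property that a HEALPix pixel at order k contains exactly 4**(m - k) pixels
at order m, numbered contiguously):

    # Mirror of map_pixel_shards' pair-constraint step, using the values from the
    # new tests: source partition Norder=1, Npix=47, margin_order=3.
    source_order, source_pixel = 1, 47
    margin_order = 3

    explosion_factor = 4 ** (margin_order - source_order)           # 4**2 = 16
    margin_pixel_range_start = source_pixel * explosion_factor      # 47 * 16 = 752
    margin_pixel_range_end = (source_pixel + 1) * explosion_factor  # 48 * 16 = 768

    # Only margin-pair rows with 752 <= margin_pixel < 768 can describe points that
    # fall inside this source partition, so the pair table shrinks before the merge;
    # e.g. small_sky_source_pairs.csv keeps rows like "2,182,753" and "0,4,767" here,
    # and everything with a smaller margin_pixel is dropped.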