From e5b02faaed8e9999ef58932fc332d339d324c0dc Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Tue, 26 Nov 2024 16:09:14 -0500
Subject: [PATCH] Remove margin fine filtering, and healpy dependency. (#442)

* Remove margin fine filtering. (#435)

* Use different methods for healpix manipulation. (#436)

* Remove margin fine filtering.

* Use different methods for healpix manipulation.

* Optional dependency for older healpy fits files.

* Allow import of map types. (#437)

* Accommodate changes in cds-healpix.

* Use radec2pix for healpix pixel calculation. (#438)

* Remove default healpy dependency. (#440)

* Clean up pre-commit
---
 docs/index.rst                                 |  3 ++-
 pyproject.toml                                 |  1 +
 src/hats_import/catalog/map_reduce.py          | 16 +++++-------
 .../hipscat_conversion/run_conversion.py       | 21 +++++++--------
 .../margin_cache/margin_cache_arguments.py     |  9 ++++---
 .../margin_cache/margin_cache_map_reduce.py    | 26 ++++++-------------
 src/hats_import/soap/resume_plan.py            |  5 ++--
 tests/hats_import/catalog/test_map_reduce.py   |  6 ++---
 .../hipscat_conversion/test_run_conversion.py  | 11 ++++++++
 .../margin_cache/test_margin_cache.py          |  2 +-
 .../test_margin_cache_map_reduce.py            |  9 ++++---
 .../margin_cache/test_margin_round_trip.py     |  4 +--
 12 files changed, 54 insertions(+), 59 deletions(-)
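
The recurring mechanical change across the files below is the move from
healpy-style `ang2pix` calls to the `radec2pix` helper on hats' healpix shim.
A minimal sketch of the before/after call sites, assuming `radec2pix(order, ra, dec)`
takes a healpix order (not an nside) and RA/Dec in degrees, as the hunks below
suggest:

    import numpy as np
    import hats.pixel_math.healpix_shim as hp

    ra = np.array([45.0, 120.5])   # degrees
    dec = np.array([-30.0, 15.2])  # degrees
    order = 10

    # Old call: healpy signature, nside plus flags for the nested scheme
    # and lon/lat (degree) inputs.
    # pixels = hp.ang2pix(2**order, ra, dec, lonlat=True, nest=True)

    # New call: order-based; nested scheme and degree inputs are implied.
    pixels = hp.radec2pix(order, ra, dec)
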
+ "healpy", # used only in hipscat conversion ] [build-system] diff --git a/src/hats_import/catalog/map_reduce.py b/src/hats_import/catalog/map_reduce.py index d8d15cfe..63d4743a 100644 --- a/src/hats_import/catalog/map_reduce.py +++ b/src/hats_import/catalog/map_reduce.py @@ -60,20 +60,16 @@ def _iterate_input_file( else: # Set up the pixel data if isinstance(data, pd.DataFrame): - mapped_pixels = hp.ang2pix( - 2**highest_order, + mapped_pixels = hp.radec2pix( + highest_order, data[ra_column].to_numpy(copy=False, dtype=float), data[dec_column].to_numpy(copy=False, dtype=float), - lonlat=True, - nest=True, ) else: - mapped_pixels = hp.ang2pix( - 2**highest_order, + mapped_pixels = hp.radec2pix( + highest_order, data[ra_column].to_numpy(), data[dec_column].to_numpy(), - lonlat=True, - nest=True, ) yield chunk_number, data, mapped_pixels @@ -299,8 +295,8 @@ def reduce_pixel_shards( SPATIAL_INDEX_COLUMN, [ pixel_math.compute_spatial_index( - merged_table[ra_column].to_numpy(), - merged_table[dec_column].to_numpy(), + merged_table[ra_column].to_numpy().astype(np.float64), + merged_table[dec_column].to_numpy().astype(np.float64), ) ], ).sort_by(SPATIAL_INDEX_COLUMN) diff --git a/src/hats_import/hipscat_conversion/run_conversion.py b/src/hats_import/hipscat_conversion/run_conversion.py index 0342f604..f863cf57 100644 --- a/src/hats_import/hipscat_conversion/run_conversion.py +++ b/src/hats_import/hipscat_conversion/run_conversion.py @@ -4,7 +4,7 @@ import tempfile from typing import no_type_check -import hats.pixel_math.healpix_shim as hp +import hats.pixel_math.healpix_shim as healpix import numpy as np import pyarrow.parquet as pq from dask.distributed import as_completed, get_worker @@ -128,12 +128,10 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column): 0, "_healpix_29", [ - hp.ang2pix( - 2**29, + healpix.radec2pix( + 29, table[ra_column].to_numpy(), table[dec_column].to_numpy(), - nest=True, - lonlat=True, ) ], ) @@ -155,7 +153,11 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column): raise exception +# pylint: disable=import-outside-toplevel def _write_nested_fits_map(input_dir, output_dir): + # Healpy is an optional dependency, used only for reads of legacy fits files. + import healpy as hp + input_file = input_dir / "point_map.fits" if not input_file.exists(): return @@ -169,11 +171,6 @@ def _write_nested_fits_map(input_dir, output_dir): map_fits_image = hp.read_map(_tmp_file.name) else: map_fits_image = map_fits_image[0] + map_fits_image = map_fits_image.astype(np.int32) - output_file = output_dir / "point_map.fits" - with tempfile.NamedTemporaryFile() as _tmp_file: - with output_file.open("wb") as _map_file: - hp.write_map( - _tmp_file.name, map_fits_image, overwrite=True, dtype=np.int32, nest=True, coord="CEL" - ) - _map_file.write(_tmp_file.read()) + file_io.write_fits_image(map_fits_image, output_dir / "point_map.fits") diff --git a/src/hats_import/margin_cache/margin_cache_arguments.py b/src/hats_import/margin_cache/margin_cache_arguments.py index a8d9ad56..b56a2a4e 100644 --- a/src/hats_import/margin_cache/margin_cache_arguments.py +++ b/src/hats_import/margin_cache/margin_cache_arguments.py @@ -28,7 +28,7 @@ class MarginCacheArguments(RuntimeArguments): order of healpix partitioning in the source catalog. 
diff --git a/src/hats_import/margin_cache/margin_cache_arguments.py b/src/hats_import/margin_cache/margin_cache_arguments.py
index a8d9ad56..b56a2a4e 100644
--- a/src/hats_import/margin_cache/margin_cache_arguments.py
+++ b/src/hats_import/margin_cache/margin_cache_arguments.py
@@ -28,7 +28,7 @@ class MarginCacheArguments(RuntimeArguments):
     order of healpix partitioning in the source catalog.
     if `margin_order` is left default or set to -1, then the `margin_order`
     will be set dynamically to the highest partition order plus 1."""
-    fine_filtering: bool = True
+    fine_filtering: bool = False
     """should we perform the precise boundary checking? if false, some results
     may be greater than `margin_threshold` away from the border (but within
     `margin_order`)."""
@@ -54,6 +54,9 @@ def _check_arguments(self):
         if len(self.catalog.get_healpix_pixels()) == 0:
             raise ValueError("debug_filter_pixel_list has created empty catalog")

+        if self.fine_filtering:
+            raise NotImplementedError("Fine filtering temporarily removed.")
+
         highest_order = int(self.catalog.partition_info.get_highest_order())

         if self.margin_order < 0:
@@ -64,9 +67,7 @@ def _check_arguments(self):
                 "margin_order must be of a higher order than the highest order catalog partition pixel."
             )

-        margin_pixel_nside = hp.order2nside(self.margin_order)
-        margin_pixel_avgsize = hp.nside2resol(margin_pixel_nside, arcmin=True)
-        margin_pixel_mindist = hp.avgsize2mindist(margin_pixel_avgsize)
+        margin_pixel_mindist = hp.order2mindist(self.margin_order)
         if margin_pixel_mindist * 60.0 < self.margin_threshold:
             raise ValueError("margin pixels must be larger than margin_threshold")
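
The three-step shim computation (order to nside, nside to average resolution,
average size to minimum distance) collapses into a single `order2mindist` call.
A sketch of the resulting validation, assuming `order2mindist` returns
arcminutes while `margin_threshold` is in arcseconds, consistent with the
`* 60.0` factor in the hunk above:

    import hats.pixel_math.healpix_shim as hp

    def check_margin_order(margin_order: int, margin_threshold: float) -> None:
        """Raise if margin pixels are too small to hold the requested threshold."""
        mindist_arcmin = hp.order2mindist(margin_order)
        # Convert arcminutes to arcseconds before comparing.
        if mindist_arcmin * 60.0 < margin_threshold:
            raise ValueError("margin pixels must be larger than margin_threshold")
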
diff --git a/src/hats_import/margin_cache/margin_cache_map_reduce.py b/src/hats_import/margin_cache/margin_cache_map_reduce.py
index 625046c1..52600fef 100644
--- a/src/hats_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hats_import/margin_cache/margin_cache_map_reduce.py
@@ -3,7 +3,6 @@
 import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
-from hats import pixel_math
 from hats.io import file_io, paths
 from hats.pixel_math.healpix_pixel import HealpixPixel

@@ -11,7 +10,7 @@
 from hats_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure


-# pylint: disable=too-many-arguments
+# pylint: disable=too-many-arguments, unused-argument
 def map_pixel_shards(
     partition_file,
     mapping_key,
@@ -26,6 +25,9 @@
 ):
     """Creates margin cache shards from a source partition file."""
     try:
+        if fine_filtering:
+            raise NotImplementedError("Fine filtering temporarily removed.")
+
         schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema()
         data = file_io.read_parquet_file_to_pandas(partition_file, schema=schema)
         source_pixel = HealpixPixel(data["Norder"].iloc[0], data["Npix"].iloc[0])
@@ -41,12 +43,10 @@
             f"margin_pixel >= {margin_pixel_range_start} and margin_pixel < {margin_pixel_range_end}"
         )

-        margin_pixel_list = hp.ang2pix(
-            2**margin_order,
+        margin_pixel_list = hp.radec2pix(
+            margin_order,
             data[ra_column].values,
             data[dec_column].values,
-            lonlat=True,
-            nest=True,
         )
         margin_pixel_filter = pd.DataFrame(
             {"margin_pixel": margin_pixel_list, "filter_value": np.arange(0, len(margin_pixel_list))}
@@ -78,6 +78,7 @@
         raise exception


+# pylint: disable=too-many-arguments, unused-argument
 def _to_pixel_shard(
     filtered_data,
     pixel,
@@ -89,18 +90,7 @@
     margin_threshold,
     fine_filtering,
 ):
     """Do boundary checking for the cached partition and then output remaining data."""
-    if fine_filtering:
-        margin_check = pixel_math.check_margin_bounds(
-            filtered_data[ra_column].values,
-            filtered_data[dec_column].values,
-            pixel.order,
-            pixel.pixel,
-            margin_threshold,
-        )
-
-        margin_data = filtered_data.iloc[margin_check]
-    else:
-        margin_data = filtered_data
+    margin_data = filtered_data
     num_rows = len(margin_data)
     if num_rows:
diff --git a/src/hats_import/soap/resume_plan.py b/src/hats_import/soap/resume_plan.py
index e4447fc2..bef61335 100644
--- a/src/hats_import/soap/resume_plan.py
+++ b/src/hats_import/soap/resume_plan.py
@@ -7,7 +7,7 @@
 import hats.pixel_math.healpix_shim as hp
 import numpy as np

-from hats import read_hats
+from hats import pixel_math, read_hats
 from hats.catalog import Catalog
 from hats.io import file_io
 from hats.pixel_math.healpix_pixel import HealpixPixel
@@ -193,8 +193,7 @@ def source_to_object_map(object_catalog, source_catalog):

     for source, objects in source_to_object.items():
         # get all neighboring pixels
-        nside = hp.order2nside(source.order)
-        neighbors = hp.get_all_neighbours(nside, source.pixel, nest=True)
+        neighbors = pixel_math.get_margin(source.order, source.pixel, 0)

         ## get rid of -1s and normalize to max order
         explosion_factor = 4 ** (max_order - source.order)
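
healpy's `get_all_neighbours` returns -1 placeholders for missing neighbors,
which the old code filtered out afterward; `pixel_math.get_margin` yields the
bordering pixels directly. A minimal sketch, with the third argument assumed
(from this call site) to be a delta order of 0, i.e. neighbors reported at the
source pixel's own order:

    from hats import pixel_math

    order, pixel = 2, 187
    # Bordering healpix pixels of (order, pixel), at the same order.
    neighbors = pixel_math.get_margin(order, pixel, 0)

    # When comparing against a catalog partitioned down to max_order, each
    # neighbor expands into 4**(max_order - order) nested sub-pixels.
    max_order = 4
    explosion_factor = 4 ** (max_order - order)
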
diff --git a/tests/hats_import/catalog/test_map_reduce.py b/tests/hats_import/catalog/test_map_reduce.py
index 2aeb2e8c..2c736079 100644
--- a/tests/hats_import/catalog/test_map_reduce.py
+++ b/tests/hats_import/catalog/test_map_reduce.py
@@ -436,12 +436,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
     file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet")

     combined_data = pd.concat([file1_data, file2_data])
-    combined_data["norder19_healpix"] = hp.ang2pix(
-        2**19,
+    combined_data["norder19_healpix"] = hp.radec2pix(
+        19,
         combined_data["ra"].values,
         combined_data["dec"].values,
-        lonlat=True,
-        nest=True,
     )
     ## Use this to prune generated columns like Norder, Npix, and _healpix_29
     comparison_columns = ["source_id", "object_id", "time", "ra", "dec"]
diff --git a/tests/hats_import/hipscat_conversion/test_run_conversion.py b/tests/hats_import/hipscat_conversion/test_run_conversion.py
index 1479a9e9..e0314e1d 100644
--- a/tests/hats_import/hipscat_conversion/test_run_conversion.py
+++ b/tests/hats_import/hipscat_conversion/test_run_conversion.py
@@ -24,6 +24,16 @@ def test_bad_args():
         runner.run(args, None)


+# pylint: disable=unused-import
+try:
+    import healpy as hp
+
+    HAVE_HEALPY = True
+except ImportError:
+    HAVE_HEALPY = False
+
+
+@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
 @pytest.mark.dask
 def test_run_conversion_object(
     test_data_dir,
@@ -88,6 +98,7 @@ def test_run_conversion_object(
     assert data.index.name is None


+@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
 @pytest.mark.dask
 def test_run_conversion_source(
     test_data_dir,
diff --git a/tests/hats_import/margin_cache/test_margin_cache.py b/tests/hats_import/margin_cache/test_margin_cache.py
index 7b480c36..3dba6f56 100644
--- a/tests/hats_import/margin_cache/test_margin_cache.py
+++ b/tests/hats_import/margin_cache/test_margin_cache.py
@@ -35,7 +35,7 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):

     data = pd.read_parquet(test_file)

-    assert len(data) == 13
+    assert len(data) == 88
     assert all(data[paths.PARTITION_ORDER] == norder)
     assert all(data[paths.PARTITION_PIXEL] == npix)

diff --git a/tests/hats_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hats_import/margin_cache/test_margin_cache_map_reduce.py
index dc610501..317e1366 100644
--- a/tests/hats_import/margin_cache/test_margin_cache_map_reduce.py
+++ b/tests/hats_import/margin_cache/test_margin_cache_map_reduce.py
@@ -53,7 +53,7 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):

     assert os.path.exists(path)

-    validate_result_dataframe(path, 2)
+    validate_result_dataframe(path, 360)


 @pytest.mark.timeout(5)
@@ -79,7 +79,7 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
 def test_map_pixel_shards_error(tmp_path, capsys):
     """Test error behavior on reduce stage. e.g. by not creating the original
     catalog parquet files."""
-    with pytest.raises(FileNotFoundError):
+    with pytest.raises(NotImplementedError):
         margin_cache_map_reduce.map_pixel_shards(
             paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 0)),
             mapping_key="1_21",
@@ -94,9 +94,10 @@ def test_map_pixel_shards_error(tmp_path, capsys):
         )

     captured = capsys.readouterr()
-    assert "Parquet file does not exist" in captured.out
+    assert "Fine filtering temporarily removed" in captured.out


+@pytest.mark.skip()
 @pytest.mark.timeout(30)
 def test_map_pixel_shards_fine(tmp_path, test_data_dir, small_sky_source_catalog):
     """Test basic mapping behavior, with fine filtering enabled."""
@@ -210,7 +211,7 @@ def test_reduce_margin_shards(tmp_path):
     hats_indexes = pixel_math.compute_spatial_index(ras, dec)
     margin_order = np.full(360, 0)
     margin_dir = np.full(360, 0)
-    margin_pixels = hp.ang2pix(2**3, ras, dec, lonlat=True, nest=True)
+    margin_pixels = hp.radec2pix(3, ras, dec)

     test_df = pd.DataFrame(
         data=zip(hats_indexes, ras, dec, norder, ndir, npix, margin_order, margin_dir, margin_pixels),
diff --git a/tests/hats_import/margin_cache/test_margin_round_trip.py b/tests/hats_import/margin_cache/test_margin_round_trip.py
index 8e3e8fee..865327f5 100644
--- a/tests/hats_import/margin_cache/test_margin_round_trip.py
+++ b/tests/hats_import/margin_cache/test_margin_round_trip.py
@@ -72,7 +72,7 @@ def test_margin_import_gaia_minimum(

     data = pd.read_parquet(test_file)

-    assert len(data) == 1
+    assert len(data) == 4


 @pytest.mark.dask(timeout=180)
@@ -117,7 +117,7 @@ def test_margin_import_mixed_schema_csv(
     catalog = read_hats(args.catalog_path)
     assert catalog.on_disk
     assert catalog.catalog_path == args.catalog_path
-    assert len(catalog.get_healpix_pixels()) == 5
+    assert len(catalog.get_healpix_pixels()) == 19

     norder = 2
     npix = 187
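
Since the conversion tests can no longer assume healpy is installed, they guard
at module level and skip when it is missing. Other test modules that touch
legacy healpy-written files can reuse the same pattern, sketched here with a
placeholder test name:

    import pytest

    try:
        import healpy as hp  # the import itself is the capability check

        HAVE_HEALPY = True
    except ImportError:
        HAVE_HEALPY = False


    @pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
    def test_legacy_conversion():
        ...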