Remove margin fine filtering, and healpy dependency. (#442)
* Remove margin fine filtering. (#435)

* Use different methods for healpix manipulation. (#436)

* Remove margin fine filtering.

* Use different methods for healpix manipulation.

* Optional dependency for older healpy fits files.

* Allow import of map types. (#437)

* Accommodate changes in cds-healpix.

* Use radec2pix for healpix pixel calculation. (#438)

* Remove default healpy dependency. (#440)

* Clean up pre-commit
delucchi-cmu authored Nov 26, 2024
1 parent d928c82 commit e5b02fa
Showing 12 changed files with 54 additions and 59 deletions.
3 changes: 2 additions & 1 deletion docs/index.rst
@@ -28,7 +28,8 @@ need to install or upgrade versions of dependencies to work with hats-import.
.. tip::
Installing on Mac

``healpy`` is a very necessary dependency for hats libraries at this time, but
``healpy`` is an optional dependency for hats-import (included in the ``full`` extra)
to support converting from older HiPSCat catalogs, but
native prebuilt binaries for healpy on Apple Silicon Macs
`do not yet exist <https://healpy.readthedocs.io/en/latest/install.html#binary-installation-with-pip-recommended-for-most-other-python-users>`_,
so it's recommended to install via conda before proceeding to hats-import.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -42,6 +42,7 @@ full = [
"fsspec[full]", # complete file system specs.
"ipykernel", # Support for Jupyter notebooks
"ipywidgets", # useful for tqdm in notebooks.
"healpy", # used only in hipscat conversion
]

[build-system]
16 changes: 6 additions & 10 deletions src/hats_import/catalog/map_reduce.py
@@ -60,20 +60,16 @@ def _iterate_input_file(
else:
# Set up the pixel data
if isinstance(data, pd.DataFrame):
mapped_pixels = hp.ang2pix(
2**highest_order,
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(copy=False, dtype=float),
data[dec_column].to_numpy(copy=False, dtype=float),
lonlat=True,
nest=True,
)
else:
mapped_pixels = hp.ang2pix(
2**highest_order,
mapped_pixels = hp.radec2pix(
highest_order,
data[ra_column].to_numpy(),
data[dec_column].to_numpy(),
lonlat=True,
nest=True,
)
yield chunk_number, data, mapped_pixels

@@ -299,8 +295,8 @@ def reduce_pixel_shards(
SPATIAL_INDEX_COLUMN,
[
pixel_math.compute_spatial_index(
merged_table[ra_column].to_numpy(),
merged_table[dec_column].to_numpy(),
merged_table[ra_column].to_numpy().astype(np.float64),
merged_table[dec_column].to_numpy().astype(np.float64),
)
],
).sort_by(SPATIAL_INDEX_COLUMN)
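The pattern in this file swaps healpy's ang2pix call (which takes an nside plus lonlat/nest flags) for the shim's radec2pix (which takes the order directly), and casts the RA/Dec columns to float64 before computing the spatial index. A minimal check of the expected equivalence, assuming hp is the hats.pixel_math.healpix_shim module used throughout these files and that healpy is still installed for the comparison:

import numpy as np
import healpy
import hats.pixel_math.healpix_shim as hp

order = 5
ra = np.array([10.0, 45.0, 300.0])
dec = np.array([-60.0, 0.0, 30.0])

# New call: order and degrees in, nested-scheme pixel indices out.
new_pixels = hp.radec2pix(order, ra, dec)
# Old call: healpy wants nside = 2**order and explicit lonlat/nest flags.
old_pixels = healpy.ang2pix(2**order, ra, dec, lonlat=True, nest=True)

np.testing.assert_array_equal(new_pixels, old_pixels)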
21 changes: 9 additions & 12 deletions src/hats_import/hipscat_conversion/run_conversion.py
@@ -4,7 +4,7 @@
import tempfile
from typing import no_type_check

import hats.pixel_math.healpix_shim as hp
import hats.pixel_math.healpix_shim as healpix
import numpy as np
import pyarrow.parquet as pq
from dask.distributed import as_completed, get_worker
@@ -128,12 +128,10 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column):
0,
"_healpix_29",
[
hp.ang2pix(
2**29,
healpix.radec2pix(
29,
table[ra_column].to_numpy(),
table[dec_column].to_numpy(),
nest=True,
lonlat=True,
)
],
)
@@ -155,7 +153,11 @@ def _convert_partition_file(pixel, args, schema, ra_column, dec_column):
raise exception


# pylint: disable=import-outside-toplevel
def _write_nested_fits_map(input_dir, output_dir):
# Healpy is an optional dependency, used only for reads of legacy fits files.
import healpy as hp

input_file = input_dir / "point_map.fits"
if not input_file.exists():
return
@@ -169,11 +171,6 @@ def _write_nested_fits_map(input_dir, output_dir):
map_fits_image = hp.read_map(_tmp_file.name)
else:
map_fits_image = map_fits_image[0]
map_fits_image = map_fits_image.astype(np.int32)

output_file = output_dir / "point_map.fits"
with tempfile.NamedTemporaryFile() as _tmp_file:
with output_file.open("wb") as _map_file:
hp.write_map(
_tmp_file.name, map_fits_image, overwrite=True, dtype=np.int32, nest=True, coord="CEL"
)
_map_file.write(_tmp_file.read())
file_io.write_fits_image(map_fits_image, output_dir / "point_map.fits")
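Healpy is now imported lazily, inside _write_nested_fits_map, so the conversion pipeline only needs it when a legacy point_map.fits has to be read. A sketch of that guard pattern, with a hypothetical helper name and an illustrative (not the library's) error message:

def read_legacy_point_map(input_file):
    try:
        import healpy as hp  # optional dependency, provided by the "full" extra
    except ImportError as err:
        raise ImportError(
            "healpy is needed to read legacy HiPSCat point_map.fits files; "
            "install it via the hats-import[full] extra"
        ) from err
    return hp.read_map(input_file)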
9 changes: 5 additions & 4 deletions src/hats_import/margin_cache/margin_cache_arguments.py
@@ -28,7 +28,7 @@ class MarginCacheArguments(RuntimeArguments):
order of healpix partitioning in the source catalog. if `margin_order` is left
default or set to -1, then the `margin_order` will be set dynamically to the
highest partition order plus 1."""
fine_filtering: bool = True
fine_filtering: bool = False
"""should we perform the precise boundary checking? if false, some results may be
greater than `margin_threshold` away from the border (but within `margin_order`)."""

@@ -54,6 +54,9 @@ def _check_arguments(self):
if len(self.catalog.get_healpix_pixels()) == 0:
raise ValueError("debug_filter_pixel_list has created empty catalog")

if self.fine_filtering:
raise NotImplementedError("Fine filtering temporarily removed.")

highest_order = int(self.catalog.partition_info.get_highest_order())

if self.margin_order < 0:
@@ -64,9 +67,7 @@
"margin_order must be of a higher order than the highest order catalog partition pixel."
)

margin_pixel_nside = hp.order2nside(self.margin_order)
margin_pixel_avgsize = hp.nside2resol(margin_pixel_nside, arcmin=True)
margin_pixel_mindist = hp.avgsize2mindist(margin_pixel_avgsize)
margin_pixel_mindist = hp.order2mindist(self.margin_order)
if margin_pixel_mindist * 60.0 < self.margin_threshold:
raise ValueError("margin pixels must be larger than margin_threshold")

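With this change, fine_filtering defaults to False and requesting it raises NotImplementedError during argument checking. A usage sketch; the paths and the argument names not shown in this diff (input_catalog_path, output_path, output_artifact_name) are assumptions and may differ:

from hats_import.margin_cache.margin_cache_arguments import MarginCacheArguments

args = MarginCacheArguments(
    input_catalog_path="data/small_sky_source",      # hypothetical input catalog
    output_path="data",                               # hypothetical output root
    output_artifact_name="small_sky_source_margin",
    margin_threshold=5.0,    # arcseconds
    margin_order=8,          # must be deeper than the highest partition order
    fine_filtering=False,    # the new default; True raises NotImplementedError
)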
26 changes: 8 additions & 18 deletions src/hats_import/margin_cache/margin_cache_map_reduce.py
@@ -3,15 +3,14 @@
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
from hats import pixel_math
from hats.io import file_io, paths
from hats.pixel_math.healpix_pixel import HealpixPixel

from hats_import.margin_cache.margin_cache_resume_plan import MarginCachePlan
from hats_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure


# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments, unused-argument
def map_pixel_shards(
partition_file,
mapping_key,
@@ -26,6 +25,9 @@
):
"""Creates margin cache shards from a source partition file."""
try:
if fine_filtering:
raise NotImplementedError("Fine filtering temporarily removed.")

schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema()
data = file_io.read_parquet_file_to_pandas(partition_file, schema=schema)
source_pixel = HealpixPixel(data["Norder"].iloc[0], data["Npix"].iloc[0])
@@ -41,12 +43,10 @@
f"margin_pixel >= {margin_pixel_range_start} and margin_pixel < {margin_pixel_range_end}"
)

margin_pixel_list = hp.ang2pix(
2**margin_order,
margin_pixel_list = hp.radec2pix(
margin_order,
data[ra_column].values,
data[dec_column].values,
lonlat=True,
nest=True,
)
margin_pixel_filter = pd.DataFrame(
{"margin_pixel": margin_pixel_list, "filter_value": np.arange(0, len(margin_pixel_list))}
@@ -78,6 +78,7 @@ def map_pixel_shards(
raise exception


# pylint: disable=too-many-arguments, unused-argument
def _to_pixel_shard(
filtered_data,
pixel,
@@ -89,18 +90,7 @@ def _to_pixel_shard(
fine_filtering,
):
"""Do boundary checking for the cached partition and then output remaining data."""
if fine_filtering:
margin_check = pixel_math.check_margin_bounds(
filtered_data[ra_column].values,
filtered_data[dec_column].values,
pixel.order,
pixel.pixel,
margin_threshold,
)

margin_data = filtered_data.iloc[margin_check]
else:
margin_data = filtered_data
margin_data = filtered_data

num_rows = len(margin_data)
if num_rows:
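With the fine filter gone, _to_pixel_shard keeps every row that survives the coarse, pixel-level selection, so margin shards can now contain points farther than margin_threshold from the partition boundary (which is why the expected row counts grow in the tests below). A rough sketch of that coarse selection, assuming pixel_math.get_margin(order, pixel, delta) returns the pixels at order + delta bordering the given pixel:

import numpy as np
import hats.pixel_math.healpix_shim as hp
from hats import pixel_math

def coarse_margin_mask(ra, dec, partition_order, partition_pixel, margin_order):
    # Margin pixels of the target partition, computed at margin_order
    # (assumed semantics of get_margin's third argument).
    margin_pixels = pixel_math.get_margin(
        partition_order, partition_pixel, margin_order - partition_order
    )
    # Pixel of each point at margin_order; keep points landing in a margin pixel.
    point_pixels = hp.radec2pix(margin_order, np.asarray(ra), np.asarray(dec))
    return np.isin(point_pixels, np.asarray(margin_pixels))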
5 changes: 2 additions & 3 deletions src/hats_import/soap/resume_plan.py
@@ -7,7 +7,7 @@

import hats.pixel_math.healpix_shim as hp
import numpy as np
from hats import read_hats
from hats import pixel_math, read_hats
from hats.catalog import Catalog
from hats.io import file_io
from hats.pixel_math.healpix_pixel import HealpixPixel
@@ -193,8 +193,7 @@ def source_to_object_map(object_catalog, source_catalog):

for source, objects in source_to_object.items():
# get all neighboring pixels
nside = hp.order2nside(source.order)
neighbors = hp.get_all_neighbours(nside, source.pixel, nest=True)
neighbors = pixel_math.get_margin(source.order, source.pixel, 0)

## get rid of -1s and normalize to max order
explosion_factor = 4 ** (max_order - source.order)
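Neighbor lookup now uses pixel_math.get_margin(order, pixel, 0) instead of healpy's get_all_neighbours. A sketch of how the same-order neighbors might then be normalized to max_order with the explosion factor shown above; the helper name and the block expansion are illustrative assumptions, not the module's actual implementation:

import numpy as np
from hats import pixel_math

def neighbors_at_max_order(order, pixel, max_order):
    neighbors = np.asarray(pixel_math.get_margin(order, pixel, 0))
    neighbors = neighbors[neighbors >= 0]  # the old healpy call used -1 for missing neighbors
    explosion_factor = 4 ** (max_order - order)
    # In the nested scheme, each pixel at `order` covers a contiguous block of
    # explosion_factor pixels at max_order.
    return np.concatenate(
        [np.arange(n * explosion_factor, (n + 1) * explosion_factor) for n in neighbors]
    )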
6 changes: 2 additions & 4 deletions tests/hats_import/catalog/test_map_reduce.py
@@ -436,12 +436,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet")

combined_data = pd.concat([file1_data, file2_data])
combined_data["norder19_healpix"] = hp.ang2pix(
2**19,
combined_data["norder19_healpix"] = hp.radec2pix(
19,
combined_data["ra"].values,
combined_data["dec"].values,
lonlat=True,
nest=True,
)
## Use this to prune generated columns like Norder, Npix, and _healpix_29
comparison_columns = ["source_id", "object_id", "time", "ra", "dec"]
11 changes: 11 additions & 0 deletions tests/hats_import/hipscat_conversion/test_run_conversion.py
@@ -24,6 +24,16 @@ def test_bad_args():
runner.run(args, None)


# pylint: disable=unused-import
try:
import healpy as hp

HAVE_HEALPY = True
except ImportError:
HAVE_HEALPY = False


@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
@pytest.mark.dask
def test_run_conversion_object(
test_data_dir,
@@ -88,6 +98,7 @@ def test_run_conversion_object(
assert data.index.name is None


@pytest.mark.skipif(not HAVE_HEALPY, reason="healpy is not installed")
@pytest.mark.dask
def test_run_conversion_source(
test_data_dir,
2 changes: 1 addition & 1 deletion tests/hats_import/margin_cache/test_margin_cache.py
@@ -35,7 +35,7 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):

data = pd.read_parquet(test_file)

assert len(data) == 13
assert len(data) == 88

assert all(data[paths.PARTITION_ORDER] == norder)
assert all(data[paths.PARTITION_PIXEL] == npix)
@@ -53,7 +53,7 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):

assert os.path.exists(path)

validate_result_dataframe(path, 2)
validate_result_dataframe(path, 360)


@pytest.mark.timeout(5)
@@ -79,7 +79,7 @@
def test_map_pixel_shards_error(tmp_path, capsys):
"""Test error behavior on reduce stage. e.g. by not creating the original
catalog parquet files."""
with pytest.raises(FileNotFoundError):
with pytest.raises(NotImplementedError):
margin_cache_map_reduce.map_pixel_shards(
paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 0)),
mapping_key="1_21",
@@ -94,9 +94,10 @@ def test_map_pixel_shards_error(tmp_path, capsys):
)

captured = capsys.readouterr()
assert "Parquet file does not exist" in captured.out
assert "Fine filtering temporarily removed" in captured.out


@pytest.mark.skip()
@pytest.mark.timeout(30)
def test_map_pixel_shards_fine(tmp_path, test_data_dir, small_sky_source_catalog):
"""Test basic mapping behavior, with fine filtering enabled."""
@@ -210,7 +211,7 @@ def test_reduce_margin_shards(tmp_path):
hats_indexes = pixel_math.compute_spatial_index(ras, dec)
margin_order = np.full(360, 0)
margin_dir = np.full(360, 0)
margin_pixels = hp.ang2pix(2**3, ras, dec, lonlat=True, nest=True)
margin_pixels = hp.radec2pix(3, ras, dec)

test_df = pd.DataFrame(
data=zip(hats_indexes, ras, dec, norder, ndir, npix, margin_order, margin_dir, margin_pixels),
4 changes: 2 additions & 2 deletions tests/hats_import/margin_cache/test_margin_round_trip.py
@@ -72,7 +72,7 @@ def test_margin_import_gaia_minimum(

data = pd.read_parquet(test_file)

assert len(data) == 1
assert len(data) == 4


@pytest.mark.dask(timeout=180)
@@ -117,7 +117,7 @@ def test_margin_import_mixed_schema_csv(
catalog = read_hats(args.catalog_path)
assert catalog.on_disk
assert catalog.catalog_path == args.catalog_path
assert len(catalog.get_healpix_pixels()) == 5
assert len(catalog.get_healpix_pixels()) == 19

norder = 2
npix = 187
