diff --git a/pyproject.toml b/pyproject.toml index faf2018e..6688f4bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "dask[complete]>=2024.3.0", # Includes dask expressions. "deprecated", "healpy", + "healsparse", "hipscat >=0.3.0", "ipykernel", # Support for Jupyter notebooks "pandas", diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..6ea058d8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +git+https://github.com/delucchi-cmu/healsparse.git@main \ No newline at end of file diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 01188540..e0ad550e 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Union import healpy as hp +import healsparse import numpy as np import pyarrow as pa import pyarrow.parquet as pq @@ -93,8 +94,9 @@ def map_to_pixels( ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. FileNotFoundError: if the file does not exist, or is a directory """ - histo = pixel_math.empty_histogram(highest_order) - + sparse_map = healsparse.HealSparseMap.make_empty( + hp.order2nside(highest_order), hp.order2nside(highest_order), np.int64, sentinel=0 + ) if use_hipscat_index: read_columns = [HIPSCAT_ID_COLUMN] else: @@ -105,8 +107,12 @@ def map_to_pixels( ): mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) mapped_pixel = mapped_pixel.astype(np.int64) - histo[mapped_pixel] += count_at_pixel.astype(np.int64) - ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo) + sparse_map[mapped_pixel] += count_at_pixel.astype(np.int64) + ResumePlan.write_partial_healsparse_map( + tmp_path=resume_path, + mapping_key=mapping_key, + hp_map=sparse_map, + ) def split_pixels( diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 916e6ac3..371112d6 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -6,11 +6,10 @@ from typing import List, Optional, Tuple import healpy as hp +import healsparse import numpy as np -from hipscat import pixel_math from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from numpy import frombuffer from tqdm.auto import tqdm from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -33,7 +32,7 @@ class ResumePlan(PipelineResumePlan): SPLITTING_STAGE = "splitting" REDUCING_STAGE = "reducing" - HISTOGRAM_BINARY_FILE = "mapping_histogram.binary" + HISTOGRAM_BINARY_FILE = "mapping_histogram.hs" HISTOGRAMS_DIR = "histograms" def __post_init__(self): @@ -92,10 +91,10 @@ def get_remaining_map_keys(self): """Gather remaining keys, dropping successful mapping tasks from histogram names. Returns: - list of mapping keys *not* found in files like /resume/path/mapping_key.binary + list of mapping keys *not* found in files like /resume/path/mapping_key.hs """ prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR) - mapped_keys = self.get_keys_from_file_names(prefix, ".binary") + mapped_keys = self.get_keys_from_file_names(prefix, ".hs") return [ (f"map_{i}", file_path) for i, file_path in enumerate(self.input_paths) @@ -109,48 +108,41 @@ def read_histogram(self, healpix_order): - Otherwise, combine histograms from partials - Otherwise, return an empty histogram """ + hp_nside = hp.order2nside(healpix_order) file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) - if file_io.does_file_or_directory_exist(file_name): - # Look for the single combined histogram file - with open(file_name, "rb") as file_handle: - full_histogram = frombuffer(file_handle.read(), dtype=np.int64) - else: - # Read the histogram from partial binaries - full_histogram = self.read_histogram_from_partials(healpix_order) - if len(full_histogram) != hp.order2npix(healpix_order): + if not file_io.does_file_or_directory_exist(file_name): + # Combining coverage maps from partials + full_map = healsparse.HealSparseMap.make_empty(hp_nside, hp_nside, np.int64, sentinel=0) + histogram_files = self._get_partial_filenames() + + for file in histogram_files: + sub_map = healsparse.HealSparseMap.read(file) + if sub_map.nside_sparse != hp_nside: + raise ValueError("The histogram partials have inconsistent sizes.") + full_map[sub_map.valid_pixels] += sub_map[sub_map.valid_pixels] + + full_map.write(file_name, clobber=False) + + hp_map = healsparse.HealSparseMap.read(file_name) + + if hp_map.nside_sparse != hp_nside: raise ValueError( "The histogram from the previous execution is incompatible with " + "the highest healpix order. To start the importing pipeline " + "from scratch with the current order set `resume` to False." ) - return full_histogram + + return hp_map.generate_healpix_map(dtype=np.int64, sentinel=0) def _get_partial_filenames(self): remaining_map_files = self.get_remaining_map_keys() if len(remaining_map_files) > 0: raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") - histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.binary") + histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.hs") return histogram_files - def read_histogram_from_partials(self, healpix_order): - """Combines the histogram partials to get the full histogram.""" - histogram_files = self._get_partial_filenames() - full_histogram = pixel_math.empty_histogram(healpix_order) - # Read the partial histograms and make sure they are all the same size - for index, file_name in enumerate(histogram_files): - with open(file_name, "rb") as file_handle: - partial = frombuffer(file_handle.read(), dtype=np.int64) - if index == 0: - full_histogram = partial - elif len(partial) != len(full_histogram): - raise ValueError("The histogram partials have inconsistent sizes.") - else: - full_histogram = np.add(full_histogram, partial) - self._write_combined_histogram(full_histogram) - return full_histogram - @classmethod - def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram): + def write_partial_healsparse_map(cls, tmp_path, mapping_key: str, hp_map): """Write partial histogram to a special intermediate directory Args: @@ -164,19 +156,8 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram): file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR), exist_ok=True, ) - file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.binary") - with open(file_name, "wb+") as file_handle: - file_handle.write(histogram.data) - - def _write_combined_histogram(self, histogram): - """Writes the full histogram to disk, removing the pre-existing partials.""" - file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) - with open(file_name, "wb+") as file_handle: - file_handle.write(histogram.data) - file_io.remove_directory( - file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR), - ignore_errors=True, - ) + file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.hs") + hp_map.write(file_name, clobber=False) def get_remaining_split_keys(self): """Gather remaining keys, dropping successful split tasks from done file names. diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index 3a17e674..9f114306 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -4,13 +4,13 @@ from io import StringIO import healpy as hp +import healsparse import hipscat.pixel_math as hist import numpy as np import numpy.testing as npt import pandas as pd import pyarrow as pa import pytest -from numpy import frombuffer import hipscat_import.catalog.map_reduce as mr from hipscat_import.catalog.file_readers import get_file_reader @@ -74,9 +74,8 @@ def test_read_bad_fileformat(blank_data_file): def read_partial_histogram(tmp_path, mapping_key): """Helper to read in the former result of a map operation.""" - histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.binary") - with open(histogram_file, "rb") as file_handle: - return frombuffer(file_handle.read(), dtype=np.int64) + partial_map = healsparse.HealSparseMap.read(os.path.join(tmp_path, "histograms", f"{mapping_key}.hs")) + return partial_map.generate_healpix_map(dtype=np.int64, sentinel=0) def test_read_single_fits(tmp_path, formats_fits): diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py index eabaabab..2dc954f3 100644 --- a/tests/hipscat_import/catalog/test_resume_plan.py +++ b/tests/hipscat_import/catalog/test_resume_plan.py @@ -97,7 +97,7 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv): assert len(map_files) == 2 -def test_read_write_histogram(tmp_path): +def test_read_write_histogram(tmp_path, small_sky_healsparse_map): """Test that we can read what we write into a histogram file.""" plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"]) @@ -111,7 +111,9 @@ def test_read_write_histogram(tmp_path): remaining_keys = plan.get_remaining_map_keys() assert remaining_keys == [("map_0", "foo1")] - ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected) + ResumePlan.write_partial_healsparse_map( + tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map + ) remaining_keys = plan.get_remaining_map_keys() assert len(remaining_keys) == 0 @@ -125,7 +127,7 @@ def never_fails(): @pytest.mark.dask -def test_some_map_task_failures(tmp_path, dask_client): +def test_some_map_task_failures(tmp_path, dask_client, small_sky_healsparse_map): """Test that we only consider map stage successful if all partial files are written""" plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"]) @@ -135,10 +137,9 @@ def test_some_map_task_failures(tmp_path, dask_client): with pytest.raises(RuntimeError, match="map stages"): plan.wait_for_mapping(futures) - expected = hist.empty_histogram(0) - expected[11] = 131 - - ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected) + ResumePlan.write_partial_healsparse_map( + tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map + ) ## Method succeeds, *and* partial histogram is present. futures = [dask_client.submit(never_fails)] diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index 0a44d19a..d953508b 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -3,7 +3,6 @@ import os import shutil -import hipscat.pixel_math as hist import numpy as np import pandas as pd import pyarrow as pa @@ -36,6 +35,8 @@ def test_resume_dask_runner( small_sky_parts_dir, resume_dir, tmp_path, + small_sky_healsparse_map, + empty_healsparse_order0, assert_parquet_file_ids, ): """Test execution in the presence of some resume files.""" @@ -49,15 +50,12 @@ def test_resume_dask_runner( ## Now set up our resume files to match previous work. resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False) - histogram = hist.empty_histogram(0) - histogram[11] = 131 - empty = hist.empty_histogram(0) for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}") - ResumePlan.write_partial_histogram( + ResumePlan.write_partial_healsparse_map( tmp_path=resume_tmp, mapping_key=f"map_{file_index}", - histogram=histogram if file_index == 0 else empty, + hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0, ) ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11") @@ -139,6 +137,8 @@ def test_resume_dask_runner_diff_pixel_order( small_sky_parts_dir, resume_dir, tmp_path, + small_sky_healsparse_map, + empty_healsparse_order0, assert_parquet_file_ids, ): """Test execution in the presence of histogram files that are not compatible @@ -154,15 +154,12 @@ def test_resume_dask_runner_diff_pixel_order( ## Now set up our resume files to match previous work. resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") ResumePlan(tmp_path=resume_tmp, progress_bar=False) - histogram = hist.empty_histogram(0) - histogram[11] = 131 - empty = hist.empty_histogram(0) for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}") - ResumePlan.write_partial_histogram( + ResumePlan.write_partial_healsparse_map( tmp_path=resume_tmp, mapping_key=f"map_{file_index}", - histogram=histogram if file_index == 0 else empty, + hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0, ) ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11") @@ -173,7 +170,7 @@ def test_resume_dask_runner_diff_pixel_order( ) with pytest.warns(UserWarning, match="resuming prior progress"): - with pytest.raises(ValueError, match="incompatible with the highest healpix order"): + with pytest.raises(ValueError, match="partials have inconsistent sizes."): args = ImportArguments( output_artifact_name="resume_catalog", input_path=small_sky_parts_dir, @@ -225,22 +222,20 @@ def test_resume_dask_runner_diff_pixel_order( def test_resume_dask_runner_histograms_diff_size( dask_client, small_sky_parts_dir, + empty_healsparse_order0, + empty_healsparse_order1, tmp_path, ): """Tests that the pipeline errors if the partial histograms have different sizes.""" resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") ResumePlan(tmp_path=resume_tmp, progress_bar=False) - # We'll create mock partial histograms of size 0 and 2 - histogram = hist.empty_histogram(0) - wrong_histogram = hist.empty_histogram(2) - for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}") - ResumePlan.write_partial_histogram( + ResumePlan.write_partial_healsparse_map( tmp_path=resume_tmp, mapping_key=f"map_{file_index}", - histogram=histogram if file_index % 2 == 0 else wrong_histogram, + hp_map=empty_healsparse_order0 if file_index % 2 == 0 else empty_healsparse_order1, ) with pytest.warns(UserWarning, match="resuming prior progress"): diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 303144bc..dc382cdc 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -4,6 +4,7 @@ import re import healpy as hp +import healsparse import numpy as np import numpy.testing as npt import pandas as pd @@ -167,6 +168,23 @@ def resume_dir(test_data_dir): return os.path.join(test_data_dir, "resume") +@pytest.fixture +def small_sky_healsparse_map(): + sub_map = healsparse.HealSparseMap.make_empty(hp.order2nside(0), hp.order2nside(0), np.int64, sentinel=0) + sub_map[11] = 131 + return sub_map + + +@pytest.fixture +def empty_healsparse_order0(): + return healsparse.HealSparseMap.make_empty(hp.order2nside(0), hp.order2nside(0), np.int64, sentinel=0) + + +@pytest.fixture +def empty_healsparse_order1(): + return healsparse.HealSparseMap.make_empty(hp.order2nside(1), hp.order2nside(1), np.int64, sentinel=0) + + @pytest.fixture def basic_data_shard_df(): ras = np.arange(0.0, 360.0)