Use healsparse (with some modifications)

delucchi-cmu committed May 6, 2024
1 parent 7a5c17c commit 074f23b
Showing 8 changed files with 81 additions and 79 deletions.
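
For orientation, here is a minimal sketch (not part of the diff) of the counting pattern this commit adopts: a HealSparseMap of per-pixel counts in place of a dense numpy histogram. The order and pixel values are made up for illustration, and the snippet assumes the healsparse calls used in the diff (make_empty, array indexing, valid_pixels) behave as in upstream healsparse.

import healpy as hp
import healsparse
import numpy as np

highest_order = 2                      # illustrative; the pipeline uses its own highest_order
nside = hp.order2nside(highest_order)

# Coverage and sparse resolution both at the target order; sentinel=0 means
# unset pixels read back as a count of zero.
sparse_map = healsparse.HealSparseMap.make_empty(nside, nside, np.int64, sentinel=0)

mapped_pixels = np.array([3, 3, 7, 42, 42, 42])   # made-up healpix indices
pixel, counts = np.unique(mapped_pixels, return_counts=True)
sparse_map[pixel.astype(np.int64)] = counts.astype(np.int64)

print(sparse_map.valid_pixels)   # [ 3  7 42] -- only touched pixels are stored
print(sparse_map[42])            # 3
print(sparse_map[100])           # 0 (the sentinel)
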
1 change: 1 addition & 0 deletions pyproject.toml
@@ -18,6 +18,7 @@ dependencies = [
"dask[complete]>=2024.3.0", # Includes dask expressions.
"deprecated",
"healpy",
"healsparse",
"hipscat >=0.3.0",
"ipykernel", # Support for Jupyter notebooks
"pandas",
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
git+https://github.com/delucchi-cmu/healsparse.git@main
14 changes: 10 additions & 4 deletions src/hipscat_import/catalog/map_reduce.py
@@ -3,6 +3,7 @@
from typing import Any, Dict, Union

import healpy as hp
import healsparse
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
@@ -93,8 +94,9 @@ def map_to_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
histo = pixel_math.empty_histogram(highest_order)

sparse_map = healsparse.HealSparseMap.make_empty(
hp.order2nside(highest_order), hp.order2nside(highest_order), np.int64, sentinel=0
)
if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
@@ -105,8 +107,12 @@
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
mapped_pixel = mapped_pixel.astype(np.int64)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo)
sparse_map[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_path,
mapping_key=mapping_key,
hp_map=sparse_map,
)


def split_pixels(
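
A sketch (not from the diff) of the partial-file round trip that map_to_pixels now ends with: each mapping task writes its sparse count map to <tmp>/histograms/<mapping_key>.hs, and a later stage reads it back. The temporary directory, key name, and order-0 map mirror the test fixtures; the .hs files are assumed to be ordinary healsparse FITS output.

import os
import tempfile

import healpy as hp
import healsparse
import numpy as np

tmp_path = tempfile.mkdtemp()                       # stands in for the resume path
os.makedirs(os.path.join(tmp_path, "histograms"), exist_ok=True)

nside = hp.order2nside(0)
partial = healsparse.HealSparseMap.make_empty(nside, nside, np.int64, sentinel=0)
partial[11] = 131                                   # same counts as the small_sky fixture

file_name = os.path.join(tmp_path, "histograms", "map_0.hs")
partial.write(file_name, clobber=False)             # one file per mapping key

round_trip = healsparse.HealSparseMap.read(file_name)
assert round_trip[11] == 131
assert round_trip.nside_sparse == nside
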
73 changes: 27 additions & 46 deletions src/hipscat_import/catalog/resume_plan.py
@@ -6,11 +6,10 @@
from typing import List, Optional, Tuple

import healpy as hp
import healsparse
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from numpy import frombuffer
from tqdm.auto import tqdm

from hipscat_import.pipeline_resume_plan import PipelineResumePlan
@@ -33,7 +32,7 @@ class ResumePlan(PipelineResumePlan):
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
HISTOGRAM_BINARY_FILE = "mapping_histogram.hs"
HISTOGRAMS_DIR = "histograms"

def __post_init__(self):
@@ -92,10 +91,10 @@ def get_remaining_map_keys(self):
"""Gather remaining keys, dropping successful mapping tasks from histogram names.
Returns:
list of mapping keys *not* found in files like /resume/path/mapping_key.binary
list of mapping keys *not* found in files like /resume/path/mapping_key.hs
"""
prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR)
mapped_keys = self.get_keys_from_file_names(prefix, ".binary")
mapped_keys = self.get_keys_from_file_names(prefix, ".hs")
return [
(f"map_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
@@ -109,48 +108,41 @@ def read_histogram(self, healpix_order):
- Otherwise, combine histograms from partials
- Otherwise, return an empty histogram
"""
hp_nside = hp.order2nside(healpix_order)
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
if file_io.does_file_or_directory_exist(file_name):
# Look for the single combined histogram file
with open(file_name, "rb") as file_handle:
full_histogram = frombuffer(file_handle.read(), dtype=np.int64)
else:
# Read the histogram from partial binaries
full_histogram = self.read_histogram_from_partials(healpix_order)
if len(full_histogram) != hp.order2npix(healpix_order):
if not file_io.does_file_or_directory_exist(file_name):
# Combining coverage maps from partials
full_map = healsparse.HealSparseMap.make_empty(hp_nside, hp_nside, np.int64, sentinel=0)
histogram_files = self._get_partial_filenames()

for file in histogram_files:
sub_map = healsparse.HealSparseMap.read(file)
if sub_map.nside_sparse != hp_nside:
raise ValueError("The histogram partials have inconsistent sizes.")
full_map[sub_map.valid_pixels] += sub_map[sub_map.valid_pixels]

full_map.write(file_name, clobber=False)

hp_map = healsparse.HealSparseMap.read(file_name)

if hp_map.nside_sparse != hp_nside:
raise ValueError(
"The histogram from the previous execution is incompatible with "
+ "the highest healpix order. To start the importing pipeline "
+ "from scratch with the current order set `resume` to False."
)
return full_histogram

return hp_map.generate_healpix_map(dtype=np.int64, sentinel=0)

def _get_partial_filenames(self):
remaining_map_files = self.get_remaining_map_keys()
if len(remaining_map_files) > 0:
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.binary")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.hs")
return histogram_files

def read_histogram_from_partials(self, healpix_order):
"""Combines the histogram partials to get the full histogram."""
histogram_files = self._get_partial_filenames()
full_histogram = pixel_math.empty_histogram(healpix_order)
# Read the partial histograms and make sure they are all the same size
for index, file_name in enumerate(histogram_files):
with open(file_name, "rb") as file_handle:
partial = frombuffer(file_handle.read(), dtype=np.int64)
if index == 0:
full_histogram = partial
elif len(partial) != len(full_histogram):
raise ValueError("The histogram partials have inconsistent sizes.")
else:
full_histogram = np.add(full_histogram, partial)
self._write_combined_histogram(full_histogram)
return full_histogram

@classmethod
def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
def write_partial_healsparse_map(cls, tmp_path, mapping_key: str, hp_map):
"""Write partial histogram to a special intermediate directory
Args:
@@ -164,19 +156,8 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
exist_ok=True,
)
file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.binary")
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)

def _write_combined_histogram(self, histogram):
"""Writes the full histogram to disk, removing the pre-existing partials."""
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)
file_io.remove_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
ignore_errors=True,
)
file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.hs")
hp_map.write(file_name, clobber=False)

def get_remaining_split_keys(self):
"""Gather remaining keys, dropping successful split tasks from done file names.
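
A condensed sketch (not from the diff) of the combine step read_histogram now performs: read every partial map, check that it was built at the expected order, and add its valid pixels into a running total. It uses explicit get-then-set instead of the in-place += above, so it should only rely on upstream healsparse indexing; the helper name and file list are illustrative.

import healpy as hp
import healsparse
import numpy as np


def combine_partial_maps(partial_files, healpix_order):
    """Sum per-pixel counts across partial HealSparseMap files (illustrative helper)."""
    nside = hp.order2nside(healpix_order)
    full_map = healsparse.HealSparseMap.make_empty(nside, nside, np.int64, sentinel=0)
    for path in partial_files:
        sub_map = healsparse.HealSparseMap.read(path)
        if sub_map.nside_sparse != nside:
            raise ValueError("The histogram partials have inconsistent sizes.")
        pixels = sub_map.valid_pixels
        # Unset pixels in full_map read back as the sentinel (0), so plain addition works.
        full_map[pixels] = full_map[pixels] + sub_map[pixels]
    return full_map

# e.g. combine_partial_maps(["map_0.hs", "map_1.hs"], healpix_order=0)
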
7 changes: 3 additions & 4 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -4,13 +4,13 @@
from io import StringIO

import healpy as hp
import healsparse
import hipscat.pixel_math as hist
import numpy as np
import numpy.testing as npt
import pandas as pd
import pyarrow as pa
import pytest
from numpy import frombuffer

import hipscat_import.catalog.map_reduce as mr
from hipscat_import.catalog.file_readers import get_file_reader
@@ -74,9 +74,8 @@ def test_read_bad_fileformat(blank_data_file):

def read_partial_histogram(tmp_path, mapping_key):
"""Helper to read in the former result of a map operation."""
histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.binary")
with open(histogram_file, "rb") as file_handle:
return frombuffer(file_handle.read(), dtype=np.int64)
partial_map = healsparse.HealSparseMap.read(os.path.join(tmp_path, "histograms", f"{mapping_key}.hs"))
return partial_map.generate_healpix_map(dtype=np.int64, sentinel=0)


def test_read_single_fits(tmp_path, formats_fits):
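
The helper above densifies with generate_healpix_map(dtype=np.int64, sentinel=0); if those keyword arguments come from the pinned fork rather than upstream healsparse, an equivalent dense, nest-ordered count array can be produced by plain indexing, as in this sketch (order 0 and the single populated pixel are illustrative).

import healpy as hp
import healsparse
import numpy as np

nside = hp.order2nside(0)
sparse_map = healsparse.HealSparseMap.make_empty(nside, nside, np.int64, sentinel=0)
sparse_map[11] = 131

# Index every pixel at the sparse resolution; unset pixels come back as the
# sentinel, giving a dense nest-ordered histogram of counts.
dense = sparse_map[np.arange(hp.order2npix(0))]
assert dense.shape == (12,)
assert dense[11] == 131 and dense[0] == 0
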
15 changes: 8 additions & 7 deletions tests/hipscat_import/catalog/test_resume_plan.py
@@ -97,7 +97,7 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv):
assert len(map_files) == 2


def test_read_write_histogram(tmp_path):
def test_read_write_histogram(tmp_path, small_sky_healsparse_map):
"""Test that we can read what we write into a histogram file."""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

@@ -111,7 +111,9 @@ def test_read_write_histogram(tmp_path):
remaining_keys = plan.get_remaining_map_keys()
assert remaining_keys == [("map_0", "foo1")]

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
ResumePlan.write_partial_healsparse_map(
tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map
)

remaining_keys = plan.get_remaining_map_keys()
assert len(remaining_keys) == 0
@@ -125,7 +127,7 @@ def never_fails():


@pytest.mark.dask
def test_some_map_task_failures(tmp_path, dask_client):
def test_some_map_task_failures(tmp_path, dask_client, small_sky_healsparse_map):
"""Test that we only consider map stage successful if all partial files are written"""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

@@ -135,10 +137,9 @@ def test_some_map_task_failures(tmp_path, dask_client):
with pytest.raises(RuntimeError, match="map stages"):
plan.wait_for_mapping(futures)

expected = hist.empty_histogram(0)
expected[11] = 131

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
ResumePlan.write_partial_healsparse_map(
tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map
)

## Method succeeds, *and* partial histogram is present.
futures = [dask_client.submit(never_fails)]
31 changes: 13 additions & 18 deletions tests/hipscat_import/catalog/test_run_import.py
@@ -3,7 +3,6 @@
import os
import shutil

import hipscat.pixel_math as hist
import numpy as np
import pandas as pd
import pyarrow as pa
@@ -36,6 +35,8 @@ def test_resume_dask_runner(
small_sky_parts_dir,
resume_dir,
tmp_path,
small_sky_healsparse_map,
empty_healsparse_order0,
assert_parquet_file_ids,
):
"""Test execution in the presence of some resume files."""
@@ -49,15 +50,12 @@
## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = hist.empty_histogram(0)
histogram[11] = 131
empty = hist.empty_histogram(0)
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index == 0 else empty,
hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0,
)

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
@@ -139,6 +137,8 @@ def test_resume_dask_runner_diff_pixel_order(
small_sky_parts_dir,
resume_dir,
tmp_path,
small_sky_healsparse_map,
empty_healsparse_order0,
assert_parquet_file_ids,
):
"""Test execution in the presence of histogram files that are not compatible
@@ -154,15 +154,12 @@
## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = hist.empty_histogram(0)
histogram[11] = 131
empty = hist.empty_histogram(0)
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index == 0 else empty,
hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0,
)

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
@@ -173,7 +170,7 @@
)

with pytest.warns(UserWarning, match="resuming prior progress"):
with pytest.raises(ValueError, match="incompatible with the highest healpix order"):
with pytest.raises(ValueError, match="partials have inconsistent sizes."):
args = ImportArguments(
output_artifact_name="resume_catalog",
input_path=small_sky_parts_dir,
@@ -225,22 +222,20 @@ def test_resume_dask_runner_histograms_diff_size(
def test_resume_dask_runner_histograms_diff_size(
dask_client,
small_sky_parts_dir,
empty_healsparse_order0,
empty_healsparse_order1,
tmp_path,
):
"""Tests that the pipeline errors if the partial histograms have different sizes."""
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
ResumePlan(tmp_path=resume_tmp, progress_bar=False)

# We'll create mock partial histograms of size 0 and 2
histogram = hist.empty_histogram(0)
wrong_histogram = hist.empty_histogram(2)

for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index % 2 == 0 else wrong_histogram,
hp_map=empty_healsparse_order0 if file_index % 2 == 0 else empty_healsparse_order1,
)

with pytest.warns(UserWarning, match="resuming prior progress"):
18 changes: 18 additions & 0 deletions tests/hipscat_import/conftest.py
@@ -4,6 +4,7 @@
import re

import healpy as hp
import healsparse
import numpy as np
import numpy.testing as npt
import pandas as pd
@@ -167,6 +168,23 @@ def resume_dir(test_data_dir):
return os.path.join(test_data_dir, "resume")


@pytest.fixture
def small_sky_healsparse_map():
sub_map = healsparse.HealSparseMap.make_empty(hp.order2nside(0), hp.order2nside(0), np.int64, sentinel=0)
sub_map[11] = 131
return sub_map


@pytest.fixture
def empty_healsparse_order0():
return healsparse.HealSparseMap.make_empty(hp.order2nside(0), hp.order2nside(0), np.int64, sentinel=0)


@pytest.fixture
def empty_healsparse_order1():
return healsparse.HealSparseMap.make_empty(hp.order2nside(1), hp.order2nside(1), np.int64, sentinel=0)


@pytest.fixture
def basic_data_shard_df():
ras = np.arange(0.0, 360.0)
