Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use healsparse (with some modifications) #293

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-documentation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
run: |
sudo apt-get update
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f docs/requirements.txt ]; then pip install -r docs/requirements.txt; fi
pip install .
- name: Install notebook requirements
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies = [
"dask[complete]>=2024.3.0", # Includes dask expressions.
"deprecated",
"healpy",
"healsparse",
"hipscat >=0.3.0",
"ipykernel", # Support for Jupyter notebooks
"pandas",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
git+https://github.com/delucchi-cmu/healsparse.git@delucchi/black
16 changes: 11 additions & 5 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Union

import healpy as hp
import healsparse
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
Expand Down Expand Up @@ -93,8 +94,9 @@ def map_to_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
histo = pixel_math.empty_histogram(highest_order)

sparse_map = healsparse.HealSparseMap.make_empty(
hp.order2nside(highest_order), hp.order2nside(highest_order), np.int32, sentinel=0
)
if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
else:
Expand All @@ -104,9 +106,13 @@ def map_to_pixels(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
mapped_pixel = mapped_pixel.astype(np.int64)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo)
mapped_pixel = mapped_pixel.astype(np.int32)
sparse_map[mapped_pixel] += count_at_pixel.astype(np.int32)
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_path,
mapping_key=mapping_key,
hp_map=sparse_map,
)


def split_pixels(
Expand Down
73 changes: 27 additions & 46 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
from typing import List, Optional, Tuple

import healpy as hp
import healsparse
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from numpy import frombuffer
from tqdm.auto import tqdm

from hipscat_import.pipeline_resume_plan import PipelineResumePlan
Expand All @@ -33,7 +32,7 @@ class ResumePlan(PipelineResumePlan):
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
HISTOGRAM_BINARY_FILE = "mapping_histogram.hs"
HISTOGRAMS_DIR = "histograms"

def __post_init__(self):
Expand Down Expand Up @@ -92,10 +91,10 @@ def get_remaining_map_keys(self):
"""Gather remaining keys, dropping successful mapping tasks from histogram names.

Returns:
list of mapping keys *not* found in files like /resume/path/mapping_key.binary
list of mapping keys *not* found in files like /resume/path/mapping_key.hs
"""
prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR)
mapped_keys = self.get_keys_from_file_names(prefix, ".binary")
mapped_keys = self.get_keys_from_file_names(prefix, ".hs")
return [
(f"map_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
Expand All @@ -109,48 +108,41 @@ def read_histogram(self, healpix_order):
- Otherwise, combine histograms from partials
- Otherwise, return an empty histogram
"""
hp_nside = hp.order2nside(healpix_order)
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
if file_io.does_file_or_directory_exist(file_name):
# Look for the single combined histogram file
with open(file_name, "rb") as file_handle:
full_histogram = frombuffer(file_handle.read(), dtype=np.int64)
else:
# Read the histogram from partial binaries
full_histogram = self.read_histogram_from_partials(healpix_order)
if len(full_histogram) != hp.order2npix(healpix_order):
if not file_io.does_file_or_directory_exist(file_name):
# Combining coverage maps from partials
full_map = healsparse.HealSparseMap.make_empty(hp_nside, hp_nside, np.int32, sentinel=0)
histogram_files = self._get_partial_filenames()

for file in histogram_files:
sub_map = healsparse.HealSparseMap.read(file)
if sub_map.nside_sparse != hp_nside:
raise ValueError("The histogram partials have inconsistent sizes.")
full_map[sub_map.valid_pixels] += sub_map[sub_map.valid_pixels]

full_map.write(file_name, clobber=False)

hp_map = healsparse.HealSparseMap.read(file_name)

if hp_map.nside_sparse != hp_nside:
raise ValueError(
"The histogram from the previous execution is incompatible with "
+ "the highest healpix order. To start the importing pipeline "
+ "from scratch with the current order set `resume` to False."
)
return full_histogram

return hp_map.generate_healpix_map(dtype=np.int32, sentinel=0)

def _get_partial_filenames(self):
remaining_map_files = self.get_remaining_map_keys()
if len(remaining_map_files) > 0:
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.binary")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.hs")
return histogram_files

def read_histogram_from_partials(self, healpix_order):
    """Combine the mapping stage's partial histograms into one full histogram.

    Reads each partial file produced by the map tasks, checks that every
    partial has the same length as the first one, sums them element-wise,
    writes the combined result to disk, and returns it.

    Args:
        healpix_order: order used to size the histogram when no partials exist.

    Returns:
        one-dimensional numpy array of int64 per-pixel counts.

    Raises:
        ValueError: if any partial's length differs from the first partial read.
    """
    partial_paths = self._get_partial_filenames()
    combined = pixel_math.empty_histogram(healpix_order)
    for index, partial_path in enumerate(partial_paths):
        with open(partial_path, "rb") as stream:
            partial = frombuffer(stream.read(), dtype=np.int64)
        if index == 0:
            # First partial establishes the expected length.
            combined = partial
            continue
        if len(partial) != len(combined):
            raise ValueError("The histogram partials have inconsistent sizes.")
        combined = np.add(combined, partial)
    self._write_combined_histogram(combined)
    return combined

@classmethod
def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
def write_partial_healsparse_map(cls, tmp_path, mapping_key: str, hp_map):
"""Write partial histogram to a special intermediate directory

Args:
Expand All @@ -164,19 +156,8 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
exist_ok=True,
)
file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.binary")
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)

def _write_combined_histogram(self, histogram):
"""Writes the full histogram to disk, removing the pre-existing partials."""
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)
file_io.remove_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
ignore_errors=True,
)
file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.hs")
hp_map.write(file_name, clobber=False)

def get_remaining_split_keys(self):
"""Gather remaining keys, dropping successful split tasks from done file names.
Expand Down
7 changes: 3 additions & 4 deletions tests/hipscat_import/catalog/test_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from io import StringIO

import healpy as hp
import healsparse
import hipscat.pixel_math as hist
import numpy as np
import numpy.testing as npt
import pandas as pd
import pyarrow as pa
import pytest
from numpy import frombuffer

import hipscat_import.catalog.map_reduce as mr
from hipscat_import.catalog.file_readers import get_file_reader
Expand Down Expand Up @@ -74,9 +74,8 @@ def test_read_bad_fileformat(blank_data_file):

def read_partial_histogram(tmp_path, mapping_key):
"""Helper to read in the former result of a map operation."""
histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.binary")
with open(histogram_file, "rb") as file_handle:
return frombuffer(file_handle.read(), dtype=np.int64)
partial_map = healsparse.HealSparseMap.read(os.path.join(tmp_path, "histograms", f"{mapping_key}.hs"))
return partial_map.generate_healpix_map(dtype=np.int64, sentinel=0)


def test_read_single_fits(tmp_path, formats_fits):
Expand Down
15 changes: 8 additions & 7 deletions tests/hipscat_import/catalog/test_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv):
assert len(map_files) == 2


def test_read_write_histogram(tmp_path):
def test_read_write_histogram(tmp_path, small_sky_healsparse_map):
"""Test that we can read what we write into a histogram file."""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

Expand All @@ -111,7 +111,9 @@ def test_read_write_histogram(tmp_path):
remaining_keys = plan.get_remaining_map_keys()
assert remaining_keys == [("map_0", "foo1")]

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
ResumePlan.write_partial_healsparse_map(
tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map
)

remaining_keys = plan.get_remaining_map_keys()
assert len(remaining_keys) == 0
Expand All @@ -125,7 +127,7 @@ def never_fails():


@pytest.mark.dask
def test_some_map_task_failures(tmp_path, dask_client):
def test_some_map_task_failures(tmp_path, dask_client, small_sky_healsparse_map):
"""Test that we only consider map stage successful if all partial files are written"""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

Expand All @@ -135,10 +137,9 @@ def test_some_map_task_failures(tmp_path, dask_client):
with pytest.raises(RuntimeError, match="map stages"):
plan.wait_for_mapping(futures)

expected = hist.empty_histogram(0)
expected[11] = 131

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
ResumePlan.write_partial_healsparse_map(
tmp_path=tmp_path, mapping_key="map_0", hp_map=small_sky_healsparse_map
)

## Method succeeds, *and* partial histogram is present.
futures = [dask_client.submit(never_fails)]
Expand Down
31 changes: 13 additions & 18 deletions tests/hipscat_import/catalog/test_run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import shutil

import hipscat.pixel_math as hist
import numpy as np
import pandas as pd
import pyarrow as pa
Expand Down Expand Up @@ -36,6 +35,8 @@ def test_resume_dask_runner(
small_sky_parts_dir,
resume_dir,
tmp_path,
small_sky_healsparse_map,
empty_healsparse_order0,
assert_parquet_file_ids,
):
"""Test execution in the presence of some resume files."""
Expand All @@ -49,15 +50,12 @@ def test_resume_dask_runner(
## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = hist.empty_histogram(0)
histogram[11] = 131
empty = hist.empty_histogram(0)
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index == 0 else empty,
hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0,
)

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
Expand Down Expand Up @@ -139,6 +137,8 @@ def test_resume_dask_runner_diff_pixel_order(
small_sky_parts_dir,
resume_dir,
tmp_path,
small_sky_healsparse_map,
empty_healsparse_order0,
assert_parquet_file_ids,
):
"""Test execution in the presence of histogram files that are not compatible
Expand All @@ -154,15 +154,12 @@ def test_resume_dask_runner_diff_pixel_order(
## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = hist.empty_histogram(0)
histogram[11] = 131
empty = hist.empty_histogram(0)
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index == 0 else empty,
hp_map=small_sky_healsparse_map if file_index == 0 else empty_healsparse_order0,
)

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
Expand All @@ -173,7 +170,7 @@ def test_resume_dask_runner_diff_pixel_order(
)

with pytest.warns(UserWarning, match="resuming prior progress"):
with pytest.raises(ValueError, match="incompatible with the highest healpix order"):
with pytest.raises(ValueError, match="partials have inconsistent sizes."):
args = ImportArguments(
output_artifact_name="resume_catalog",
input_path=small_sky_parts_dir,
Expand Down Expand Up @@ -225,22 +222,20 @@ def test_resume_dask_runner_diff_pixel_order(
def test_resume_dask_runner_histograms_diff_size(
dask_client,
small_sky_parts_dir,
empty_healsparse_order0,
empty_healsparse_order1,
tmp_path,
):
"""Tests that the pipeline errors if the partial histograms have different sizes."""
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
ResumePlan(tmp_path=resume_tmp, progress_bar=False)

# We'll create mock partial histograms of size 0 and 2
histogram = hist.empty_histogram(0)
wrong_histogram = hist.empty_histogram(2)

for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
ResumePlan.write_partial_histogram(
ResumePlan.write_partial_healsparse_map(
tmp_path=resume_tmp,
mapping_key=f"map_{file_index}",
histogram=histogram if file_index % 2 == 0 else wrong_histogram,
hp_map=empty_healsparse_order0 if file_index % 2 == 0 else empty_healsparse_order1,
)

with pytest.warns(UserWarning, match="resuming prior progress"):
Expand Down
18 changes: 18 additions & 0 deletions tests/hipscat_import/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re

import healpy as hp
import healsparse
import numpy as np
import numpy.testing as npt
import pandas as pd
Expand Down Expand Up @@ -167,6 +168,23 @@ def resume_dir(test_data_dir):
return os.path.join(test_data_dir, "resume")


@pytest.fixture
def small_sky_healsparse_map():
    """Order-0 healsparse map with a single populated pixel: count 131 at pixel 11."""
    nside = hp.order2nside(0)
    single_pixel_map = healsparse.HealSparseMap.make_empty(nside, nside, np.int32, sentinel=0)
    single_pixel_map[11] = 131
    return single_pixel_map


@pytest.fixture
def empty_healsparse_order0():
    """Healsparse map at order 0 with no populated pixels (sentinel value 0)."""
    nside_order0 = hp.order2nside(0)
    return healsparse.HealSparseMap.make_empty(nside_order0, nside_order0, np.int32, sentinel=0)


@pytest.fixture
def empty_healsparse_order1():
    """Healsparse map at order 1 with no populated pixels (sentinel value 0)."""
    nside_order1 = hp.order2nside(1)
    return healsparse.HealSparseMap.make_empty(nside_order1, nside_order1, np.int32, sentinel=0)


@pytest.fixture
def basic_data_shard_df():
ras = np.arange(0.0, 360.0)
Expand Down