Skip to content

Commit

Permalink
Use scipy.sparse for histogram storage. (#294)
Browse files Browse the repository at this point in the history
* Use scipy.sparse for histogram storage.

* Review comments. Additional testing.
  • Loading branch information
delucchi-cmu committed May 9, 2024
1 parent 7a5c17c commit e6a1ade
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 98 deletions.
11 changes: 7 additions & 4 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

# pylint: disable=too-many-locals,too-many-arguments
Expand Down Expand Up @@ -93,7 +94,7 @@ def map_to_pixels(
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
histo = pixel_math.empty_histogram(highest_order)
histo = SparseHistogram.make_empty(highest_order)

if use_hipscat_index:
read_columns = [HIPSCAT_ID_COLUMN]
Expand All @@ -104,9 +105,11 @@ def map_to_pixels(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
mapped_pixel = mapped_pixel.astype(np.int64)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo)

partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
histo.add(partial)

histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))


def split_pixels(
Expand Down
82 changes: 27 additions & 55 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
from typing import List, Optional, Tuple

import healpy as hp
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from numpy import frombuffer
from tqdm.auto import tqdm

from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import PipelineResumePlan


Expand All @@ -33,7 +31,7 @@ class ResumePlan(PipelineResumePlan):
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
HISTOGRAMS_DIR = "histograms"

def __post_init__(self):
Expand Down Expand Up @@ -92,10 +90,10 @@ def get_remaining_map_keys(self):
"""Gather remaining keys, dropping successful mapping tasks from histogram names.
Returns:
list of mapping keys *not* found in files like /resume/path/mapping_key.binary
list of mapping keys *not* found in files like /resume/path/mapping_key.npz
"""
prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR)
mapped_keys = self.get_keys_from_file_names(prefix, ".binary")
mapped_keys = self.get_keys_from_file_names(prefix, ".npz")
return [
(f"map_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
Expand All @@ -110,13 +108,24 @@ def read_histogram(self, healpix_order):
- Otherwise, return an empty histogram
"""
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
if file_io.does_file_or_directory_exist(file_name):
# Look for the single combined histogram file
with open(file_name, "rb") as file_handle:
full_histogram = frombuffer(file_handle.read(), dtype=np.int64)
else:
# Read the histogram from partial binaries
full_histogram = self.read_histogram_from_partials(healpix_order)
if not file_io.does_file_or_directory_exist(file_name):
# Read the histogram from partial histograms and combine.
remaining_map_files = self.get_remaining_map_keys()
if len(remaining_map_files) > 0:
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz")
aggregate_histogram = SparseHistogram.make_empty(healpix_order)
for partial_file_name in histogram_files:
aggregate_histogram.add(SparseHistogram.from_file(partial_file_name))

aggregate_histogram.to_file(file_name)
file_io.remove_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
ignore_errors=True,
)

full_histogram = SparseHistogram.from_file(file_name).to_array()

if len(full_histogram) != hp.order2npix(healpix_order):
raise ValueError(
"The histogram from the previous execution is incompatible with "
Expand All @@ -125,58 +134,21 @@ def read_histogram(self, healpix_order):
)
return full_histogram

def _get_partial_filenames(self):
remaining_map_files = self.get_remaining_map_keys()
if len(remaining_map_files) > 0:
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.binary")
return histogram_files

def read_histogram_from_partials(self, healpix_order):
"""Combines the histogram partials to get the full histogram."""
histogram_files = self._get_partial_filenames()
full_histogram = pixel_math.empty_histogram(healpix_order)
# Read the partial histograms and make sure they are all the same size
for index, file_name in enumerate(histogram_files):
with open(file_name, "rb") as file_handle:
partial = frombuffer(file_handle.read(), dtype=np.int64)
if index == 0:
full_histogram = partial
elif len(partial) != len(full_histogram):
raise ValueError("The histogram partials have inconsistent sizes.")
else:
full_histogram = np.add(full_histogram, partial)
self._write_combined_histogram(full_histogram)
return full_histogram

@classmethod
def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
"""Write partial histogram to a special intermediate directory
def partial_histogram_file(cls, tmp_path, mapping_key: str):
"""File name for writing a histogram file to a special intermediate directory.
As a side effect, this method may create the special intermediate directory.
Args:
tmp_path (str): where to write intermediate resume files.
mapping_key (str): unique string for each mapping task (e.g. "map_57")
histogram (np.array): one-dimensional numpy array of long integers where
the value at each index corresponds to the number of objects found at
the healpix pixel.
"""
file_io.make_directory(
file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
exist_ok=True,
)
file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.binary")
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)

def _write_combined_histogram(self, histogram):
"""Writes the full histogram to disk, removing the pre-existing partials."""
file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)
file_io.remove_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
ignore_errors=True,
)
return file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.npz")

def get_remaining_split_keys(self):
"""Gather remaining keys, dropping successful split tasks from done file names.
Expand Down
96 changes: 96 additions & 0 deletions src/hipscat_import/catalog/sparse_histogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Sparse 1-D histogram of healpix pixel counts."""

import healpy as hp
import numpy as np
from scipy.sparse import csc_array, load_npz, save_npz, sparray


class SparseHistogram:
"""Wrapper around scipy's sparse array."""

def __init__(self, sparse_array):
if not isinstance(sparse_array, sparray):
raise ValueError("The sparse array must be a scipy sparse array.")
if sparse_array.format != "csc":
raise ValueError("The sparse array must be a Compressed Sparse Column array.")
self.sparse_array = sparse_array

def add(self, other):
"""Add in another sparse histogram, updating this wrapper's array.
Args:
other (SparseHistogram): the wrapper containing the addend
"""
if not isinstance(other, SparseHistogram):
raise ValueError("Both addends should be SparseHistogram.")
if self.sparse_array.shape != other.sparse_array.shape:
raise ValueError(
"The histogram partials have incompatible sizes due to different healpix orders. "
+ "To start the pipeline from scratch with the current order set `resume` to False."
)
self.sparse_array += other.sparse_array

def to_array(self):
"""Convert the sparse array to a dense numpy array.
Returns:
dense 1-d numpy array.
"""
return self.sparse_array.toarray()[0]

def to_file(self, file_name):
"""Persist the sparse array to disk.
NB: this saves as a sparse array, and so will likely have lower space requirements
than saving the corresponding dense 1-d numpy array.
"""
save_npz(file_name, self.sparse_array)

@classmethod
def make_empty(cls, healpix_order=10):
"""Create an empty sparse array for a given healpix order.
Args:
healpix_order (int): healpix order
Returns:
new sparse histogram
"""
histo = csc_array((1, hp.order2npix(healpix_order)), dtype=np.int64)
return cls(histo)

@classmethod
def make_from_counts(cls, indexes, counts_at_indexes, healpix_order=10):
"""Create an sparse array for a given healpix order, prefilled with counts at
the provided indexes.
e.g. for a dense 1-d numpy histogram of order 0, you might see::
[0, 4, 0, 0, 0, 0, 0, 0, 9, 0, 0]
There are only elements at [1, 8], and they have respective values [4, 9]. You
would create the sparse histogram like::
make_from_counts([1, 8], [4, 9], 0)
Args:
indexes (int[]): index locations of non-zero values
counts_at_indexes (int[]): values at the ``indexes``
healpix_order (int): healpix order
Returns:
new sparse histogram
"""
row = np.array(np.zeros(len(indexes), dtype=np.int64))
histo = csc_array((counts_at_indexes, (row, indexes)), shape=(1, hp.order2npix(healpix_order)))
return cls(histo)

@classmethod
def from_file(cls, file_name):
"""Read sparse histogram from a file.
Returns:
new sparse histogram
"""
histo = load_npz(file_name)
return cls(histo)
8 changes: 4 additions & 4 deletions tests/hipscat_import/catalog/test_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import pandas as pd
import pyarrow as pa
import pytest
from numpy import frombuffer

import hipscat_import.catalog.map_reduce as mr
from hipscat_import.catalog.file_readers import get_file_reader
from hipscat_import.catalog.sparse_histogram import SparseHistogram


def test_read_empty_filename():
Expand Down Expand Up @@ -74,9 +74,9 @@ def test_read_bad_fileformat(blank_data_file):

def read_partial_histogram(tmp_path, mapping_key):
"""Helper to read in the former result of a map operation."""
histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.binary")
with open(histogram_file, "rb") as file_handle:
return frombuffer(file_handle.read(), dtype=np.int64)
histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.npz")
hist = SparseHistogram.from_file(histogram_file)
return hist.to_array()


def test_read_single_fits(tmp_path, formats_fits):
Expand Down
16 changes: 6 additions & 10 deletions tests/hipscat_import/catalog/test_resume_plan.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Test catalog resume logic"""

import hipscat.pixel_math as hist
import numpy.testing as npt
import pytest
from hipscat.pixel_math.healpix_pixel import HealpixPixel

from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram


def test_mapping_done(tmp_path):
Expand Down Expand Up @@ -105,18 +105,16 @@ def test_read_write_histogram(tmp_path):
with pytest.raises(RuntimeError, match="map stages"):
result = plan.read_histogram(0)

expected = hist.empty_histogram(0)
expected[11] = 131

remaining_keys = plan.get_remaining_map_keys()
assert remaining_keys == [("map_0", "foo1")]

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
histogram = SparseHistogram.make_from_counts([11], [131], 0)
histogram.to_file(ResumePlan.partial_histogram_file(tmp_path=tmp_path, mapping_key="map_0"))

remaining_keys = plan.get_remaining_map_keys()
assert len(remaining_keys) == 0
result = plan.read_histogram(0)
npt.assert_array_equal(result, expected)
npt.assert_array_equal(result, histogram.to_array())


def never_fails():
Expand All @@ -135,10 +133,8 @@ def test_some_map_task_failures(tmp_path, dask_client):
with pytest.raises(RuntimeError, match="map stages"):
plan.wait_for_mapping(futures)

expected = hist.empty_histogram(0)
expected[11] = 131

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
histogram = SparseHistogram.make_from_counts([11], [131], 0)
histogram.to_file(ResumePlan.partial_histogram_file(tmp_path=tmp_path, mapping_key="map_0"))

## Method succeeds, *and* partial histogram is present.
futures = [dask_client.submit(never_fails)]
Expand Down
Loading

0 comments on commit e6a1ade

Please sign in to comment.