diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py
index 01188540..39b81c32 100644
--- a/src/hipscat_import/catalog/map_reduce.py
+++ b/src/hipscat_import/catalog/map_reduce.py
@@ -13,6 +13,7 @@
 
 from hipscat_import.catalog.file_readers import InputReader
 from hipscat_import.catalog.resume_plan import ResumePlan
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
 from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory
 
 # pylint: disable=too-many-locals,too-many-arguments
@@ -93,7 +94,7 @@ def map_to_pixels(
         ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
         FileNotFoundError: if the file does not exist, or is a directory
     """
-    histo = pixel_math.empty_histogram(highest_order)
+    histo = SparseHistogram.make_empty(highest_order)
 
     if use_hipscat_index:
         read_columns = [HIPSCAT_ID_COLUMN]
@@ -104,9 +105,11 @@
         input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
     ):
         mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
-        mapped_pixel = mapped_pixel.astype(np.int64)
-        histo[mapped_pixel] += count_at_pixel.astype(np.int64)
-    ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo)
+
+        partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
+        histo.add(partial)
+
+    histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key))
 
 
 def split_pixels(
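For orientation, the reworked `map_to_pixels` loop above now folds each chunk's pixel counts into a running `SparseHistogram` and writes a single `.npz` per mapping task. The following is a minimal, self-contained sketch of that aggregation pattern; the chunk arrays and the output file name are made up for illustration and stand in for the real `_iterate_input_file` reader and `ResumePlan.partial_histogram_file`.

```python
import numpy as np

from hipscat_import.catalog.sparse_histogram import SparseHistogram

# Hypothetical chunks of healpix pixels, standing in for the reader's output.
chunks_of_mapped_pixels = [np.array([11, 11, 10]), np.array([11, 45, 45, 45])]

highest_order = 6
histo = SparseHistogram.make_empty(highest_order)

for mapped_pixels in chunks_of_mapped_pixels:
    # Count occurrences of each pixel in this chunk...
    mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
    # ...and fold the chunk's counts into the running sparse histogram.
    partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order)
    histo.add(partial)

# One compressed .npz per mapping task (illustrative file name, written to the working directory).
histo.to_file("map_0.npz")
```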
diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py
index 916e6ac3..1d7280de 100644
--- a/src/hipscat_import/catalog/resume_plan.py
+++ b/src/hipscat_import/catalog/resume_plan.py
@@ -6,13 +6,11 @@
 from typing import List, Optional, Tuple
 
 import healpy as hp
-import numpy as np
-from hipscat import pixel_math
 from hipscat.io import FilePointer, file_io
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
-from numpy import frombuffer
 from tqdm.auto import tqdm
 
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
 from hipscat_import.pipeline_resume_plan import PipelineResumePlan
 
 
@@ -33,7 +31,7 @@ class ResumePlan(PipelineResumePlan):
     SPLITTING_STAGE = "splitting"
     REDUCING_STAGE = "reducing"
 
-    HISTOGRAM_BINARY_FILE = "mapping_histogram.binary"
+    HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
     HISTOGRAMS_DIR = "histograms"
 
     def __post_init__(self):
@@ -92,10 +90,10 @@ def get_remaining_map_keys(self):
         """Gather remaining keys, dropping successful mapping tasks from histogram names.
 
         Returns:
-            list of mapping keys *not* found in files like /resume/path/mapping_key.binary
+            list of mapping keys *not* found in files like /resume/path/mapping_key.npz
         """
         prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR)
-        mapped_keys = self.get_keys_from_file_names(prefix, ".binary")
+        mapped_keys = self.get_keys_from_file_names(prefix, ".npz")
         return [
             (f"map_{i}", file_path)
             for i, file_path in enumerate(self.input_paths)
@@ -110,13 +108,24 @@ def read_histogram(self, healpix_order):
         - Otherwise, return an empty histogram
         """
         file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
-        if file_io.does_file_or_directory_exist(file_name):
-            # Look for the single combined histogram file
-            with open(file_name, "rb") as file_handle:
-                full_histogram = frombuffer(file_handle.read(), dtype=np.int64)
-        else:
-            # Read the histogram from partial binaries
-            full_histogram = self.read_histogram_from_partials(healpix_order)
+        if not file_io.does_file_or_directory_exist(file_name):
+            # Read the histogram from partial histograms and combine.
+            remaining_map_files = self.get_remaining_map_keys()
+            if len(remaining_map_files) > 0:
+                raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
+            histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz")
+            aggregate_histogram = SparseHistogram.make_empty(healpix_order)
+            for partial_file_name in histogram_files:
+                aggregate_histogram.add(SparseHistogram.from_file(partial_file_name))
+
+            aggregate_histogram.to_file(file_name)
+            file_io.remove_directory(
+                file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                ignore_errors=True,
+            )
+
+        full_histogram = SparseHistogram.from_file(file_name).to_array()
+
         if len(full_histogram) != hp.order2npix(healpix_order):
             raise ValueError(
                 "The histogram from the previous execution is incompatible with "
@@ -125,58 +134,21 @@
             )
         return full_histogram
 
-    def _get_partial_filenames(self):
-        remaining_map_files = self.get_remaining_map_keys()
-        if len(remaining_map_files) > 0:
-            raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
-        histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.binary")
-        return histogram_files
-
-    def read_histogram_from_partials(self, healpix_order):
-        """Combines the histogram partials to get the full histogram."""
-        histogram_files = self._get_partial_filenames()
-        full_histogram = pixel_math.empty_histogram(healpix_order)
-        # Read the partial histograms and make sure they are all the same size
-        for index, file_name in enumerate(histogram_files):
-            with open(file_name, "rb") as file_handle:
-                partial = frombuffer(file_handle.read(), dtype=np.int64)
-                if index == 0:
-                    full_histogram = partial
-                elif len(partial) != len(full_histogram):
-                    raise ValueError("The histogram partials have inconsistent sizes.")
-                else:
-                    full_histogram = np.add(full_histogram, partial)
-        self._write_combined_histogram(full_histogram)
-        return full_histogram
-
     @classmethod
-    def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
-        """Write partial histogram to a special intermediate directory
+    def partial_histogram_file(cls, tmp_path, mapping_key: str):
+        """File name for writing a histogram file to a special intermediate directory.
+
+        As a side effect, this method may create the special intermediate directory.
 
         Args:
             tmp_path (str): where to write intermediate resume files.
             mapping_key (str): unique string for each mapping task (e.g. "map_57")
-            histogram (np.array): one-dimensional numpy array of long integers where
-                the value at each index corresponds to the number of objects found at
-                the healpix pixel.
         """
         file_io.make_directory(
             file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR),
             exist_ok=True,
         )
-        file_name = file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.binary")
-        with open(file_name, "wb+") as file_handle:
-            file_handle.write(histogram.data)
-
-    def _write_combined_histogram(self, histogram):
-        """Writes the full histogram to disk, removing the pre-existing partials."""
-        file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
-        with open(file_name, "wb+") as file_handle:
-            file_handle.write(histogram.data)
-        file_io.remove_directory(
-            file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
-            ignore_errors=True,
-        )
+        return file_io.append_paths_to_pointer(tmp_path, cls.HISTOGRAMS_DIR, f"{mapping_key}.npz")
 
     def get_remaining_split_keys(self):
         """Gather remaining keys, dropping successful split tasks from done file names.
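The resume behavior above boils down to: if the combined `mapping_histogram.npz` is missing, load every partial under `histograms/`, sum them into one sparse histogram, write it out, and delete the partials. A rough sketch of just the combine step follows, using a plain `glob` in place of the `file_io` helpers and assuming (hypothetically) that the partials live in a local `histograms/` directory.

```python
from glob import glob

from hipscat_import.catalog.sparse_histogram import SparseHistogram

healpix_order = 0
aggregate = SparseHistogram.make_empty(healpix_order)

# Sum every partial histogram written by the mapping tasks.
for partial_file in sorted(glob("histograms/*.npz")):
    aggregate.add(SparseHistogram.from_file(partial_file))

# Downstream planning consumes the dense view; at order 0 it has 12 entries.
full_histogram = aggregate.to_array()
assert len(full_histogram) == 12
```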
diff --git a/src/hipscat_import/catalog/sparse_histogram.py b/src/hipscat_import/catalog/sparse_histogram.py
new file mode 100644
index 00000000..14367942
--- /dev/null
+++ b/src/hipscat_import/catalog/sparse_histogram.py
@@ -0,0 +1,96 @@
+"""Sparse 1-D histogram of healpix pixel counts."""
+
+import healpy as hp
+import numpy as np
+from scipy.sparse import csc_array, load_npz, save_npz, sparray
+
+
+class SparseHistogram:
+    """Wrapper around scipy's sparse array."""
+
+    def __init__(self, sparse_array):
+        if not isinstance(sparse_array, sparray):
+            raise ValueError("The sparse array must be a scipy sparse array.")
+        if sparse_array.format != "csc":
+            raise ValueError("The sparse array must be a Compressed Sparse Column array.")
+        self.sparse_array = sparse_array
+
+    def add(self, other):
+        """Add in another sparse histogram, updating this wrapper's array.
+
+        Args:
+            other (SparseHistogram): the wrapper containing the addend
+        """
+        if not isinstance(other, SparseHistogram):
+            raise ValueError("Both addends should be SparseHistogram.")
+        if self.sparse_array.shape != other.sparse_array.shape:
+            raise ValueError(
+                "The histogram partials have incompatible sizes due to different healpix orders. "
+                + "To start the pipeline from scratch with the current order, set `resume` to False."
+            )
+        self.sparse_array += other.sparse_array
+
+    def to_array(self):
+        """Convert the sparse array to a dense numpy array.
+
+        Returns:
+            dense 1-d numpy array.
+        """
+        return self.sparse_array.toarray()[0]
+
+    def to_file(self, file_name):
+        """Persist the sparse array to disk.
+
+        NB: this saves as a sparse array, and so will likely have lower space requirements
+        than saving the corresponding dense 1-d numpy array.
+        """
+        save_npz(file_name, self.sparse_array)
+
+    @classmethod
+    def make_empty(cls, healpix_order=10):
+        """Create an empty sparse array for a given healpix order.
+
+        Args:
+            healpix_order (int): healpix order
+
+        Returns:
+            new sparse histogram
+        """
+        histo = csc_array((1, hp.order2npix(healpix_order)), dtype=np.int64)
+        return cls(histo)
+
+    @classmethod
+    def make_from_counts(cls, indexes, counts_at_indexes, healpix_order=10):
+        """Create a sparse array for a given healpix order, prefilled with counts at
+        the provided indexes.
+
+        e.g. for a dense 1-d numpy histogram of order 0, you might see::
+
+            [0, 4, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0]
+
+        There are only elements at [1, 8], and they have respective values [4, 9]. You
+        would create the sparse histogram like::
+
+            make_from_counts([1, 8], [4, 9], 0)
+
+        Args:
+            indexes (int[]): index locations of non-zero values
+            counts_at_indexes (int[]): values at the ``indexes``
+            healpix_order (int): healpix order
+
+        Returns:
+            new sparse histogram
+        """
+        row = np.array(np.zeros(len(indexes), dtype=np.int64))
+        histo = csc_array((counts_at_indexes, (row, indexes)), shape=(1, hp.order2npix(healpix_order)))
+        return cls(histo)
+
+    @classmethod
+    def from_file(cls, file_name):
+        """Read sparse histogram from a file.
+
+        Returns:
+            new sparse histogram
+        """
+        histo = load_npz(file_name)
+        return cls(histo)
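Taken together, the new wrapper gives the catalog pipeline a small API: build a partial from index/count pairs, accumulate partials in place, persist to `.npz`, and read back a dense array. A short usage sketch under the same order-0 numbers used in the tests below; the `combined.npz` file name is illustrative and is written to the current directory.

```python
import numpy.testing as npt

from hipscat_import.catalog.sparse_histogram import SparseHistogram

# Two partials at healpix order 0 (12 pixels).
left = SparseHistogram.make_from_counts([11], [131], healpix_order=0)
right = SparseHistogram.make_from_counts([10, 11], [4, 15], healpix_order=0)

# In-place accumulation; mismatched orders raise ValueError instead.
left.add(right)
npt.assert_array_equal(left.to_array(), [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 146])

# Round trip through scipy's save_npz/load_npz persistence.
left.to_file("combined.npz")
assert SparseHistogram.from_file("combined.npz").to_array()[11] == 146
```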
diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py
index 3a17e674..dad2eb49 100644
--- a/tests/hipscat_import/catalog/test_map_reduce.py
+++ b/tests/hipscat_import/catalog/test_map_reduce.py
@@ -10,10 +10,10 @@
 import pandas as pd
 import pyarrow as pa
 import pytest
-from numpy import frombuffer
 
 import hipscat_import.catalog.map_reduce as mr
 from hipscat_import.catalog.file_readers import get_file_reader
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
 
 
 def test_read_empty_filename():
@@ -74,9 +74,9 @@ def test_read_bad_fileformat(blank_data_file):
 
 def read_partial_histogram(tmp_path, mapping_key):
     """Helper to read in the former result of a map operation."""
-    histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.binary")
-    with open(histogram_file, "rb") as file_handle:
-        return frombuffer(file_handle.read(), dtype=np.int64)
+    histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.npz")
+    hist = SparseHistogram.from_file(histogram_file)
+    return hist.to_array()
 
 
 def test_read_single_fits(tmp_path, formats_fits):
diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py
index eabaabab..32425dab 100644
--- a/tests/hipscat_import/catalog/test_resume_plan.py
+++ b/tests/hipscat_import/catalog/test_resume_plan.py
@@ -1,11 +1,11 @@
 """Test catalog resume logic"""
 
-import hipscat.pixel_math as hist
 import numpy.testing as npt
 import pytest
 from hipscat.pixel_math.healpix_pixel import HealpixPixel
 
 from hipscat_import.catalog.resume_plan import ResumePlan
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
 
 
 def test_mapping_done(tmp_path):
@@ -105,18 +105,16 @@ def test_read_write_histogram(tmp_path):
     with pytest.raises(RuntimeError, match="map stages"):
         result = plan.read_histogram(0)
 
-    expected = hist.empty_histogram(0)
-    expected[11] = 131
-
     remaining_keys = plan.get_remaining_map_keys()
     assert remaining_keys == [("map_0", "foo1")]
 
-    ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
+    histogram = SparseHistogram.make_from_counts([11], [131], 0)
+    histogram.to_file(ResumePlan.partial_histogram_file(tmp_path=tmp_path, mapping_key="map_0"))
 
     remaining_keys = plan.get_remaining_map_keys()
     assert len(remaining_keys) == 0
     result = plan.read_histogram(0)
-    npt.assert_array_equal(result, expected)
+    npt.assert_array_equal(result, histogram.to_array())
 
 
 def never_fails():
@@ -135,10 +133,8 @@ def test_some_map_task_failures(tmp_path, dask_client):
     with pytest.raises(RuntimeError, match="map stages"):
         plan.wait_for_mapping(futures)
 
-    expected = hist.empty_histogram(0)
-    expected[11] = 131
-
-    ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)
+    histogram = SparseHistogram.make_from_counts([11], [131], 0)
+    histogram.to_file(ResumePlan.partial_histogram_file(tmp_path=tmp_path, mapping_key="map_0"))
 
     ## Method succeeds, *and* partial histogram is present.
     futures = [dask_client.submit(never_fails)]
diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py
index 0a44d19a..64efe57a 100644
--- a/tests/hipscat_import/catalog/test_run_import.py
+++ b/tests/hipscat_import/catalog/test_run_import.py
@@ -3,7 +3,6 @@
 import os
 import shutil
 
-import hipscat.pixel_math as hist
 import numpy as np
 import pandas as pd
 import pyarrow as pa
@@ -15,6 +14,7 @@
 from hipscat_import.catalog.arguments import ImportArguments
 from hipscat_import.catalog.file_readers import CsvReader
 from hipscat_import.catalog.resume_plan import ResumePlan
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
 
 
 def test_empty_args():
@@ -49,16 +49,17 @@ def test_resume_dask_runner(
     ## Now set up our resume files to match previous work.
     resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
     plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False)
-    histogram = hist.empty_histogram(0)
-    histogram[11] = 131
-    empty = hist.empty_histogram(0)
+    histogram = SparseHistogram.make_from_counts([11], [131], 0)
+    empty = SparseHistogram.make_empty(0)
     for file_index in range(0, 5):
         ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
-        ResumePlan.write_partial_histogram(
-            tmp_path=resume_tmp,
-            mapping_key=f"map_{file_index}",
-            histogram=histogram if file_index == 0 else empty,
+        histogram_file = ResumePlan.partial_histogram_file(
+            tmp_path=resume_tmp, mapping_key=f"map_{file_index}"
         )
+        if file_index == 0:
+            histogram.to_file(histogram_file)
+        else:
+            empty.to_file(histogram_file)
 
     ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
 
@@ -154,18 +155,11 @@ def test_resume_dask_runner_diff_pixel_order(
     ## Now set up our resume files to match previous work.
     resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
     ResumePlan(tmp_path=resume_tmp, progress_bar=False)
-    histogram = hist.empty_histogram(0)
-    histogram[11] = 131
-    empty = hist.empty_histogram(0)
+    SparseHistogram.make_from_counts([11], [131], 0).to_file(
+        os.path.join(resume_tmp, "mapping_histogram.npz")
+    )
     for file_index in range(0, 5):
         ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
-        ResumePlan.write_partial_histogram(
-            tmp_path=resume_tmp,
-            mapping_key=f"map_{file_index}",
-            histogram=histogram if file_index == 0 else empty,
-        )
-
-    ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")
 
     shutil.copytree(
         os.path.join(resume_dir, "Norder=0"),
@@ -232,19 +226,22 @@ def test_resume_dask_runner_histograms_diff_size(
     ResumePlan(tmp_path=resume_tmp, progress_bar=False)
 
     # We'll create mock partial histograms of size 0 and 2
-    histogram = hist.empty_histogram(0)
-    wrong_histogram = hist.empty_histogram(2)
+    histogram = SparseHistogram.make_empty(0)
+    wrong_histogram = SparseHistogram.make_empty(2)
 
     for file_index in range(0, 5):
         ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")
-        ResumePlan.write_partial_histogram(
-            tmp_path=resume_tmp,
-            mapping_key=f"map_{file_index}",
-            histogram=histogram if file_index % 2 == 0 else wrong_histogram,
+
+        histogram_file = ResumePlan.partial_histogram_file(
+            tmp_path=resume_tmp, mapping_key=f"map_{file_index}"
         )
+        if file_index == 2:
+            histogram.to_file(histogram_file)
+        else:
+            wrong_histogram.to_file(histogram_file)
 
     with pytest.warns(UserWarning, match="resuming prior progress"):
-        with pytest.raises(ValueError, match="histogram partials have inconsistent sizes"):
+        with pytest.raises(ValueError, match="histogram partials have incompatible sizes"):
             args = ImportArguments(
                 output_artifact_name="resume_catalog",
                 input_path=small_sky_parts_dir,
diff --git a/tests/hipscat_import/catalog/test_sparse_histogram.py b/tests/hipscat_import/catalog/test_sparse_histogram.py
new file mode 100644
index 00000000..52e22164
--- /dev/null
+++ b/tests/hipscat_import/catalog/test_sparse_histogram.py
@@ -0,0 +1,66 @@
+"""Test sparse histogram behavior."""
+
+import os
+
+import numpy as np
+import numpy.testing as npt
+import pytest
+from scipy.sparse import csr_array
+
+from hipscat_import.catalog.sparse_histogram import SparseHistogram
+
+
+def test_read_write_round_trip(tmp_path):
+    """Test that we can read what we write into a histogram file."""
+    file_name = os.path.join(tmp_path, "round_trip.npz")
+    histogram = SparseHistogram.make_from_counts([11], [131], 0)
+    histogram.to_file(file_name)
+
+    read_histogram = SparseHistogram.from_file(file_name)
+
+    npt.assert_array_equal(read_histogram.to_array(), histogram.to_array())
+
+
+def test_add_same_order():
+    """Test that we can add two histograms created from the same order, and get
+    the expected results."""
+    partial_histogram_left = SparseHistogram.make_from_counts([11], [131], 0)
+
+    partial_histogram_right = SparseHistogram.make_from_counts([10, 11], [4, 15], 0)
+
+    partial_histogram_left.add(partial_histogram_right)
+
+    expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 146]
+    npt.assert_array_equal(partial_histogram_left.to_array(), expected)
+
+
+def test_add_different_order():
+    """Test that we can NOT add histograms of different healpix orders."""
+    partial_histogram_left = SparseHistogram.make_from_counts([11], [131], 0)
+
+    partial_histogram_right = SparseHistogram.make_from_counts([10, 11], [4, 15], 1)
+
+    with pytest.raises(ValueError, match="partials have incompatible sizes"):
+        partial_histogram_left.add(partial_histogram_right)
+
+
+def test_add_different_type():
+    """Test that we can NOT add objects that are not SparseHistogram instances."""
+    partial_histogram_left = SparseHistogram.make_from_counts([11], [131], 0)
+
+    with pytest.raises(ValueError, match="addends should be SparseHistogram"):
+        partial_histogram_left.add(5)
+
+    with pytest.raises(ValueError, match="addends should be SparseHistogram"):
+        partial_histogram_left.add([1, 2, 3, 4, 5])
+
+
+def test_init_bad_inputs():
+    """Test that the SparseHistogram type requires a compressed sparse column
+    as its sole `sparse_array` argument."""
+    with pytest.raises(ValueError, match="must be a scipy sparse array"):
+        SparseHistogram(5)
+
+    with pytest.raises(ValueError, match="must be a Compressed Sparse Column"):
+        row_sparse_array = csr_array((1, 12), dtype=np.int64)
+        SparseHistogram(row_sparse_array)