From 2cad92abfe0c2ad2b9ab78d8537d3854004148c3 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Wed, 4 Sep 2024 12:07:48 -0400 Subject: [PATCH] Fix up tests. --- benchmarks/benchmarks.py | 12 ++++-------- src/hipscat_import/catalog/resume_plan.py | 16 +++++++++------- src/hipscat_import/catalog/sparse_histogram.py | 5 +++++ tests/hipscat_import/catalog/test_run_import.py | 2 +- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index f7e8239..061dc65 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -1,6 +1,8 @@ import os from pathlib import Path + import numpy as np + from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram @@ -26,14 +28,8 @@ def setup_cache(self): histo.to_file(binning_dir / f"map_{m}") return (tmp_dir, num_paths) - # def time_read_histogram(self, cache): - # input_paths = [f"foo{i}" for i in range(0, cache[1])] - # plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) - - # plan.read_histogram(8) - - def peakmem_read_histogram(self, cache): + def time_read_histogram(self, cache): input_paths = [f"foo{i}" for i in range(0, cache[1])] plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths) - plan.read_histogram(8) \ No newline at end of file + plan.read_histogram(8) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index c0739bb..83de436 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -6,13 +6,14 @@ from dataclasses import dataclass, field from typing import List, Optional, Tuple +import hipscat.pixel_math as hist import hipscat.pixel_math.healpix_shim as hp import numpy as np from hipscat import pixel_math from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -import hipscat.pixel_math as hist from numpy import frombuffer + from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -177,12 +178,15 @@ def read_histogram(self, healpix_order): raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz") aggregate_histogram = hist.empty_histogram(healpix_order) - # aggregate_histogram = SparseHistogram.make_empty(healpix_order) for partial_file_name in histogram_files: partial = SparseHistogram.from_file(partial_file_name) - aggregate_histogram = np.add(aggregate_histogram, partial.to_array()) - - # aggregate_histogram.to_file(file_name) + partial_as_array = partial.to_array() + if aggregate_histogram.shape != partial_as_array.shape: + raise ValueError( + "The histogram partials have incompatible sizes due to different healpix orders. " + + "To start the pipeline from scratch with the current order set `resume` to False." + ) + aggregate_histogram = np.add(aggregate_histogram, partial_as_array) file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE) with open(file_name, "wb+") as file_handle: @@ -193,8 +197,6 @@ def read_histogram(self, healpix_order): ignore_errors=True, ) - # full_histogram = SparseHistogram.from_file(file_name).to_array() - with open(file_name, "rb") as file_handle: full_histogram = frombuffer(file_handle.read(), dtype=np.int64) diff --git a/src/hipscat_import/catalog/sparse_histogram.py b/src/hipscat_import/catalog/sparse_histogram.py index ff2a3eb..ac1549a 100644 --- a/src/hipscat_import/catalog/sparse_histogram.py +++ b/src/hipscat_import/catalog/sparse_histogram.py @@ -46,6 +46,11 @@ def to_file(self, file_name): """ save_npz(file_name, self.sparse_array) + def to_dense_file(self, file_name): + """Persist the DENSE array to disk as a numpy array.""" + with open(file_name, "wb+") as file_handle: + file_handle.write(self.to_array().data) + @classmethod def make_empty(cls, healpix_order=10): """Create an empty sparse array for a given healpix order. diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index adb5ab8..9661261 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -139,7 +139,7 @@ def test_resume_dask_runner_diff_pixel_order( ## Now set up our resume files to match previous work. resume_tmp = tmp_path / "tmp" / "resume_catalog" ResumePlan(tmp_path=resume_tmp, progress_bar=False) - SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz") + SparseHistogram.make_from_counts([11], [131], 0).to_dense_file(resume_tmp / "mapping_histogram.npz") for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")