Skip to content

Commit

Permalink
Fix up tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
delucchi-cmu committed Sep 4, 2024
1 parent 97f542d commit 2cad92a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
12 changes: 4 additions & 8 deletions benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from pathlib import Path

import numpy as np

from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram

Expand All @@ -26,14 +28,8 @@ def setup_cache(self):
histo.to_file(binning_dir / f"map_{m}")
return (tmp_dir, num_paths)

# def time_read_histogram(self, cache):
# input_paths = [f"foo{i}" for i in range(0, cache[1])]
# plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths)

# plan.read_histogram(8)

def peakmem_read_histogram(self, cache):
def time_read_histogram(self, cache):
input_paths = [f"foo{i}" for i in range(0, cache[1])]
plan = ResumePlan(tmp_path=cache[0], progress_bar=False, input_paths=input_paths)

plan.read_histogram(8)
plan.read_histogram(8)
16 changes: 9 additions & 7 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import hipscat.pixel_math as hist
import hipscat.pixel_math.healpix_shim as hp
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
import hipscat.pixel_math as hist
from numpy import frombuffer

from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import PipelineResumePlan

Expand Down Expand Up @@ -177,12 +178,15 @@ def read_histogram(self, healpix_order):
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz")
aggregate_histogram = hist.empty_histogram(healpix_order)
# aggregate_histogram = SparseHistogram.make_empty(healpix_order)
for partial_file_name in histogram_files:
partial = SparseHistogram.from_file(partial_file_name)
aggregate_histogram = np.add(aggregate_histogram, partial.to_array())

# aggregate_histogram.to_file(file_name)
partial_as_array = partial.to_array()
if aggregate_histogram.shape != partial_as_array.shape:
raise ValueError(
"The histogram partials have incompatible sizes due to different healpix orders. "
+ "To start the pipeline from scratch with the current order set `resume` to False."
)
aggregate_histogram = np.add(aggregate_histogram, partial_as_array)

file_name = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAM_BINARY_FILE)
with open(file_name, "wb+") as file_handle:
Expand All @@ -193,8 +197,6 @@ def read_histogram(self, healpix_order):
ignore_errors=True,
)

# full_histogram = SparseHistogram.from_file(file_name).to_array()

with open(file_name, "rb") as file_handle:
full_histogram = frombuffer(file_handle.read(), dtype=np.int64)

Expand Down
5 changes: 5 additions & 0 deletions src/hipscat_import/catalog/sparse_histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def to_file(self, file_name):
"""
save_npz(file_name, self.sparse_array)

def to_dense_file(self, file_name):
    """Persist the DENSE histogram to disk as raw, headerless int64 bytes.

    The file is NOT in ``.npy``/``.npz`` format: it holds only the array's
    buffer, so it can be loaded with
    ``np.frombuffer(handle.read(), dtype=np.int64)``.

    Args:
        file_name: path of the file to create (truncated if it exists).
    """
    # Function-scoped import: the module's own import block is not visible
    # from here, so do not rely on ``np`` being bound at module level.
    import numpy as np

    # Pin the dtype and memory layout so the bytes on disk always match what
    # an int64 ``frombuffer`` reader expects. This is a no-op (no copy) when
    # the dense array is already C-contiguous int64.
    dense = np.ascontiguousarray(self.to_array(), dtype=np.int64)
    # Write-only access is enough; nothing is read back through this handle.
    with open(file_name, "wb") as file_handle:
        file_handle.write(dense.data)

@classmethod
def make_empty(cls, healpix_order=10):
"""Create an empty sparse array for a given healpix order.
Expand Down
2 changes: 1 addition & 1 deletion tests/hipscat_import/catalog/test_run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_resume_dask_runner_diff_pixel_order(
## Now set up our resume files to match previous work.
resume_tmp = tmp_path / "tmp" / "resume_catalog"
ResumePlan(tmp_path=resume_tmp, progress_bar=False)
SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz")
SparseHistogram.make_from_counts([11], [131], 0).to_dense_file(resume_tmp / "mapping_histogram.npz")
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")

Expand Down

0 comments on commit 2cad92a

Please sign in to comment.