Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove get_pixels calls #148

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Tuple
from typing import List, Optional, Tuple

import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from numpy import frombuffer
from tqdm import tqdm

Expand All @@ -24,6 +25,8 @@ class ResumePlan(PipelineResumePlan):
"""list of files (and job keys) that have yet to be mapped"""
split_keys: List[Tuple[str, str]] = field(default_factory=list)
"""set of files (and job keys) that have yet to be split"""
destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Fully resolved map of destination pixels to constituent smaller pixels"""

MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
Expand Down Expand Up @@ -73,19 +76,9 @@ def gather_plan(self):

## Gather keys for execution.
if not mapping_done:
mapped_keys = set(self.get_mapping_keys_from_histograms())
self.map_files = [
(f"map_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
if f"map_{i}" not in mapped_keys
]
self.map_files = self.get_remaining_map_keys()
if not splitting_done:
split_keys = set(self.read_done_keys(self.SPLITTING_STAGE))
self.split_keys = [
(f"split_{i}", file_path)
for i, file_path in enumerate(self.input_paths)
if f"split_{i}" not in split_keys
]
self.split_keys = self.get_remaining_split_keys()
## We don't pre-gather the plan for the reducing keys.
## It requires the full destination pixel map.
step_progress.update(1)
Expand Down Expand Up @@ -123,14 +116,19 @@ def save_original_paths(self):
for path in self.input_paths:
file_handle.write(f"{path}\n")

def get_mapping_keys_from_histograms(self):
"""Gather keys for successful mapping tasks from histogram names.
def get_remaining_map_keys(self):
    """Gather remaining keys, dropping successful mapping tasks from histogram names.

    Keys already present are taken from ``.binary`` file names under the
    histograms directory of ``tmp_path``; every input path whose ``map_<index>``
    key is absent from that collection is reported as remaining.

    Returns:
        list of (mapping key, file path) tuples for keys *not* found in files
        like /resume/path/mapping_key.binary
    """
    prefix = file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR)
    # Build the set once so each membership test in the comprehension is O(1).
    mapped_keys = set(self.get_keys_from_file_names(prefix, ".binary"))
    return [
        (f"map_{i}", file_path)
        for i, file_path in enumerate(self.input_paths)
        if f"map_{i}" not in mapped_keys
    ]

def read_histogram(self, healpix_order):
"""Return histogram with healpix_order'd shape
Expand All @@ -153,6 +151,9 @@ def read_histogram(self, healpix_order):
# - combine into a single histogram
# - write out as a single histogram for future reads
# - remove all partial histograms
remaining_map_files = self.get_remaining_map_keys()
if len(remaining_map_files) > 0:
raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.")
histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "**.binary")
for file_name in histogram_files:
with open(file_name, "rb") as file_handle:
Expand Down Expand Up @@ -186,6 +187,19 @@ def write_partial_histogram(cls, tmp_path, mapping_key: str, histogram):
with open(file_name, "wb+") as file_handle:
file_handle.write(histogram.data)

def get_remaining_split_keys(self):
    """Collect the split tasks that still need to run.

    Each input path at position ``i`` is paired with the key ``split_i``;
    pairs whose key is among the done keys read for the splitting stage
    are left out.

    Returns:
        list of (splitting key, file path) tuples for keys *not* found in
        files like /resume/path/split_key.done
    """
    finished = set(self.read_done_keys(self.SPLITTING_STAGE))
    remaining = []
    for index, file_path in enumerate(self.input_paths):
        key = f"split_{index}"
        if key in finished:
            continue
        remaining.append((key, file_path))
    return remaining

@classmethod
def splitting_key_done(cls, tmp_path, splitting_key: str):
"""Mark a single splitting task as done
Expand All @@ -209,6 +223,10 @@ def reducing_key_done(cls, tmp_path, reducing_key: str):
def wait_for_mapping(self, futures):
    """Wait for mapping futures to complete.

    Once the futures have been waited on, the remaining mapping keys are
    re-read; the stage done file is only touched when none remain.

    Args:
        futures: collected futures for the mapping tasks
    Raises:
        RuntimeError: if any mapping key is still outstanding after waiting.
    """
    self.wait_for_futures(futures, self.MAPPING_STAGE)
    remaining_map_items = self.get_remaining_map_keys()
    if len(remaining_map_items) > 0:
        # Include the count, as the split/reduce stage messages in this file do.
        raise RuntimeError(f"{len(remaining_map_items)} map stages did not complete successfully.")
    self.touch_stage_done_file(self.MAPPING_STAGE)

def is_mapping_done(self) -> bool:
"""Are there files left to map?"""
Expand All @@ -217,12 +235,16 @@ def is_mapping_done(self) -> bool:
def wait_for_splitting(self, futures):
    """Block on the splitting futures, then verify no split keys remain.

    The stage done file is touched only when the remaining-key list is empty.

    Raises:
        RuntimeError: if any split key is still outstanding after waiting.
    """
    stage = self.SPLITTING_STAGE
    self.wait_for_futures(futures, stage)
    unfinished = len(self.get_remaining_split_keys())
    if unfinished > 0:
        raise RuntimeError(f"{unfinished} split stages did not complete successfully.")
    self.touch_stage_done_file(stage)

def is_splitting_done(self) -> bool:
    """Report whether the done file for the splitting stage exists."""
    stage_finished = self.done_file_exists(self.SPLITTING_STAGE)
    return stage_finished

def get_reduce_items(self, destination_pixel_map):
def get_reduce_items(self, destination_pixel_map=None):
"""Fetch a triple for each partition to reduce.

Triple contains:
Expand All @@ -233,6 +255,12 @@ def get_reduce_items(self, destination_pixel_map):

"""
reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE))
if destination_pixel_map is None:
destination_pixel_map = self.destination_pixel_map
elif self.destination_pixel_map is None:
self.destination_pixel_map = destination_pixel_map
if self.destination_pixel_map is None:
raise RuntimeError("destination pixel map not provided for progress tracking.")
reduce_items = [
(hp_pixel, source_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}")
for hp_pixel, source_pixels in destination_pixel_map.items()
Expand All @@ -247,3 +275,7 @@ def is_reducing_done(self) -> bool:
def wait_for_reducing(self, futures):
    """Block on the reducing futures, then verify no reduce items remain.

    The remaining items are fetched with no explicit pixel map argument.
    The stage done file is touched only when that list is empty.

    Raises:
        RuntimeError: if any reduce item is still outstanding after waiting.
    """
    stage = self.REDUCING_STAGE
    self.wait_for_futures(futures, stage)
    unfinished = len(self.get_reduce_items())
    if unfinished > 0:
        raise RuntimeError(f"{unfinished} reduce stages did not complete successfully.")
    self.touch_stage_done_file(stage)
4 changes: 1 addition & 3 deletions src/hipscat_import/margin_cache/margin_cache_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from dataclasses import dataclass

import healpy as hp
import numpy as np
from hipscat.catalog import Catalog
from hipscat.catalog.margin_cache.margin_cache_catalog_info import MarginCacheCatalogInfo
from hipscat.io.validation import is_valid_catalog
Expand Down Expand Up @@ -40,8 +39,7 @@ def _check_arguments(self):

self.catalog = Catalog.read_from_hipscat(self.input_catalog_path)

partition_stats = self.catalog.get_pixels()
highest_order = np.max(partition_stats["Norder"].values)
highest_order = self.catalog.partition_info.get_highest_order()
margin_pixel_k = highest_order + 1
if self.margin_order > -1:
if self.margin_order < margin_pixel_k:
Expand Down
10 changes: 5 additions & 5 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ def clean_resume_files(self):
def wait_for_futures(self, futures, stage_name):
"""Wait for collected futures to complete.

As each future completes, read the task key and write to the log file.
If all tasks complete successfully, touch the done file. Otherwise, raise an error.
As each future completes, check the returned status.

Args:
futures(List[future]): collected futures
stage_name(str): name of the stage (e.g. mapping, reducing)
Raises:
RuntimeError if any future returns an error status.
"""
some_error = False
formatted_stage_name = self.get_formatted_stage_name(stage_name)
Expand All @@ -128,11 +129,10 @@ def wait_for_futures(self, futures, stage_name):
total=len(futures),
disable=(not self.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error":
some_error = True
if some_error: # pragma: no cover
if some_error:
raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
self.touch_stage_done_file(stage_name)

@staticmethod
def get_formatted_stage_name(stage_name) -> str:
Expand Down
24 changes: 19 additions & 5 deletions src/hipscat_import/soap/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Tuple
from typing import List, Optional, Tuple

import healpy as hp
import numpy as np
Expand All @@ -23,6 +23,8 @@ class SoapPlan(PipelineResumePlan):

count_keys: List[Tuple[HealpixPixel, List[HealpixPixel], str]] = field(default_factory=list)
"""set of pixels (and job keys) that have yet to be counted"""
source_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Map of object pixels to source pixels, with counting key."""

COUNTING_STAGE = "counting"
SOURCE_MAP_FILE = "source_object_map.npz"
Expand Down Expand Up @@ -56,29 +58,41 @@ def gather_plan(self, args):
source_catalog = Catalog.read_from_hipscat(args.source_catalog_dir)
source_pixel_map = source_to_object_map(object_catalog, source_catalog)
np.savez_compressed(source_map_file, source_pixel_map)
self._set_sources_to_count(source_pixel_map)
self.count_keys = self.get_sources_to_count(source_pixel_map=source_pixel_map)
step_progress.update(1)

def wait_for_counting(self, futures):
    """Block on the counting futures, then verify no sources remain to count.

    The stage done file is touched only when the remaining-sources list is empty.

    Raises:
        RuntimeError: if any source is still uncounted after waiting.
    """
    stage = self.COUNTING_STAGE
    self.wait_for_futures(futures, stage)
    unfinished = len(self.get_sources_to_count())
    if unfinished > 0:
        raise RuntimeError(f"{unfinished} counting stages did not complete successfully.")
    self.touch_stage_done_file(stage)

def is_counting_done(self) -> bool:
    """Report whether the done file for the counting stage exists."""
    stage_finished = self.done_file_exists(self.COUNTING_STAGE)
    return stage_finished

def _set_sources_to_count(self, source_pixel_map):
def get_sources_to_count(self, source_pixel_map=None):
"""Fetch a triple for each source pixel to join and count.

Triple contains:
- source pixel
- object pixels (healpix pixel with both order and pixel, for aligning and
neighboring object pixels)
- source key (string of source order+pixel)

"""
if source_pixel_map is None:
source_pixel_map = self.source_pixel_map
elif self.source_pixel_map is None:
self.source_pixel_map = source_pixel_map
if self.source_pixel_map is None:
raise ValueError("source_pixel_map not provided for progress tracking.")

counted_keys = set(self.get_keys_from_file_names(self.tmp_path, ".csv"))
self.count_keys = [
return [
(hp_pixel, object_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}")
for hp_pixel, object_pixels in source_pixel_map.items()
if f"{hp_pixel.order}_{hp_pixel.pixel}" not in counted_keys
Expand Down
101 changes: 89 additions & 12 deletions tests/hipscat_import/catalog/test_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hipscat.pixel_math as hist
import numpy.testing as npt
import pytest
from hipscat.pixel_math.healpix_pixel import HealpixPixel

from hipscat_import.catalog.resume_plan import ResumePlan

Expand Down Expand Up @@ -92,30 +93,50 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv):

def test_read_write_histogram(tmp_path):
"""Test that we can read what we write into a histogram file."""
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

## We're not ready to read the final histogram - missing partial histograms.
with pytest.raises(RuntimeError, match="map stages"):
result = plan.read_histogram(0)

expected = hist.empty_histogram(0)
expected[11] = 131

remaining_keys = plan.get_remaining_map_keys()
assert remaining_keys == [("map_0", "foo1")]

ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=expected)

keys = plan.get_mapping_keys_from_histograms()
assert keys == ["map_0"]
remaining_keys = plan.get_remaining_map_keys()
assert len(remaining_keys) == 0
result = plan.read_histogram(0)
npt.assert_array_equal(result, expected)

keys = plan.get_mapping_keys_from_histograms()
assert len(keys) == 0

plan.clean_resume_files()
def never_fails():
    """Task stub: always completes without error and writes no intermediate success file."""
    return None

plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)
empty = hist.empty_histogram(0)
result = plan.read_histogram(0)
npt.assert_array_equal(result, empty)

keys = plan.get_mapping_keys_from_histograms()
assert len(keys) == 0
@pytest.mark.dask
def test_some_map_task_failures(tmp_path, dask_client):
    """The map stage only counts as successful once every partial histogram exists."""
    resume_plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

    ## The submitted task completes cleanly but leaves no partial histogram,
    ## so the missing intermediate file must surface as an error.
    first_attempt = [dask_client.submit(never_fails)]
    with pytest.raises(RuntimeError, match="map stages"):
        resume_plan.wait_for_mapping(first_attempt)

    partial_histogram = hist.empty_histogram(0)
    partial_histogram[11] = 131

    ResumePlan.write_partial_histogram(tmp_path=tmp_path, mapping_key="map_0", histogram=partial_histogram)

    ## Task completes cleanly *and* the partial histogram is now on disk.
    second_attempt = [dask_client.submit(never_fails)]
    resume_plan.wait_for_mapping(second_attempt)


def test_read_write_splitting_keys(tmp_path, small_sky_single_file, formats_headers_csv):
Expand All @@ -141,3 +162,59 @@ def test_read_write_splitting_keys(tmp_path, small_sky_single_file, formats_head
plan.gather_plan()
split_keys = plan.split_keys
assert len(split_keys) == 2


@pytest.mark.dask
def test_some_split_task_failures(tmp_path, dask_client):
    """The split stage only counts as successful once every done file exists."""
    resume_plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"])

    ## The submitted task completes cleanly but leaves no done file,
    ## so the missing intermediate file must surface as an error.
    first_attempt = [dask_client.submit(never_fails)]
    with pytest.raises(RuntimeError, match="split stages"):
        resume_plan.wait_for_splitting(first_attempt)

    ResumePlan.touch_key_done_file(tmp_path, ResumePlan.SPLITTING_STAGE, "split_0")

    ## Task completes cleanly, and the done file is now present.
    second_attempt = [dask_client.submit(never_fails)]
    resume_plan.wait_for_splitting(second_attempt)


def test_get_reduce_items(tmp_path):
    """Remaining reduce items shrink as reducing keys are marked done."""
    pixel_map = {HealpixPixel(0, 11): (131, [44, 45, 46])}
    resume_plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)

    ## No map was ever supplied, so there is nothing to fall back on.
    with pytest.raises(RuntimeError, match="destination pixel map"):
        resume_plan.get_reduce_items()

    pending = resume_plan.get_reduce_items(destination_pixel_map=pixel_map)
    assert len(pending) == 1

    ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key="0_11")
    pending = resume_plan.get_reduce_items(destination_pixel_map=pixel_map)
    assert len(pending) == 0


@pytest.mark.dask
def test_some_reduce_task_failures(tmp_path, dask_client):
    """The reduce stage only counts as successful once every done file exists."""
    resume_plan = ResumePlan(tmp_path=tmp_path, progress_bar=False)

    pixel_map = {HealpixPixel(0, 11): (131, [44, 45, 46])}
    pending = resume_plan.get_reduce_items(destination_pixel_map=pixel_map)
    assert len(pending) == 1

    ## The submitted task completes cleanly but leaves no done file,
    ## so the missing intermediate file must surface as an error.
    first_attempt = [dask_client.submit(never_fails)]
    with pytest.raises(RuntimeError, match="reduce stages"):
        resume_plan.wait_for_reducing(first_attempt)

    ResumePlan.touch_key_done_file(tmp_path, ResumePlan.REDUCING_STAGE, "0_11")

    ## Task completes cleanly, and the done file is now present.
    second_attempt = [dask_client.submit(never_fails)]
    resume_plan.wait_for_reducing(second_attempt)
Loading