Skip to content

Commit

Permalink
Tidy up some resume functionality (#324)
Browse files (browse the repository at this point in the history)
  • Loading branch information
delucchi-cmu authored May 30, 2024
1 parent d27f650 commit 8145fbd
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 47 deletions.
4 changes: 0 additions & 4 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ class ResumePlan(PipelineResumePlan):
"""set of files (and job keys) that have yet to be split"""
destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Fully resolved map of destination pixels to constituent smaller pixels"""
delete_resume_log_files: bool = True
"""should we delete task-level done files once each stage is complete?
if False, we will keep all sub-histograms from the mapping stage, and all
done marker files at the end of the pipeline."""

MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
Expand Down
3 changes: 1 addition & 2 deletions src/hipscat_import/catalog/run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,5 @@ def run(args, client):
step_progress.update(1)
io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options)
step_progress.update(1)
if args.resume_plan.delete_resume_log_files:
args.resume_plan.clean_resume_files()
args.resume_plan.clean_resume_files()
step_progress.update(1)
6 changes: 2 additions & 4 deletions src/hipscat_import/margin_cache/margin_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ def generate_margin_cache(args, client):
args (MarginCacheArguments): A valid `MarginCacheArguments` object.
client (dask.distributed.Client): A dask distributed client object.
"""
partition_pixels = args.catalog.partition_info.get_healpix_pixels()
negative_pixels = args.catalog.generate_negative_tree_pixels()
combined_pixels = partition_pixels + negative_pixels
resume_plan = MarginCachePlan(args, combined_pixels=combined_pixels, partition_pixels=partition_pixels)
resume_plan = MarginCachePlan(args)

if not resume_plan.is_mapping_done():
futures = []
Expand Down Expand Up @@ -54,6 +51,7 @@ def generate_margin_cache(args, client):
partition_order=pix.order,
partition_pixel=pix.pixel,
original_catalog_metadata=paths.get_common_metadata_pointer(args.input_catalog_path),
delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
input_storage_options=args.input_storage_options,
)
)
Expand Down
6 changes: 6 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class MarginCacheArguments(RuntimeArguments):
order of healpix partitioning in the source catalog. if `margin_order` is left
default or set to -1, then the `margin_order` will be set dynamically to the
highest partition order plus 1."""
delete_intermediate_parquet_files: bool = True
"""should we delete the smaller intermediate parquet files generated in the
splitting stage, once the relevant reducing stage is complete?"""
delete_resume_log_files: bool = True
"""should we delete task-level done files once each stage is complete?
if False, we will keep all done marker files at the end of the pipeline."""

input_catalog_path: str = ""
"""the path to the hipscat-formatted input catalog."""
Expand Down
11 changes: 8 additions & 3 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def reduce_margin_shards(
partition_order,
partition_pixel,
original_catalog_metadata,
delete_intermediate_parquet_files,
input_storage_options,
):
"""Reduce all partition pixel directories into a single file"""
Expand All @@ -128,8 +129,6 @@ def reduce_margin_shards(
if file_io.does_file_or_directory_exist(shard_dir):
data = ds.dataset(shard_dir, format="parquet")
full_df = data.to_table().to_pandas()
margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
file_io.make_directory(margin_cache_dir, exist_ok=True, storage_options=output_storage_options)

if len(full_df):
schema = file_io.read_parquet_metadata(
Expand All @@ -142,14 +141,20 @@ def reduce_margin_shards(
.append(pa.field("margin_Npix", pa.uint64()))
)

margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
file_io.make_directory(
margin_cache_dir, exist_ok=True, storage_options=output_storage_options
)

margin_cache_file_path = paths.pixel_catalog_file(
output_path, partition_order, partition_pixel
)

full_df.to_parquet(
margin_cache_file_path, schema=schema, storage_options=output_storage_options
)
file_io.remove_directory(shard_dir)
if delete_intermediate_parquet_files:
file_io.remove_directory(shard_dir)

MarginCachePlan.reducing_key_done(intermediate_directory, reducing_key)
except Exception as exception: # pylint: disable=broad-exception-caught
Expand Down
8 changes: 5 additions & 3 deletions src/hipscat_import/margin_cache/margin_cache_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ class MarginCachePlan(PipelineResumePlan):
REDUCING_STAGE = "reducing"
MARGIN_PAIR_FILE = "margin_pair.csv"

def __init__(self, args: MarginCacheArguments, combined_pixels, partition_pixels):
def __init__(self, args: MarginCacheArguments):
if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy)
raise ValueError("tmp_path is required")
super().__init__(
resume=args.resume,
progress_bar=args.progress_bar,
tmp_path=args.tmp_path,
delete_resume_log_files=args.delete_resume_log_files,
)
self.combined_pixels = combined_pixels
self.partition_pixels = partition_pixels
self._gather_plan(args)

def _gather_plan(self, args):
Expand All @@ -52,6 +51,9 @@ def _gather_plan(self, args):
raise ValueError("mapping must be complete before reducing")
step_progress.update(1)

self.partition_pixels = args.catalog.partition_info.get_healpix_pixels()
negative_pixels = args.catalog.generate_negative_tree_pixels()
self.combined_pixels = self.partition_pixels + negative_pixels
self.margin_pair_file = file_io.append_paths_to_pointer(self.tmp_path, self.MARGIN_PAIR_FILE)
if not file_io.does_file_or_directory_exist(self.margin_pair_file):
margin_pairs = _find_partition_margin_pixel_pairs(self.combined_pixels, args.margin_order)
Expand Down
7 changes: 6 additions & 1 deletion src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class PipelineResumePlan:
progress_bar: bool = True
"""if true, a tqdm progress bar will be displayed for user
feedback of planning progress"""
delete_resume_log_files: bool = True
"""should we delete task-level done files once each stage is complete?
if False, we will keep all sub-histograms from the mapping stage, and all
done marker files at the end of the pipeline."""

ORIGINAL_INPUT_PATHS = "input_paths.txt"

Expand Down Expand Up @@ -109,7 +113,8 @@ def get_keys_from_file_names(directory, extension):

def clean_resume_files(self):
"""Remove all intermediate files created in execution."""
file_io.remove_directory(self.tmp_path, ignore_errors=True)
if self.delete_resume_log_files:
file_io.remove_directory(self.tmp_path, ignore_errors=True)

def wait_for_futures(self, futures, stage_name, fail_fast=False):
"""Wait for collected futures to complete.
Expand Down
6 changes: 6 additions & 0 deletions src/hipscat_import/soap/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ class SoapArguments(RuntimeArguments):
resume: bool = True
"""if there are existing intermediate resume files, should we
read those and continue to run the pipeline where we left off"""
delete_resume_log_files: bool = True
"""should we delete task-level done files once each stage is complete?
if False, we will keep all done marker files at the end of the pipeline."""
write_leaf_files: bool = False
"""Should we also write out leaf parquet files (e.g. Norder/Dir/Npix.parquet)
that represent the full association table"""
delete_intermediate_parquet_files: bool = True
"""should we delete the smaller intermediate parquet files generated in the
mapping stage, once the relevant reducing stage is complete?"""

compute_partition_size: int = 1_000_000_000

Expand Down
7 changes: 6 additions & 1 deletion src/hipscat_import/soap/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ class SoapPlan(PipelineResumePlan):
def __init__(self, args: SoapArguments):
if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy)
raise ValueError("tmp_path is required")
super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path)
super().__init__(
resume=args.resume,
progress_bar=args.progress_bar,
tmp_path=args.tmp_path,
delete_resume_log_files=args.delete_resume_log_files,
)
self.gather_plan(args)

def gather_plan(self, args):
Expand Down
1 change: 1 addition & 0 deletions src/hipscat_import/soap/run_soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def run(args, client):
soap_args=args,
object_pixel=object_pixel,
object_key=object_key,
delete_input_files=args.delete_intermediate_parquet_files,
)
)

Expand Down
20 changes: 20 additions & 0 deletions tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,25 @@ def test_reduce_margin_shards(tmp_path):
1,
21,
original_catalog_metadata=schema_path,
delete_intermediate_parquet_files=False,
input_storage_options=None,
)

result_path = paths.pixel_catalog_file(tmp_path, 1, 21)

validate_result_dataframe(result_path, 720)
assert os.path.exists(shard_dir)

# Run again with delete_intermediate_parquet_files. shard_dir doesn't exist at the end.
margin_cache_map_reduce.reduce_margin_shards(
intermediate_dir,
"1_21",
tmp_path,
None,
1,
21,
original_catalog_metadata=schema_path,
delete_intermediate_parquet_files=True,
input_storage_options=None,
)

Expand Down Expand Up @@ -177,6 +196,7 @@ def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys):
1,
21,
original_catalog_metadata=schema_path,
delete_intermediate_parquet_files=True,
input_storage_options=None,
)

Expand Down
50 changes: 21 additions & 29 deletions tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numpy as np
import numpy.testing as npt
import pytest
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.catalog import Catalog

from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments
from hipscat_import.margin_cache.margin_cache_resume_plan import (
Expand All @@ -13,10 +13,10 @@


@pytest.fixture
def small_sky_margin_args(tmp_path, small_sky_source_catalog):
def small_sky_margin_args(tmp_path, small_sky_object_catalog):
return MarginCacheArguments(
margin_threshold=5.0,
input_catalog_path=small_sky_source_catalog,
input_catalog_path=small_sky_object_catalog,
output_path=tmp_path,
output_artifact_name="catalog_cache",
progress_bar=False,
Expand All @@ -28,8 +28,7 @@ def test_done_checks(small_sky_margin_args):
"""Verify that done files imply correct pipeline execution order:
mapping > reducing
"""
pixels = [HealpixPixel(0, 11)]
plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
plan = MarginCachePlan(small_sky_margin_args)
plan.touch_stage_done_file(MarginCachePlan.REDUCING_STAGE)

with pytest.raises(ValueError, match="before reducing"):
Expand All @@ -42,7 +41,7 @@ def test_done_checks(small_sky_margin_args):

plan.clean_resume_files()

plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
plan = MarginCachePlan(small_sky_margin_args)
plan.touch_stage_done_file(MarginCachePlan.MAPPING_STAGE)
plan._gather_plan(small_sky_margin_args)

Expand All @@ -58,24 +57,16 @@ def never_fails():
@pytest.mark.dask
def test_some_map_task_failures(small_sky_margin_args, dask_client):
"""Test that we only consider map stage successful if all done files are written"""
pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)]
plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
plan = MarginCachePlan(small_sky_margin_args)

## Method doesn't FAIL, but it doesn't write out the done file either.
## Since the intermediate files aren't found, we throw an error.
futures = [dask_client.submit(never_fails)]
with pytest.raises(RuntimeError, match="2 mapping stages"):
with pytest.raises(RuntimeError, match="1 mapping stages"):
plan.wait_for_mapping(futures)

MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_11")

## Method succeeds, but only *ONE* done file is present.
futures = [dask_client.submit(never_fails)]
with pytest.raises(RuntimeError, match="1 mapping stage"):
plan.wait_for_mapping(futures)

MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_10")

## Method succeeds, *and* done files are present.
futures = [dask_client.submit(never_fails)]
plan.wait_for_mapping(futures)
Expand All @@ -84,48 +75,49 @@ def test_some_map_task_failures(small_sky_margin_args, dask_client):
@pytest.mark.dask
def test_some_reducing_task_failures(small_sky_margin_args, dask_client):
"""Test that we only consider reduce stage successful if all done files are written"""
pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)]
plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
plan = MarginCachePlan(small_sky_margin_args)

## Method doesn't FAIL, but it doesn't write out the done file either.
## Since the intermediate files aren't found, we throw an error.
futures = [dask_client.submit(never_fails)]
with pytest.raises(RuntimeError, match="2 reducing stages"):
with pytest.raises(RuntimeError, match="12 reducing stages"):
plan.wait_for_reducing(futures)

MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_11")

## Method succeeds, but only *ONE* done file is present.
futures = [dask_client.submit(never_fails)]
with pytest.raises(RuntimeError, match="1 reducing stage"):
with pytest.raises(RuntimeError, match="11 reducing stages"):
plan.wait_for_reducing(futures)

MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_10")
for partition in range(0, 12):
MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, f"0_{partition}")

## Method succeeds, *and* done files are present.
futures = [dask_client.submit(never_fails)]
plan.wait_for_reducing(futures)


def test_partition_margin_pixel_pairs(small_sky_margin_args):
def test_partition_margin_pixel_pairs(small_sky_source_catalog):
"""Ensure partition_margin_pixel_pairs can generate main partition pixels."""
margin_pairs = _find_partition_margin_pixel_pairs(
small_sky_margin_args.catalog.partition_info.get_healpix_pixels(), small_sky_margin_args.margin_order
)
source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog)
margin_pairs = _find_partition_margin_pixel_pairs(source_catalog.get_healpix_pixels(), 3)

expected = np.array([725, 733, 757, 765, 727, 735, 759, 767, 469, 192])

npt.assert_array_equal(margin_pairs.iloc[:10]["margin_pixel"], expected)
assert len(margin_pairs) == 196


def test_partition_margin_pixel_pairs_negative(small_sky_margin_args):
def test_partition_margin_pixel_pairs_negative(small_sky_source_catalog):
"""Ensure partition_margin_pixel_pairs can generate negative tree pixels."""
partition_stats = small_sky_margin_args.catalog.partition_info.get_healpix_pixels()
negative_pixels = small_sky_margin_args.catalog.generate_negative_tree_pixels()
source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog)

partition_stats = source_catalog.get_healpix_pixels()
negative_pixels = source_catalog.generate_negative_tree_pixels()
combined_pixels = partition_stats + negative_pixels

margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, small_sky_margin_args.margin_order)
margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, 3)

expected_order = 0
expected_pixel = 10
Expand Down

0 comments on commit 8145fbd

Please sign in to comment.