Skip to content

Commit

Permalink
Fail fast on reduce stage failures. (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
delucchi-cmu authored May 22, 2024
1 parent 37ce248 commit 793212f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def is_reducing_done(self) -> bool:

def wait_for_reducing(self, futures):
"""Wait for reducing futures to complete."""
self.wait_for_futures(futures, self.REDUCING_STAGE)
self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
remaining_reduce_items = self.get_reduce_items()
if len(remaining_reduce_items) > 0:
raise RuntimeError(f"{len(remaining_reduce_items)} reduce stages did not complete successfully.")
Expand Down
7 changes: 6 additions & 1 deletion src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,17 @@ def clean_resume_files(self):
"""Remove all intermediate files created in execution."""
file_io.remove_directory(self.tmp_path, ignore_errors=True)

def wait_for_futures(self, futures, stage_name):
def wait_for_futures(self, futures, stage_name, fail_fast=False):
"""Wait for collected futures to complete.
As each future completes, check the returned status.
Args:
futures(List[future]): collected futures
stage_name(str): name of the stage (e.g. mapping, reducing)
fail_fast (bool): if True, re-raise the first exception
encountered and do NOT continue. This may lead to unexpected
behavior for in-progress tasks.
Raises:
RuntimeError: if any future returns an error status. When fail_fast
is True, the first exception encountered is re-raised as-is instead.
"""
Expand Down Expand Up @@ -145,6 +148,8 @@ def wait_for_futures(self, futures, stage_name):
trace_strs.append(f' File "{filename}", line {line_number}, in {method_name}')
stack_trace = stack_trace.tb_next
print("\n".join(trace_strs))
if fail_fast:
raise exception

if some_error:
raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
Expand Down
27 changes: 24 additions & 3 deletions tests/hipscat_import/test_pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,30 @@ def error_on_even(argument):
with pytest.raises(RuntimeError, match="Some test stages failed"):
plan.wait_for_futures(futures, "test")

captured = capsys.readouterr()
assert "we are at odds with evens" in captured
assert "error_on_even" in captured
captured = capsys.readouterr()
assert "RuntimeError: we are at odds with evens" in captured.out
assert "error_on_even" in captured.out


@pytest.mark.dask
def test_wait_for_futures_fail_fast(tmp_path, dask_client, capsys):
    """Check that ``fail_fast=True`` re-raises the failing future's own exception.

    The original error message (rather than the generic "Some ... stages failed"
    summary) must propagate to the caller, and the relevant parts of the
    traceback must still be printed to stdout."""

    def error_on_even(argument):
        """Silly little method used to test futures that fail under predictable conditions"""
        if not argument % 2:
            raise RuntimeError("we are at odds with evens")

    resume_plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False, resume=False)

    # One odd input (succeeds) and one even input (fails), submitted in that order.
    submitted = [dask_client.submit(error_on_even, value) for value in (3, 4)]
    with pytest.raises(RuntimeError, match="we are at odds with evens"):
        resume_plan.wait_for_futures(submitted, "test", fail_fast=True)

    stdout_text = capsys.readouterr().out
    assert "we are at odds with evens" in stdout_text
    # The inner function's name appears in the printed traceback lines.
    assert "error_on_even" in stdout_text


def test_formatted_stage_name():
Expand Down

0 comments on commit 793212f

Please sign in to comment.