From 793212f31cba2f0df0b989257108dfa45d8a8a4a Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Wed, 22 May 2024 18:28:53 -0400 Subject: [PATCH] Fail fast on reduce stage failures. (#317) --- src/hipscat_import/catalog/resume_plan.py | 2 +- src/hipscat_import/pipeline_resume_plan.py | 7 ++++- .../test_pipeline_resume_plan.py | 27 ++++++++++++++++--- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 2585d5cf..fe301ec4 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -242,7 +242,7 @@ def is_reducing_done(self) -> bool: def wait_for_reducing(self, futures): """Wait for reducing futures to complete.""" - self.wait_for_futures(futures, self.REDUCING_STAGE) + self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True) remaining_reduce_items = self.get_reduce_items() if len(remaining_reduce_items) > 0: raise RuntimeError(f"{len(remaining_reduce_items)} reduce stages did not complete successfully.") diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 3511d1da..350a9905 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -110,7 +110,7 @@ def clean_resume_files(self): """Remove all intermediate files created in execution.""" file_io.remove_directory(self.tmp_path, ignore_errors=True) - def wait_for_futures(self, futures, stage_name): + def wait_for_futures(self, futures, stage_name, fail_fast=False): """Wait for collected futures to complete. As each future completes, check the returned status. @@ -118,6 +118,9 @@ def wait_for_futures(self, futures, stage_name): Args: futures(List[future]): collected futures stage_name(str): name of the stage (e.g. mapping, reducing) + fail_fast (bool): if True, we will re-raise the first exception + encountered and NOT continue. this may lead to unexpected + behavior for in-progress tasks. Raises: RuntimeError: if any future returns an error status. """ @@ -145,6 +148,8 @@ def wait_for_futures(self, futures, stage_name): trace_strs.append(f' File "{filename}", line {line_number}, in {method_name}') stack_trace = stack_trace.tb_next print("\n".join(trace_strs)) + if fail_fast: + raise exception if some_error: raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.") diff --git a/tests/hipscat_import/test_pipeline_resume_plan.py b/tests/hipscat_import/test_pipeline_resume_plan.py index 0f806660..c029f09b 100644 --- a/tests/hipscat_import/test_pipeline_resume_plan.py +++ b/tests/hipscat_import/test_pipeline_resume_plan.py @@ -118,9 +118,30 @@ def error_on_even(argument): with pytest.raises(RuntimeError, match="Some test stages failed"): plan.wait_for_futures(futures, "test") - captured = capsys.readouterr() - assert "we are at odds with evens" in captured - assert "error_on_even" in captured + captured = capsys.readouterr() + assert "RuntimeError: we are at odds with evens" in captured.out + assert "error_on_even" in captured.out + + +@pytest.mark.dask +def test_wait_for_futures_fail_fast(tmp_path, dask_client, capsys): + """Test that we can wait around for futures to complete. + + Additionally test that relevant parts of the traceback are printed to stdout.""" + plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False, resume=False) + + def error_on_even(argument): + """Silly little method used to test futures that fail under predictable conditions""" + if argument % 2 == 0: + raise RuntimeError("we are at odds with evens") + + futures = [dask_client.submit(error_on_even, 3), dask_client.submit(error_on_even, 4)] + with pytest.raises(RuntimeError, match="we are at odds with evens"): + plan.wait_for_futures(futures, "test", fail_fast=True) + + captured = capsys.readouterr() + assert "we are at odds with evens" in captured.out + assert "error_on_even" in captured.out def test_formatted_stage_name():