From ec55c60652dd9e745648470900a84235327342ab Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Wed, 29 Nov 2023 08:29:28 -0500 Subject: [PATCH] Fix doc build error with dask client (#173) (#175) * Give more failure info. * Maybe the done files aren't written? * Use same pip as local? * Maybe tasks aren't scheduled because they're done. * Undo red herring * Restructure exception handling --- docs/notebooks/unequal_schema.ipynb | 18 +++++++++--------- src/hipscat_import/pipeline.py | 10 +++++----- src/hipscat_import/pipeline_resume_plan.py | 2 ++ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb index d2761cb3..509fba86 100644 --- a/docs/notebooks/unequal_schema.ipynb +++ b/docs/notebooks/unequal_schema.ipynb @@ -82,12 +82,12 @@ " highest_healpix_order=1,\n", ")\n", "\n", - "client = Client(n_workers=1, threads_per_worker=1)\n", - "\n", - "try:\n", - " pipeline_with_client(args, client)\n", - "except:\n", - " pass # we know it's going to fail!!" + "with Client(n_workers=1, threads_per_worker=1) as client:\n", + " try:\n", + " pipeline_with_client(args, client)\n", + " except:\n", + " pass # we know it's going to fail!!\n", + "tmp_path.cleanup()" ] }, { @@ -137,7 +137,8 @@ " use_schema_file=mixed_schema_csv_parquet,\n", " overwrite=True,\n", ")\n", - "pipeline_with_client(args, client)" + "with Client(n_workers=1, threads_per_worker=1) as client:\n", + " pipeline_with_client(args, client)" ] }, { @@ -283,7 +284,6 @@ "metadata": {}, "outputs": [], "source": [ - "client.shutdown()\n", "tmp_path.cleanup()" ] } @@ -304,7 +304,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index b6857ed9..14df5c4b 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -55,15 +55,15 @@ def pipeline_with_client(args: RuntimeArguments, client: Client): macauff_runner.run(args, client) else: raise ValueError("unknown args type") - except Exception as exception: # pylint: disable=broad-exception-caught - _send_failure_email(args, exception) - else: + _send_success_email(args) + except Exception as exception: # pylint: disable=broad-exception-caught + if args.completion_email_address: + _send_failure_email(args, exception) + raise exception def _send_failure_email(args: RuntimeArguments, exception: Exception): - if not args.completion_email_address: - raise exception message = EmailMessage() message["Subject"] = "hipscat-import failure." message["To"] = args.completion_email_address diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 4250bec4..38d727f7 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -131,6 +131,8 @@ def wait_for_futures(self, futures, stage_name): ): if future.status == "error": some_error = True + print(f"{stage_name} task {future.key} failed with message:") + print(future.exception()) if some_error: raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")