Skip to content

Commit

Permalink
Fix doc build error with dask client (#173)
Browse files Browse the repository at this point in the history
* Give more failure info.

* Maybe the done files aren't written?

* Use same pip as local?

* Maybe tasks aren't scheduled because they're done.

* Undo red herring

* Restructure exception handling
  • Loading branch information
delucchi-cmu authored Nov 29, 2023
1 parent 9f44b40 commit b2fe8ac
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
18 changes: 9 additions & 9 deletions docs/notebooks/unequal_schema.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@
" highest_healpix_order=1,\n",
")\n",
"\n",
"client = Client(n_workers=1, threads_per_worker=1)\n",
"\n",
"try:\n",
" pipeline_with_client(args, client)\n",
"except:\n",
" pass # we know it's going to fail!!"
"with Client(n_workers=1, threads_per_worker=1) as client:\n",
" try:\n",
" pipeline_with_client(args, client)\n",
" except:\n",
" pass # we know it's going to fail!!\n",
"tmp_path.cleanup()"
]
},
{
Expand Down Expand Up @@ -137,7 +137,8 @@
" use_schema_file=mixed_schema_csv_parquet,\n",
" overwrite=True,\n",
")\n",
"pipeline_with_client(args, client)"
"with Client(n_workers=1, threads_per_worker=1) as client:\n",
" pipeline_with_client(args, client)"
]
},
{
Expand Down Expand Up @@ -283,7 +284,6 @@
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()\n",
"tmp_path.cleanup()"
]
}
Expand All @@ -304,7 +304,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
"version": "3.10.13"
}
},
"nbformat": 4,
Expand Down
10 changes: 5 additions & 5 deletions src/hipscat_import/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ def pipeline_with_client(args: RuntimeArguments, client: Client):
macauff_runner.run(args, client)
else:
raise ValueError("unknown args type")
except Exception as exception: # pylint: disable=broad-exception-caught
_send_failure_email(args, exception)
else:

_send_success_email(args)
except Exception as exception: # pylint: disable=broad-exception-caught
if args.completion_email_address:
_send_failure_email(args, exception)
raise exception


def _send_failure_email(args: RuntimeArguments, exception: Exception):
if not args.completion_email_address:
raise exception
message = EmailMessage()
message["Subject"] = "hipscat-import failure."
message["To"] = args.completion_email_address
Expand Down
2 changes: 2 additions & 0 deletions src/hipscat_import/pipeline_resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def wait_for_futures(self, futures, stage_name):
):
if future.status == "error":
some_error = True
print(f"{stage_name} task {future.key} failed with message:")
print(future.exception())
if some_error:
raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")

Expand Down

0 comments on commit b2fe8ac

Please sign in to comment.