Skip to content

Commit

Permalink
Merge pull request #99 from lsst-sqre:tickets/DM-46137
Browse files Browse the repository at this point in the history
DM-46137: Support timeouts in notebook executions
  • Loading branch information
jonathansick authored Sep 12, 2024
2 parents 7858d60 + 55bee1b commit 058689c
Show file tree
Hide file tree
Showing 11 changed files with 1,272 additions and 1,053 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.6
rev: v0.6.4
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

<!-- scriv-insert-here -->

<a id='changelog-0.13.0'></a>

## 0.13.0 (2024-09-12)

### New features

- Notebook execution jobs can now set _timeouts_. In requests, set a timeout in the `timeout` request field. This can be a number of seconds, or a [human-readable duration string](https://safir.lsst.io/user-guide/datetime.html#parsing-time-intervals) (e.g. "1h30m"). The specified timeout is also repeated in the response body. This timeout applies to the notebook execution, not any time in the queue.

- Errors that prevented a notebook from being executed are now reported in the notebook job response body in the `error` field. The field is an object with a `code` field and a `message` field. The `code` field is a string that can be used to identify the error. Currently the codes are `timeout`, `jupyter_error`, and `unknown`. Note that exceptions raised in the Jupyter notebook aren't considered errors, but are instead reported in the `ipynb_error` field.

<a id='changelog-0.12.1'></a>

## 0.12.1 (2024-08-02)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# - Runs a non-root user.
# - Sets up the entrypoint and port.

FROM python:3.12.3-slim-bullseye as base-image
FROM python:3.12.6-slim-bookworm as base-image

Check warning on line 17 in Dockerfile

View workflow job for this annotation

GitHub Actions / build

The 'as' keyword should match the case of the 'from' keyword

FromAsCasing: 'as' and 'FROM' keywords' casing do not match More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/

# Update system packages
COPY scripts/install-base-packages.sh .
Expand Down
1,194 changes: 623 additions & 571 deletions requirements/dev.txt

Large diffs are not rendered by default.

934 changes: 491 additions & 443 deletions requirements/main.txt

Large diffs are not rendered by default.

56 changes: 28 additions & 28 deletions requirements/tox.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This file was autogenerated by uv via the following command:
# uv pip compile --generate-hashes --output-file requirements/tox.txt requirements/tox.in
cachetools==5.4.0 \
--hash=sha256:3ae3b49a3d5e28a77a0be2b37dbcb89005058959cb2323858c2657c4a8cab474 \
--hash=sha256:b8adc2e7c07f105ced7bc56dbb6dfbe7c4a00acce20e2227b3f355be89bc6827
cachetools==5.5.0 \
--hash=sha256:02134e8439cdc2ffb62023ce1debca2944c3f289d66bb17ead3ab3dede74b292 \
--hash=sha256:2cc24fb4cbe39633fb7badd9db9ca6295d766d9c2995f245725a46715d050f2a
# via tox
chardet==5.2.0 \
--hash=sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7 \
Expand Down Expand Up @@ -51,35 +51,35 @@ pyproject-api==1.7.1 \
--hash=sha256:2dc1654062c2b27733d8fd4cdda672b22fe8741ef1dde8e3a998a9547b071eeb \
--hash=sha256:7ebc6cd10710f89f4cf2a2731710a98abce37ebff19427116ff2174c9236a827
# via tox
tox==4.16.0 \
--hash=sha256:43499656f9949edb681c0f907f86fbfee98677af9919d8b11ae5ad77cb800748 \
--hash=sha256:61e101061b977b46cf00093d4319438055290ad0009f84497a07bf2d2d7a06d0
tox==4.18.0 \
--hash=sha256:0a457400cf70615dc0627eb70d293e80cd95d8ce174bb40ac011011f0c03a249 \
--hash=sha256:5dfa1cab9f146becd6e351333a82f9e0ade374451630ba65ee54584624c27b58
# via
# -r requirements/tox.in
# tox-uv
tox-uv==1.11.1 \
--hash=sha256:9e310020eaad3c7389770007d49cb2146e92383a47380086c065156c8aff0c56 \
--hash=sha256:e4962ad07541bd9da08e6d654edc5ce636665899e9c181a4ea4a3f4385da9158
tox-uv==1.11.2 \
--hash=sha256:7f8f1737b3277e1cddcb5b89fcc5931d04923562c940ae60f29e140908566df2 \
--hash=sha256:a7aded5c3fb69f055b523357988c1055bb573e91bfd7ecfb9b5233ebcab5d10b
# via -r requirements/tox.in
uv==0.2.33 \
--hash=sha256:02ed3b62049ea1f40404d33a02a69d3808f3b0e001e5565938804ca76beafbc4 \
--hash=sha256:181ccdb22058465c6690dca22e506fec234dcae5bcbe6389fd5330971910250e \
--hash=sha256:2fe685e73f198b2630e08e89ece0d858d58646a038a6d9cb2b06126dcca856d1 \
--hash=sha256:37924a3b502117fd74b1ddf08e9288b397da7895dd8cad46005422eefffe6e88 \
--hash=sha256:42b65bbf78b5186a40ea4423fab030fb01c9354432a7c0a3b5db67a3f4e246c5 \
--hash=sha256:48cfdb8efd237eb00086b8f0d0dc7281e517fd8afb55f698538087379bf45a8d \
--hash=sha256:676231a93001db051ecf98cb380f2d48d3f6b95add66ff4546073e30911a737a \
--hash=sha256:714351e10f27e41052897e26cd4acfe66e35250903fdc20f762d29461cf3ec4a \
--hash=sha256:73031edf35195289f02f6f1a603c512b57c8f921cb62fd442dbb63fd2a77c801 \
--hash=sha256:744eb9743e4b850af5de9f3c727d84a60a763ae0f4f5183dcdfa8a065879694d \
--hash=sha256:86f6237102deedbb17201804eb821833c5bad3f551f16f2695ae2b85e9f066de \
--hash=sha256:8eba96cbff1bc492c270e143235b39cfbe6dddebd842228ea14124d6b7d944e8 \
--hash=sha256:90b74796ce75594e63345c8e090fbac832a8f6db876691ae2b57b0b8d6011559 \
--hash=sha256:93c45d07ab440c03f2796540d646c34e58b4707feebfb9f70ded1306830408b0 \
--hash=sha256:ace6cb8383203fdfeaf8dbbc1ecb3bb945e040ca10558e233b63c84af82f6636 \
--hash=sha256:dbe497a1a16be9569d42cf4a7562e14bb3c3d9b33cc65e59095f1c3f8ab983df \
--hash=sha256:ede51de6795f9571b182c104d6078690c3a10b3fbe6fcf414b2e38c8d394e575 \
--hash=sha256:fb6f282ac92fbc05e82fa3a93e6515ad5b044e8c845ba16d815b5889799eebd1
uv==0.4.6 \
--hash=sha256:081793fc66286ad0f9b939b5ab3239d20d486eff7b8f7c574d3fdcd44536595e \
--hash=sha256:16ae9f0a9b3bb6db755d057c4ff55b0e82fbb09243a1dce02720268718c0a11c \
--hash=sha256:2bd9879eb8ee9de0fa532136ddfe76c1b425520fa6de52f5d023567e226a08b5 \
--hash=sha256:346db85ce897c6c782af33504922a84c583180906244ea224a306e2393f396f4 \
--hash=sha256:397e02640cbddc4230da8d614ef9a7138a024b9ce396f2a27013cd254e97480d \
--hash=sha256:4db5c2b770bb0e95a4d32dc19943109310c86103fe8b8ecf0d18dfe1a4f2a212 \
--hash=sha256:5c142b0082844c9eca4a303e13cf1d286622f49af70e8f922b833cb3e667b2e3 \
--hash=sha256:5d507dee4767d054171dcdbc1ac1fa7d15a7579e20ff988162c749f09a95f943 \
--hash=sha256:831be24db75938ebb01f9e6b7bcf24683b9661f91aab4108636207ba9dc6a2b9 \
--hash=sha256:86ecd19796363f0161600120d81ce7e3d17d54cce6d25017190daa608a8ab535 \
--hash=sha256:86f78fb80989b524ce7da705fb5af613339a8eadbcf9e230bb16e8a7d383c2ce \
--hash=sha256:93165aa25f5278a0844c91a367140e725d384633946705651ff8e70757c2e92f \
--hash=sha256:954f91be1b8e531e9fd3d6d549ec1638c20ce4e77dc6e272c5c5d21b8c952f11 \
--hash=sha256:9bf486c6331197a301bff253945b233d852ed2fdc322e93c78d382092813161d \
--hash=sha256:a66442018e09467222824b685b2d401053dfdbc3b5f6fb1f863ca2eb2560076a \
--hash=sha256:b8cdadb99b842494a0a39e0184cacfb4af722fdbea3d4cb5ee3b25ea0b824292 \
--hash=sha256:e0360bb3b094d106d1fa00b3a15c69ccd0d1593682c33affb1b94367a248a378 \
--hash=sha256:e6e925f661aeaf392cc102d3c2f6f05b6994519bd7a36578f836a4bbf42ee813
# via tox-uv
virtualenv==20.26.3 \
--hash=sha256:4c43a2a236279d9ea36a0d76f98d84bd6ca94ac4e0f4a3b9d46d05e10fea542a \
Expand Down
9 changes: 9 additions & 0 deletions src/noteburst/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
__all__ = [
"TaskError",
"NbexecTaskError",
"NbexecTaskTimeoutError",
"NoteburstClientRequestError",
"NoteburstError",
]
Expand Down Expand Up @@ -38,6 +39,14 @@ class NbexecTaskError(TaskError):
task_name = "nbexec"


class NbexecTaskTimeoutError(NbexecTaskError):
"""Error raised when a notebook execution task times out."""

@classmethod
def from_exception(cls, exc: Exception) -> Self:
return cls(f"{cls.task_name} timeout error\n\n{exc!s}")


class NoteburstClientRequestError(ClientRequestError):
"""Error related to the API client."""

Expand Down
1 change: 1 addition & 0 deletions src/noteburst/handlers/v1/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async def post_nbexec(
ipynb=request_data.get_ipynb_as_str(),
kernel_name=request_data.kernel_name,
enable_retry=request_data.enable_retry,
timeout=request_data.timeout,
)
logger.info("Finished enqueing an nbexec task", job_id=job_metadata.id)
response_data = await NotebookResponse.from_job_metadata(
Expand Down
101 changes: 96 additions & 5 deletions src/noteburst/handlers/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
from __future__ import annotations

import json
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from typing import Annotated, Any

from arq.jobs import JobStatus
from fastapi import Request
from pydantic import AnyHttpUrl, BaseModel, Field
from safir.arq import JobMetadata, JobResult
from safir.pydantic import HumanTimedelta

from noteburst.exceptions import NbexecTaskError, NbexecTaskTimeoutError
from noteburst.jupyterclient.jupyterlab import (
NotebookExecutionErrorModel,
NotebookExecutionResult,
Expand Down Expand Up @@ -47,6 +50,34 @@ def from_nbexec_error(
)


class NoteburstErrorCodes(Enum):
"""Error codes for Noteburst errors."""

timeout = "timeout"
"""The notebook execution timed out."""

jupyter_error = "jupyter_error"
"""An error occurred contacting the Jupyter server."""

unknown = "unknown"
"""An unknown error occurred."""


class NoteburstExecutionError(BaseModel):
"""Information about an exception that occurred during noteburst's
execution of a notebook (other than an exception raised in the notebook
itself).
"""

code: NoteburstErrorCodes = Field(
description="The reference code of the error."
)

message: str | None = Field(
None, description="Additional information about the exception."
)


class NotebookResponse(BaseModel):
"""Information about a notebook execution job, possibly including the
result and source notebooks.
Expand Down Expand Up @@ -101,6 +132,17 @@ class NotebookResponse(BaseModel):
),
] = None

error: Annotated[
NoteburstExecutionError | None,
Field(
description=(
"An error occurred during notebook execution, other than an "
"exception in the notebook itself. This field is null if an "
"error did not occur."
)
),
] = None

ipynb: Annotated[
str | None,
Field(
Expand All @@ -119,6 +161,15 @@ class NotebookResponse(BaseModel):
),
] = None

timeout: Annotated[
float | None,
Field(
None,
title="The job's timeout in seconds",
description="This field is null if a timeout was not set.",
),
] = None

@classmethod
async def from_job_metadata(
cls,
Expand All @@ -129,18 +180,45 @@ async def from_job_metadata(
job_result: JobResult | None = None,
) -> NotebookResponse:
"""Create a NotebookResponse from a job."""
# When a job is a "success" it means that the arq worker didn't raise
# an exception, so we can expect an ipynb result. However the ipynb
# might have still raised an exception which is part of
# nbexec_result.error and we want to pass that back to the user.
if job_result is not None and job_result.success:
nbexec_result = NotebookExecutionResult.model_validate_json(
job_result.result
)
ipynb = nbexec_result.notebook
if nbexec_result.error:
error = NotebookError.from_nbexec_error(nbexec_result.error)
ipynb_error = NotebookError.from_nbexec_error(
nbexec_result.error
)
else:
error = None
ipynb_error = None
else:
ipynb = None
error = None
ipynb_error = None

# In this case the job is complete but failed (an exception was raised)
# so we want to pass the exception back to the user.
noteburst_error = None
if job_result and not job_result.success:
if e := job_result.result:
if isinstance(job_result.result, NbexecTaskTimeoutError):
noteburst_error = NoteburstExecutionError(
code=NoteburstErrorCodes.timeout,
message=str(e).strip(),
)
elif isinstance(job_result.result, NbexecTaskError):
noteburst_error = NoteburstExecutionError(
code=NoteburstErrorCodes.jupyter_error,
message=str(e).strip(),
)
elif isinstance(job_result.result, Exception):
noteburst_error = NoteburstExecutionError(
code=NoteburstErrorCodes.unknown,
message=str(e).strip(),
)

return cls(
job_id=job.id,
Expand All @@ -152,8 +230,10 @@ async def from_job_metadata(
start_time=job_result.start_time if job_result else None,
finish_time=job_result.finish_time if job_result else None,
success=job_result.success if job_result else None,
error=noteburst_error,
ipynb=ipynb,
ipynb_error=error,
ipynb_error=ipynb_error,
timeout=job.kwargs["timeout"].total_seconds(),
)


Expand All @@ -172,6 +252,17 @@ class PostNotebookRequest(BaseModel):

kernel_name: Annotated[str, kernel_name_field]

timeout: HumanTimedelta = Field(
default_factory=lambda: timedelta(seconds=300),
title="Timeout for notebook execution.",
description=(
"The timeout can either be written as a number in seconds or as a "
"human-readable duration string. For example, '5m' is 5 minutes, "
"'1h' is 1 hour, '1d' is 1 day. If the notebook execution does "
"not complete within this time, the job is marked as failed."
),
)

enable_retry: Annotated[
bool,
Field(
Expand Down
14 changes: 11 additions & 3 deletions src/noteburst/worker/functions/nbexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

from __future__ import annotations

import asyncio
import json
import sys
from datetime import timedelta
from typing import Any, cast

from arq import Retry
from safir.slack.blockkit import SlackCodeBlock, SlackTextField

from noteburst.exceptions import NbexecTaskError
from noteburst.exceptions import NbexecTaskError, NbexecTaskTimeoutError
from noteburst.jupyterclient.jupyterlab import JupyterClient, JupyterError


Expand All @@ -21,6 +23,7 @@ async def nbexec(
ipynb: str,
kernel_name: str = "LSST",
enable_retry: bool = True,
timeout: timedelta | None = None, # noqa: ASYNC109
) -> str:
"""Execute a notebook, as an asynchronous arq worker task.
Expand Down Expand Up @@ -54,10 +57,15 @@ async def nbexec(
parsed_notebook = json.loads(ipynb)
logger.debug("Got ipynb", ipynb=parsed_notebook)
try:
execution_result = await jupyter_client.execute_notebook(
parsed_notebook, kernel_name=kernel_name
execution_result = await asyncio.wait_for(
jupyter_client.execute_notebook(
parsed_notebook, kernel_name=kernel_name
),
timeout=timeout.total_seconds() if timeout else None,
)
logger.info("nbexec finished", error=execution_result.error)
except TimeoutError as e:
raise NbexecTaskTimeoutError.from_exception(e) from e
except JupyterError as e:
logger.exception("nbexec error", jupyter_status=e.status)
if "slack" in ctx and "slack_message_factory" in ctx:
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def client(app: FastAPI) -> AsyncIterator[AsyncClient]:
"""Return an ``httpx.AsyncClient`` configured to talk to the test app."""
headers = {"X-Auth-Request-User": "user"}
async with AsyncClient(
transport=ASGITransport(app=app), # type: ignore[arg-type]
transport=ASGITransport(app=app),
base_url="https://example.com/",
headers=headers,
) as client:
Expand Down

0 comments on commit 058689c

Please sign in to comment.