Skip to content

Commit

Permalink
fix: Keep polling when GPFS job is running (#2961)
Browse files Browse the repository at this point in the history
Backported-from: main (24.12)
Backported-to: 24.03
Backport-of: 2961
  • Loading branch information
fregataa committed Oct 24, 2024
1 parent 602cc9c commit 20b63ec
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
1 change: 1 addition & 0 deletions changes/2961.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Let GPFS client keep polling when GPFS job is running
34 changes: 22 additions & 12 deletions src/ai/backend/storage/gpfs/gpfs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

import aiohttp
from aiohttp import BasicAuth, web
from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_fixed
from tenacity import (
AsyncRetrying,
TryAgain,
retry_if_exception_type,
stop_after_attempt,
wait_fixed,
)

from ai.backend.common.logging import BraceStyleAdapter
from ai.backend.common.types import BinarySize
Expand Down Expand Up @@ -128,23 +134,27 @@ async def _build_request(
except web.HTTPUnauthorized:
raise GPFSUnauthorizedError

async def _wait_for_job_done(self, jobs: List[GPFSJob]) -> None:
async def _wait_for_job_done(self, jobs: list[GPFSJob]) -> None:
for job_to_wait in jobs:
async for attempt in AsyncRetrying(
wait=wait_fixed(0.5),
stop=stop_after_attempt(100),
retry=retry_if_exception_type(web.HTTPNotFound),
stop=stop_after_attempt(120),
retry=retry_if_exception_type(TryAgain) | retry_if_exception_type(web.HTTPNotFound),
):
with attempt:
job = await self.get_job(job_to_wait.jobId)
if job.status == GPFSJobStatus.COMPLETED:
return
elif job.status == GPFSJobStatus.FAILED:
raise GPFSJobFailedError(
job.result.to_json() if job.result is not None else ""
)
elif job.status == GPFSJobStatus.CANCELLED:
raise GPFSJobCancelledError
match job.status:
case GPFSJobStatus.RUNNING | GPFSJobStatus.CANCELLING:
raise TryAgain
case GPFSJobStatus.COMPLETED:
return
case GPFSJobStatus.FAILED:
log.error(f"Failed to run GPFS job. (e:{str(jobs)})")
raise GPFSJobFailedError(
job.result.to_json() if job.result is not None else ""
)
case GPFSJobStatus.CANCELLED:
raise GPFSJobCancelledError

@contextlib.asynccontextmanager
async def _build_session(self) -> AsyncIterator[aiohttp.ClientSession]:
Expand Down

0 comments on commit 20b63ec

Please sign in to comment.