Skip to content

Commit

Permalink
Remove from filesystem when job is cancelable
Browse files Browse the repository at this point in the history
  • Loading branch information
sverhoeven committed Mar 8, 2024
1 parent ae3a089 commit 3f8257e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 47 deletions.
3 changes: 1 addition & 2 deletions src/bartender/filesystems/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
target: Local directory to copy to.
"""


async def delete(self, description: JobDescription) -> None:
"""Delete job directory of description.
Args:
description: Remote directory to delete.
"""
# after download or cancellation you might want to
# after download or cancellation you might want to
# delete the remote job directory

async def close(self) -> None:
Expand Down
58 changes: 50 additions & 8 deletions src/bartender/web/api/job/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from bartender.db.models.job_model import MAX_LENGTH_NAME, CompletedStates, Job
from bartender.destinations import Destination
from bartender.filesystems.queue import CurrentFileOutStagingQueue
from bartender.schedulers.abstract import JobDescription
from bartender.walk_dir import DirectoryItem, walk_dir
from bartender.web.api.job.interactive_apps import InteractiveAppResult, run
from bartender.web.api.job.schema import JobModelDTO
Expand Down Expand Up @@ -221,16 +222,31 @@ def retrieve_job_file(

CurrentJob = Annotated[Job, Depends(retrieve_job)]


def get_destination(
job: CurrentJob,
context: CurrentContext,
):
job: CurrentJob,
context: CurrentContext,
) -> Destination:
"""Get the destination of a job.
Args:
job: The job.
context: Context with destinations.
Raises:
ValueError: When job has no destination.
Returns:
The destination of the job.
"""
if not job.destination or not job.internal_id:
raise ValueError("Job has no destination")
return context.destinations[job.destination]


CurrentDestination = Annotated[Destination, Depends(get_destination)]


async def get_completed_logs(
job_dir: CurrentCompletedJobDir,
job: CurrentJob,
Expand All @@ -241,7 +257,7 @@ async def get_completed_logs(
Args:
job_dir: Directory with job output files.
job: The job.
context: Context with destinations.
destination: The destination of the job.
Raises:
ValueError: When job has no destination.
Expand All @@ -251,6 +267,8 @@ async def get_completed_logs(
The standard output and error.
"""
try:
if not job.internal_id:
raise ValueError("Job has no internal_id")
return await destination.scheduler.logs(job.internal_id, job_dir)
except FileNotFoundError as exc:
raise HTTPException(
Expand Down Expand Up @@ -575,19 +593,43 @@ async def rename_job_name(
detail="Job not found",
) from exc


@router.delete("/{jobid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_job(
async def delete_job( # noqa: WPS211
jobid: int,
job_dao: CurrentJobDAO,
job: Annotated[Job, Depends(retrieve_job)],
user: CurrentUser,
job_dir: Annotated[Path, Depends(get_job_dir)],
destination: CurrentDestination,
job_root_dir: Annotated[Path, Depends(get_job_root_dir)],
) -> None:
CancelableStates = { "queued", "running" }
if job.state in CancelableStates:
"""Delete a job.
Deletes job from database and filesystem.
When job is queued or running it will be canceled
and removed from the filesystem where the job is located.
Args:
jobid: The job identifier.
job_dao: The job DAO.
job: The job.
user: The current user.
job_dir: The job directory.
destination: The destination of the job.
job_root_dir: The root directory of all jobs.
"""
cancelable_states = {"queued", "running"}
if job.state in cancelable_states and job.internal_id is not None:
await destination.scheduler.cancel(job.internal_id)
# TODO cleanup remote job directory
description = JobDescription(job_dir=job_dir, command="echo")
localized_description = destination.filesystem.localize_description(
description,
job_root_dir,
)
await destination.filesystem.delete(localized_description)

if job_dir.is_symlink():
# We want to remove the symlink not its target
Expand Down
42 changes: 5 additions & 37 deletions tests/web/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
async def upload(self, src: JobDescription, target: JobDescription) -> None:
raise NotImplementedError()

async def delete(self, target: JobDescription) -> None:
raise NotImplementedError()

async def close(self) -> None:
raise NotImplementedError()

Expand Down Expand Up @@ -917,14 +920,15 @@ async def test_rename_job_name_wrong_user(
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "Job not found"}


@pytest.mark.anyio
async def test_delete_completed_job(
fastapi_app: FastAPI,
client: AsyncClient,
auth_headers: Dict[str, str],
mock_ok_job: int,
job_root_dir: Path,
):
) -> None:
job_id = str(mock_ok_job)
url = fastapi_app.url_path_for("delete_job", jobid=job_id)
response = await client.delete(url, headers=auth_headers)
Expand All @@ -936,39 +940,3 @@ async def test_delete_completed_job(

response2 = await client.delete(url, headers=auth_headers)
assert response2.status_code == status.HTTP_404_NOT_FOUND

@pytest.mark.anyio
async def test_delete_queued_job(
dbsession: AsyncSession,
current_user: User,
demo_context: Context,
) -> None:
cancel_mock = Mock()

class FakeScheduler(AbstractScheduler):
async def state(self, job_id: str) -> State:
return "queued"

async def states(self, job_ids: list[str]) -> list[State]:
return ["queued"]

async def submit(self, description: JobDescription) -> str:
raise NotImplementedError()

async def cancel(self, job_id: str) -> None:
cancel_mock(job_id)

async def close(self) -> None:
raise NotImplementedError()

demo_context.destinations["dest1"].scheduler = FakeScheduler()


dao = JobDAO(dbsession)
job_id, download_mock = await prepare_job(
db_state="queued",
scheduler_state="queued",
dao=dao,
current_user=current_user,
demo_context=demo_context,
)

0 comments on commit 3f8257e

Please sign in to comment.