Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into delete-job
Browse files Browse the repository at this point in the history
  • Loading branch information
sverhoeven committed Mar 8, 2024
2 parents 8f5ec07 + 5584a13 commit 565e23b
Show file tree
Hide file tree
Showing 16 changed files with 428 additions and 47 deletions.
7 changes: 3 additions & 4 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ build:
tools:
python: '3.10'
jobs:
# https://github.com/readthedocs/readthedocs.org/issues/4912#issuecomment-1143587902
post_install:
post_create_environment:
- pip install poetry
- poetry config virtualenvs.create false
- poetry install --only docs
post_install:
- VIRTUAL_ENV=$READTHEDOCS_VIRTUALENV_PATH poetry install --only docs
sphinx:
configuration: docs/conf.py
26 changes: 24 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ schedulers
* **arq**, Scheduler which uses a Redis server as a job queue and
1 or more workers (`bartender perform` command) to run the jobs.
* **dirac**, Scheduler which submits job to grid using [DIRAC](http://diracgrid.org/).
* **eager**, Scheduler which runs the job immediately on submission.

Supported file systems

Expand Down Expand Up @@ -393,15 +394,36 @@ destinations:
log_level: DEBUG
```

### Example of running jobs directly on submission

For applications that can run within the request/response cycle time window.
For example, to alter the uploaded zip contents to mimic another application's output.

```yaml
destinations:
atonce:
scheduler:
type: eager
filesystem:
type: local
applications:
runimport:
command_template: mkdir -p output && mv * output || true
# `|| true` is there to swallow the error
# that the output dir itself cannot be moved
```

## Destination picker

If you have multiple applications and job destinations you need some way to
specify to which destination a job should be submitted. A Python function can be
used to pick a destination. By default jobs are submitted to the first
destination.

To use a custom picker function set `destination_picker`. The value should be
formatted as `<module>:<function>`. The picker function should have type
To use a custom picker function set `destination_picker`.
The value should be formatted as `<module>:<function>` or
`<path to python file>:<function>`.
The picker function should have type
[bartender.picker.DestinationPicker](
https://github.com/i-VRESSE/bartender/blob/bdbef5176e05c498b37f4ada2bf7c09ad0e7b853/src/bartender/picker.py#L8
). For example to rotate over each
Expand Down
22 changes: 22 additions & 0 deletions src/bartender/check_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from os import getloadavg, sched_getaffinity

from fastapi import HTTPException
from starlette import status


def check_load(max_load: float = 1.0) -> None:
    """Reject the request when the machine is under too much load.

    The 1-minute load average is normalized by the number of CPUs
    available to this process before being compared to ``max_load``.

    Args:
        max_load: Maximum normalized load allowed.

    Raises:
        HTTPException: When machine load is too high.
    """
    cpu_count = len(sched_getaffinity(0))
    normalized_load = getloadavg()[0] / cpu_count
    if normalized_load > max_load:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Machine load is too high, please try again later.",
        )
3 changes: 2 additions & 1 deletion src/bartender/destinations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class DestinationConfig(BaseModel):

# TODO validate that some combinations of scheduler and file system
# are not possible like
# * MemoryScheduler + SftpFileSystem
# * MemoryScheduler + remote fs
# * EagerScheduler + remote fs
# In future possible combos
# * AWSBatchScheduler + S3FileSystem
# * DiracScheduler + SrmFileSystem
Expand Down
67 changes: 64 additions & 3 deletions src/bartender/picker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from importlib import import_module
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from typing import TYPE_CHECKING, Callable

Expand Down Expand Up @@ -31,6 +32,55 @@ def pick_first(
return destination_names[0]


def pick_byname(
    job_dir: Path,
    application_name: str,
    submitter: User,
    context: "Context",
) -> str:
    """Picks the destination whose name equals the application name.

    Args:
        job_dir: Location where job input files are located.
        application_name: Application name that should be run.
        submitter: User that submitted the job.
        context: Context with applications and destinations.

    Returns:
        Destination name.

    Raises:
        KeyError: If application has no destination.
    """
    if application_name not in context.destinations:
        raise KeyError(f"Application {application_name} has no destination.")
    return application_name


def pick_byindex(
    job_dir: Path,
    application_name: str,
    submitter: User,
    context: "Context",
) -> str:
    """Picks a destination by position.

    For example the 2nd application will be submitted to the 2nd destination.

    Args:
        job_dir: Location where job input files are located.
        application_name: Application name that should be run.
        submitter: User that submitted the job.
        context: Context with applications and destinations.

    Returns:
        Destination name.
    """
    app_names = list(context.applications)
    dest_names = list(context.destinations)
    return dest_names[app_names.index(application_name)]


class PickRound:
"""Builder for round robin destination picker."""

Expand Down Expand Up @@ -76,16 +126,27 @@ def __call__(


def import_picker(destination_picker_name: str) -> DestinationPicker:
    """Import a picker function.

    Args:
        destination_picker_name: Function import as string.
            Format `<module>:<function>` or `<path to python file>:<function>`.

    Returns:
        Function that can be used to pick to which destination a job should be
        submitted.

    Raises:
        ValueError: If the function could not be imported.
    """
    # rsplit on the last ':' so paths that themselves contain a colon
    # (e.g. Windows drive letters like `C:\pickers\my.py:pick`) still work.
    (module_name, function_name) = destination_picker_name.rsplit(":", 1)
    if module_name.endswith(".py"):
        file_path = Path(module_name)
        spec = spec_from_file_location(file_path.name, file_path)
        if spec is None or spec.loader is None:
            raise ValueError(f"Could not load {file_path}")
        module = module_from_spec(spec)
        spec.loader.exec_module(module)
    else:
        module = import_module(module_name)
    return getattr(module, function_name)
7 changes: 7 additions & 0 deletions src/bartender/schedulers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class JobDescription(BaseModel):
command: str


class JobSubmissionError(Exception):
"""Error during job submission."""


class AbstractScheduler(ABC):
"""Abstract scheduler."""

Expand All @@ -34,6 +38,9 @@ async def submit(self, description: JobDescription) -> str:
Returns:
Identifier that can be used later to interact with job.
Raises:
JobSubmissionError: If job submission failed.
"""

@abstractmethod
Expand Down
17 changes: 11 additions & 6 deletions src/bartender/schedulers/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from bartender.schedulers.abstract import AbstractScheduler
from bartender.schedulers.arq import ArqScheduler, ArqSchedulerConfig
from bartender.schedulers.dirac_config import DiracSchedulerConfig
from bartender.schedulers.eager import EagerScheduler, EagerSchedulerConfig
from bartender.schedulers.memory import MemoryScheduler, MemorySchedulerConfig
from bartender.schedulers.slurm import SlurmScheduler, SlurmSchedulerConfig
from bartender.shared.dirac_config import DIRAC_INSTALLED
Expand All @@ -12,6 +13,7 @@
SlurmSchedulerConfig,
ArqSchedulerConfig,
DiracSchedulerConfig,
EagerSchedulerConfig,
]


Expand All @@ -28,12 +30,15 @@ def build(config: SchedulerConfig) -> AbstractScheduler:
A scheduler instance.
"""
if isinstance(config, MemorySchedulerConfig):
return MemoryScheduler(config)
if isinstance(config, SlurmSchedulerConfig):
return SlurmScheduler(config)
if isinstance(config, ArqSchedulerConfig):
return ArqScheduler(config)
config2scheduler = {
MemorySchedulerConfig: MemoryScheduler,
SlurmSchedulerConfig: SlurmScheduler,
ArqSchedulerConfig: ArqScheduler,
EagerSchedulerConfig: EagerScheduler,
}
for cfgcls, schedulercls in config2scheduler.items():
if isinstance(config, cfgcls):
return schedulercls(config)
if isinstance(config, DiracSchedulerConfig):
if DIRAC_INSTALLED:
from bartender.schedulers.dirac import ( # noqa: WPS433 is optional import
Expand Down
81 changes: 81 additions & 0 deletions src/bartender/schedulers/eager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from asyncio import create_subprocess_shell, wait_for
from asyncio.subprocess import Process
from pathlib import Path
from typing import Literal
from uuid import uuid1

from pydantic import BaseModel, PositiveInt
from pydantic.types import PositiveFloat

from bartender.check_load import check_load
from bartender.db.models.job_model import State
from bartender.schedulers.abstract import (
AbstractScheduler,
JobDescription,
JobSubmissionError,
)


class EagerSchedulerConfig(BaseModel):
    """Configuration for eager scheduler.

    Attributes:
        max_load: Maximum normalized machine load at which the scheduler
            will still accept submissions.
        timeout: Maximum time to wait for job to finish. In seconds.
    """

    # Discriminator used by the config loader to select this scheduler type.
    type: Literal["eager"] = "eager"
    # Submissions are rejected when load average per CPU exceeds this value.
    max_load: PositiveFloat = 1.0
    # Jobs running longer than this many seconds fail with JobSubmissionError.
    timeout: PositiveInt = 300


async def _exec(description: JobDescription, timeout: int) -> None:
    """Run the job command in a shell and wait for it to complete.

    Stdout and stderr are captured to `stdout.txt` and `stderr.txt`
    inside the job directory.

    Args:
        description: Description with the command and job directory.
        timeout: Maximum time to wait for job to finish. In seconds.

    Raises:
        JobSubmissionError: If the job timed out or exited non-zero.
    """
    # On Python < 3.11 asyncio.TimeoutError is not the builtin TimeoutError,
    # so catch the asyncio flavour explicitly (alias of builtin on >= 3.11).
    from asyncio import TimeoutError as AsyncioTimeoutError  # noqa: WPS433

    stderr_fn = description.job_dir / "stderr.txt"
    stdout_fn = description.job_dir / "stdout.txt"

    with open(stderr_fn, "w") as stderr, open(stdout_fn, "w") as stdout:
        proc = await create_subprocess_shell(
            description.command,
            stdout=stdout,
            stderr=stderr,
            cwd=description.job_dir,
        )
        try:
            await _handle_job_completion(timeout, proc, description.job_dir)
        except AsyncioTimeoutError as exc:
            # Reap the process so it does not keep running after the timeout.
            proc.kill()
            await proc.wait()
            raise JobSubmissionError(
                f"Job took longer than {timeout} seconds",
            ) from exc


async def _handle_job_completion(timeout: int, proc: Process, job_dir: Path) -> None:
returncode = await wait_for(proc.wait(), timeout=timeout)
(job_dir / "returncode").write_text(str(returncode))
if returncode != 0:
raise JobSubmissionError(
f"Job failed with return code {returncode}",
)


class EagerScheduler(AbstractScheduler):
    """Scheduler that runs jobs immediately on submission.

    Submission blocks until the job finishes, so any job that was
    successfully submitted has already completed.
    """

    def __init__(self, config: EagerSchedulerConfig) -> None:
        """Initialize scheduler with given configuration.

        Args:
            config: Configuration with load limit and job timeout.
        """
        self.config = config

    async def submit(self, description: JobDescription) -> str:
        """Run the job right away and return an identifier for it.

        Args:
            description: Description of the job to run.

        Returns:
            Identifier that can be used later to interact with job.

        Raises:
            HTTPException: When machine load is too high.
            JobSubmissionError: If job submission failed.
        """
        check_load(self.config.max_load)
        await _exec(description, self.config.timeout)
        return str(uuid1())

    async def state(self, job_id: str) -> State:
        """Return state of job; always ok, as failures raise during submit."""
        return "ok"

    async def states(self, job_ids: list[str]) -> list[State]:
        """Return states of given jobs; always ok (see `state`)."""
        return ["ok"] * len(job_ids)

    async def cancel(self, job_id: str) -> None:
        """Do nothing; a submitted job has already completed."""
        pass  # noqa: WPS420 -- cannot cancel job that is already completed.

    async def close(self) -> None:
        """Do nothing; this scheduler holds no resources."""
        pass  # noqa: WPS420 -- nothing to close.
6 changes: 5 additions & 1 deletion src/bartender/schedulers/slurm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from textwrap import dedent
from typing import Literal, Optional

Expand All @@ -12,6 +13,8 @@
)
from bartender.shared.ssh import SshConnectConfig

logger = logging.getLogger(__name__)


def _map_slurm_state(slurm_state: str) -> State:
status_map: dict[str, State] = {
Expand All @@ -32,6 +35,7 @@ def _map_slurm_state(slurm_state: str) -> State:
try:
return status_map[slurm_state]
except KeyError:
logger.error(f"Unmapped slurm state code: {slurm_state}")
# fallback to error when slurm state code is unmapped.
return "error"

Expand Down Expand Up @@ -155,7 +159,7 @@ def __repr__(self) -> str:

async def _state_from_accounting(self, job_id: str) -> str:
command = "sacct"
args = ["-j", job_id, "--noheader", "--format=state"]
args = ["-j", job_id, "--noheader", "--format=state", "--allocations"]
(returncode, stdout, stderr) = await self.runner.run(command, args)
if returncode != 0:
raise RuntimeError(
Expand Down
17 changes: 10 additions & 7 deletions src/bartender/web/api/applications/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from bartender.context import Context
from bartender.db.dao.job_dao import JobDAO
from bartender.filesystems.abstract import AbstractFileSystem
from bartender.schedulers.abstract import JobDescription
from bartender.schedulers.abstract import JobDescription, JobSubmissionError
from bartender.template_environment import template_environment
from bartender.web.users import User

Expand Down Expand Up @@ -71,13 +71,16 @@ async def submit( # noqa: WPS211
localized_description,
)

internal_job_id = await destination.scheduler.submit(localized_description)
try:
internal_job_id = await destination.scheduler.submit(localized_description)

await job_dao.update_internal_job_id(
external_job_id,
internal_job_id,
destination_name,
)
await job_dao.update_internal_job_id(
external_job_id,
internal_job_id,
destination_name,
)
except JobSubmissionError:
await job_dao.update_job_state(external_job_id, "error")


async def _upload_input_files(
Expand Down
1 change: 1 addition & 0 deletions src/bartender/web/api/job/interactive_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async def _shell(job_dir: Path, command: str, timeout: float) -> InteractiveAppR
job_dir: The path to the job directory.
command: The shell command to execute.
timeout: The maximum time to wait for the command to finish.
In seconds.
Returns:
The result of running the shell command.
Expand Down
Loading

0 comments on commit 565e23b

Please sign in to comment.