Skip to content

Commit

Permalink
Merge pull request #106 from i-VRESSE/105-slurm-account-job-name
Browse files Browse the repository at this point in the history
Add --account and --job-name to slurm submission
  • Loading branch information
sverhoeven authored Oct 23, 2024
2 parents 6bb29cf + fc9c1de commit 775567e
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information

project = "bartender"
release = "0.5.1" # TODO have version only in one place pyproject.toml
release = "0.5.2" # TODO have version only in one place pyproject.toml

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ A destination has its own authentication mechanism.
When a job is submitted by any user of the web service,
it will be executed by the username/proxy that is configured in the destination.

For the allowed configuration options, see the [API reference](https://i-vresse-bartender.readthedocs.io/en/latest/autoapi/bartender).

### Example of running jobs on the local system

```yaml
Expand Down
11 changes: 11 additions & 0 deletions docs/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,14 @@ dirac-dms-filecatalog-cli
# pilot logs
cat ~diracpilot/localsite/output/*
```

## Creating a new release

To create a new release, you should follow these steps:

1. In `pyproject.toml` and `docs/conf.py`, update the version number.
2. Create a new GitHub release:
   - Set the tag and title to the new version number with a `v` prefix.
   - Use the same first line of the description as the previous release.
   - Append the generated release notes to the description.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bartender"
version = "0.5.1"
version = "0.5.2"
description = "Job middleware for i-VRESSE"
authors = [

Expand Down
4 changes: 4 additions & 0 deletions src/bartender/schedulers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class JobDescription(BaseModel):
job_dir: Path
# Command to run
command: str
# Application name
application: str = ""
# User that submitted the job
submitter: str = ""


class JobSubmissionError(Exception):
Expand Down
19 changes: 15 additions & 4 deletions src/bartender/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ class SlurmSchedulerConfig(BaseModel):
extra_options: Escape hatch to add extra options to job script.
The string `#SBATCH {extra_options[i]}` will be appended to job
script.
submitter_as_account: Use the submitter as the account to run the job.
"""

type: Literal["slurm"] = "slurm"
ssh_config: Optional[SshConnectConfig] = None
partition: Optional[str] = None
time: Optional[str] = None
extra_options: Optional[list[str]] = None
submitter_as_account: Optional[bool] = False


class SlurmScheduler(AbstractScheduler):
Expand All @@ -80,12 +82,14 @@ def __init__(self, config: SlurmSchedulerConfig):
self.extra_options = []
else:
self.extra_options = config.extra_options
self.submitter_as_account = config.submitter_as_account

async def submit(self, description: JobDescription) -> str: # noqa: D102):
# TODO if runner is a SSHCommandRunner then description.jobdir
# if runner is a SSHCommandRunner then description.jobdir
# must be on a shared filesystem or remote filesystem
script = self._submit_script(description)
command = "sbatch"
logger.debug(f"Submitting job with stdin: \n{script}")
(returncode, stdout, stderr) = await self.runner.run(
command,
args=[],
Expand Down Expand Up @@ -167,14 +171,19 @@ async def _state_from_accounting(self, job_id: str) -> str:
)
return stdout

def _submit_script(self, description: JobDescription) -> str:
def _submit_script(self, description: JobDescription) -> str: # noqa: WPS210
partition_line = ""
if self.partition:
partition_line = f"#SBATCH --partition={self.partition}"
time_line = ""
if self.time:
time_line = f"#SBATCH --time={self.time}"

account_line = ""
if description.submitter != "" and self.submitter_as_account:
account_line = f"#SBATCH --account={description.submitter}"
job_name_line = ""
if description.application != "":
job_name_line = f"#SBATCH --job-name={description.application}"
# TODO filter out options already set
extra_option_lines = "\n".join(
[f"#SBATCH {extra}" for extra in self.extra_options],
Expand All @@ -184,9 +193,11 @@ def _submit_script(self, description: JobDescription) -> str:
{extra_option_lines}
{partition_line}
{time_line}
{account_line}
{job_name_line}
#SBATCH --output=stdout.txt
#SBATCH --error=stderr.txt
{description.command}
echo -n $? > returncode
"""
""" # noqa: WPS221
return dedent(script)
19 changes: 17 additions & 2 deletions src/bartender/web/api/applications/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def build_description(
job_dir: Path,
payload: dict[str, str],
config: ApplicatonConfiguration,
application: str = "",
submitter: str = "",
) -> JobDescription:
"""
Builds a job description.
Expand All @@ -21,13 +23,20 @@ def build_description(
job_dir: The directory where the job will be executed.
payload: The payload containing the non-file input data for the job.
config: The configuration for the application.
application: The name of the application.
submitter: The user who submitted the job.
Returns:
Job description containing the job directory and command.
"""
template = template_environment.from_string(config.command_template)
command = template.render(**payload)
return JobDescription(job_dir=job_dir, command=command)
return JobDescription(
job_dir=job_dir,
command=command,
application=application,
submitter=submitter,
)


async def submit( # noqa: WPS211
Expand All @@ -50,7 +59,13 @@ async def submit( # noqa: WPS211
job_dao: JobDAO object.
context: Context with applications and destinations.
"""
description = build_description(job_dir, payload, context.applications[application])
description = build_description(
job_dir,
payload,
context.applications[application],
application=application,
submitter=submitter.username,
)

destination_name = context.destination_picker(
job_dir,
Expand Down
41 changes: 40 additions & 1 deletion tests/schedulers/test_slurm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from asyncio import new_event_loop
from pathlib import Path
from typing import Generator
Expand Down Expand Up @@ -73,7 +74,9 @@ async def test_ok_running_job_with_input_and_output_file(
job_dir = tmp_path
try:
ssh_config = slurm_server.get_config()
scheduler = SlurmScheduler(SlurmSchedulerConfig(ssh_config=ssh_config))
scheduler = SlurmScheduler(
SlurmSchedulerConfig(ssh_config=ssh_config),
)
description = prepare_input(job_dir)
fs = slurm_server.get_filesystem()
localized_description = fs.localize_description(description, job_dir.parent)
Expand All @@ -91,6 +94,41 @@ async def test_ok_running_job_with_input_and_output_file(
await scheduler.close()


@pytest.mark.anyio
async def test_submit_cancel_job_with_submitter_as_account(  # noqa: WPS231
    tmp_path: Path,
    slurm_server: SlurmContainer,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check that submitter and application end up in the sbatch script.

    The scheduler is configured with ``submitter_as_account=True`` and the
    job description carries a submitter and an application name. The job is
    submitted and immediately cancelled; the assertions inspect the DEBUG log
    record that ``SlurmScheduler.submit`` emits with the generated script.

    Args:
        tmp_path: Temporary directory used as the local job directory.
        slurm_server: Container fixture providing ssh config and filesystem.
        caplog: Log capture fixture used to read the logged submit script.
    """
    # The submit script is only logged at DEBUG level, so capture that level.
    caplog.set_level(logging.DEBUG, "bartender.schedulers.slurm")
    job_dir = tmp_path

    # Create the scheduler before entering ``try`` so that the cleanup below
    # never refers to a name that was not bound yet; otherwise a failure
    # during setup would be masked by an UnboundLocalError in ``finally``.
    ssh_config = slurm_server.get_config()
    scheduler = SlurmScheduler(
        SlurmSchedulerConfig(
            ssh_config=ssh_config,
            submitter_as_account=True,
        ),
    )
    try:
        description = prepare_input(job_dir)
        description.submitter = "testsubmitter"
        description.application = "hellowc"

        fs = slurm_server.get_filesystem()
        localized_description = fs.localize_description(description, job_dir.parent)
        try:
            await fs.upload(description, localized_description)

            jid = await scheduler.submit(localized_description)
            await scheduler.cancel(jid)

            assert "--job-name=hellowc" in caplog.text
            assert "--account=testsubmitter" in caplog.text
        finally:
            # Remove the remote job directory even when an assertion fails.
            await fs.delete(localized_description)
    finally:
        # Always close the scheduler, also when the remote delete fails.
        await scheduler.close()


@pytest.mark.anyio
async def test_ok_running_job_without_iofiles(
tmp_path: Path,
Expand All @@ -113,6 +151,7 @@ async def test_ok_running_job_without_iofiles(
await fs.download(localized_description, description)

assert_output_without_iofiles(job_dir)

finally:
await scheduler.close()

Expand Down

0 comments on commit 775567e

Please sign in to comment.