diff --git a/app_runner/src/app_runner/cli/validate.py b/app_runner/src/app_runner/cli/validate.py
index 3f855fda..0c7a0cc2 100644
--- a/app_runner/src/app_runner/cli/validate.py
+++ b/app_runner/src/app_runner/cli/validate.py
@@ -9,6 +9,7 @@
 from app_runner.specs.app.app_spec import AppSpecTemplate, AppSpec
 from app_runner.specs.inputs_spec import InputsSpec
 from app_runner.specs.outputs_spec import OutputsSpec
+from app_runner.specs.submitter_spec import SubmittersSpec
 
 app_validate = cyclopts.App("validate", help="Validate yaml files.")
 
@@ -39,3 +40,10 @@ def outputs_spec(yaml_file: Path) -> None:
     """Validate an outputs spec file."""
     outputs_spec = OutputsSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
     pprint(outputs_spec)
+
+
+@app_validate.command()
+def submitters_spec(yaml_file: Path) -> None:
+    """Validate a submitters spec file."""
+    submitters_spec = SubmittersSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
+    pprint(submitters_spec)
diff --git a/app_runner/src/app_runner/specs/submitter_ref.py b/app_runner/src/app_runner/specs/submitter_ref.py
index e5f3f8d6..357489b1 100644
--- a/app_runner/src/app_runner/specs/submitter_ref.py
+++ b/app_runner/src/app_runner/specs/submitter_ref.py
@@ -2,9 +2,27 @@
 
 from pydantic import BaseModel
 
+from app_runner.specs.submitter_spec import SubmitterSpec
+
 
 class SubmitterRef(BaseModel):
     """Reference of a submitter and potential configuration overrides."""
 
     name: str
     config: dict[str, Any] = {}
+
+    def resolve(self, specs: dict[str, SubmitterSpec]) -> SubmitterSpec:
+        """Return the referenced spec with this reference's overrides applied.
+
+        The spec registered under ``self.name`` is dumped to a JSON-compatible dict,
+        the top-level keys in ``self.config`` replace the dumped values, and the result
+        is validated again with the base spec's own class.
+
+        Raises:
+            KeyError: if ``specs`` has no entry for ``self.name``.
+        """
+        # TODO move this into a function
+        base_spec = specs[self.name]
+        base = base_spec.model_dump(mode="json")
+        base.update(self.config)
+        return type(base_spec).model_validate(base)
diff --git a/app_runner/src/app_runner/specs/submitter_spec.py b/app_runner/src/app_runner/specs/submitter_spec.py
new file mode 100644
index 00000000..277bf781
--- /dev/null
+++ b/app_runner/src/app_runner/specs/submitter_spec.py
@@ -0,0 +1,87 @@
+from pathlib import Path
+from typing import Literal
+
+from pydantic import BaseModel
+
+
+class SubmitterSlurmSpec(BaseModel):
+    """Configuration of a Slurm submitter and its translation into sbatch options."""
+
+    type: Literal["slurm22"] = "slurm22"
+    # Either a fixed node count or a (min, max) range, rendered as "min-max".
+    n_nodes: int | tuple[int, int] = 1
+    n_tasks: int = 1
+    n_cpus: int
+    # Selects whether n_cpus is emitted as cpus-per-task or cpus-per-gpu.
+    n_cpus_scope: Literal["task", "gpu"] = "task"
+    partition: str
+    memory: str
+    # Selects whether memory is emitted as mem, mem-per-cpu or mem-per-gpu.
+    memory_scope: Literal["node", "cpu", "gpu"] = "node"
+    nodelist: str | None = None
+    time: str | None = None
+    disk: int | None = None
+    log_stdout: str | None = None
+    log_stderr: str | None = None
+    custom_args: dict[str, str | None] = {}
+    local_script_dir: Path
+    worker_scratch_dir: Path
+
+    def get_slurm_parameters(self) -> dict[str, str | None]:
+        """Return sbatch long-option names (without the leading ``--``) mapped to their values.
+
+        ``partition``, ``nodes`` and ``ntasks`` are always present; optional fields that are
+        unset (or otherwise falsy) are left out.
+        """
+        params: dict[str, str | None] = {"partition": self.partition}
+        if isinstance(self.n_nodes, int):
+            params["nodes"] = str(self.n_nodes)
+        else:
+            params["nodes"] = f"{self.n_nodes[0]}-{self.n_nodes[1]}"
+        params["ntasks"] = str(self.n_tasks)
+
+        if self.n_cpus_scope == "task":
+            params["cpus-per-task"] = str(self.n_cpus)
+        elif self.n_cpus_scope == "gpu":
+            params["cpus-per-gpu"] = str(self.n_cpus)
+
+        if self.memory_scope == "node":
+            params["mem"] = self.memory
+        elif self.memory_scope == "cpu":
+            params["mem-per-cpu"] = self.memory
+        elif self.memory_scope == "gpu":
+            params["mem-per-gpu"] = self.memory
+
+        if self.nodelist:
+            params["nodelist"] = self.nodelist
+        if self.time:
+            params["time"] = self.time
+        if self.disk:
+            params["disk"] = str(self.disk)
+        if self.log_stdout:
+            params["output"] = self.log_stdout
+        if self.log_stderr:
+            params["error"] = self.log_stderr
+        return params
+
+    def to_slurm_cli_arguments(self) -> list[str]:
+        """Render the Slurm parameters as ``--name=value`` strings (``--name`` for ``None`` values)."""
+        args = []
+        params = self.get_slurm_parameters()
+        for name, value in params.items():
+            if value is None:
+                args.append(f"--{name}")
+            else:
+                args.append(f"--{name}={value}")
+        # TODO reintroduce this
+        # args.extend(self.custom_args)
+        return args
+
+
+SubmitterSpec = SubmitterSlurmSpec
+
+
+class SubmittersSpec(BaseModel):
+    """Mapping of names to submitter specs."""
+
+    submitters: dict[str, SubmitterSpec]
diff --git a/app_runner/src/app_runner/submitter/__init__.py b/app_runner/src/app_runner/submitter/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/app_runner/src/app_runner/submitter/slurm_submitter.py b/app_runner/src/app_runner/submitter/slurm_submitter.py
new file mode 100644
index 00000000..71841613
--- /dev/null
+++ b/app_runner/src/app_runner/submitter/slurm_submitter.py
@@ -0,0 +1,22 @@
+from typing import TextIO
+
+from app_runner.specs.submitter_spec import SubmitterSlurmSpec
+
+
+class SubmitterSlurm:
+    """Submitter that renders a Slurm batch script from a ``SubmitterSlurmSpec``."""
+
+    def __init__(self, spec: SubmitterSlurmSpec) -> None:
+        self._spec = spec
+
+    def submit(self) -> None:
+        """Placeholder; submission is not implemented yet."""
+        # TODO this should receive basically the main command, create the slurm script and then submit it...
+        # -> specific configuration is needed
+        pass
+
+    def compose_script(self, out: TextIO) -> None:
+        """Write the bash shebang and one ``#SBATCH`` line per Slurm argument to ``out``."""
+        print("#!/bin/bash", file=out)
+        for arg in self._spec.to_slurm_cli_arguments():
+            print(f"#SBATCH {arg}", file=out)
diff --git a/pyproject.toml b/pyproject.toml
index 100ecb51..fad8b2a2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ dev = [
     "licensecheck",
     "nox",
     "uv",
+    "ipython",
 ]
 doc = ["mkdocs", "mkdocs-material", "mkdocstrings[python]"]
 test = ["pytest", "pytest-mock", "logot"]
diff --git a/tests/app_runner/specs/app/test_submitters.yml b/tests/app_runner/specs/app/test_submitters.yml
new file mode 100644
index 00000000..3723ad11
--- /dev/null
+++ b/tests/app_runner/specs/app/test_submitters.yml
@@ -0,0 +1,8 @@
+submitters:
+  submitter:
+    type: "slurm22"
+    n_cpus: 1
+    partition: xyz
+    memory: 1G
+    local_script_dir: "/tmp/local_script_dir"
+    worker_scratch_dir: "/tmp/worker_scratch_dir"
diff --git a/tests/app_runner/specs/test_submitter_spec.py b/tests/app_runner/specs/test_submitter_spec.py
new file mode 100644
index 00000000..c99b5e8d
--- /dev/null
+++ b/tests/app_runner/specs/test_submitter_spec.py
@@ -0,0 +1,43 @@
+import pytest
+
+from app_runner.specs.submitter_spec import SubmitterSlurmSpec
+from app_runner.specs.submitter_ref import SubmitterRef
+
+
+@pytest.fixture
+def spec_base_data() -> dict[str, object]:
+    return {
+        "partition": "prx",
+        "nodelist": "fgcz-r-033",
+        "n_nodes": 1,
+        "n_tasks": 1,
+        "n_cpus": 1,
+        "memory": "1G",
+        "custom_args": {},
+        "local_script_dir": "/home/bfabric/prx",
+        "worker_scratch_dir": "/scratch",
+    }
+
+
+@pytest.fixture
+def spec_base(spec_base_data) -> SubmitterSlurmSpec:
+    return SubmitterSlurmSpec.model_validate(spec_base_data)
+
+
+@pytest.fixture
+def specs_base(spec_base) -> dict[str, SubmitterSlurmSpec]:
+    return {"slurm": spec_base}
+
+
+# TODO
+# def test_ban_extra_arg_in_main_spec(spec_base_data):
+#     spec_base_data["custom_args"]["cpus-per-task"] = "5"
+#     with pytest.raises(RuntimeError):
+#         SubmitterSlurmSpec.model_validate(spec_base_data)
+
+
+def test_resolve(specs_base):
+    ref = SubmitterRef(name="slurm", config={"n_nodes": 2})
+    resolved = ref.resolve(specs_base)
+    assert resolved.memory == "1G"
+    assert resolved.n_nodes == 2