Skip to content

Commit

Permalink
Merge pull request #711 from djarecka/env
Browse files Browse the repository at this point in the history
adding the Singularity environment class
  • Loading branch information
djarecka authored Oct 5, 2023
2 parents 4d2098b + bd76c3d commit 9cc0904
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 395 deletions.
45 changes: 42 additions & 3 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ def execute(self, task):


class Docker(Environment):
def __init__(
self, image, tag="latest", binds=None, output_cpath="/output_pydra", xargs=None
):
def __init__(self, image, tag="latest", output_cpath="/output_pydra", xargs=None):
self.image = image
self.tag = tag
self.xargs = xargs
Expand Down Expand Up @@ -82,3 +80,44 @@ def execute(self, task, root="/mnt/pydra"):
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output


class Singularity(Docker):
def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
singularity_img = f"{self.image}:{self.tag}"
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_bindings(root=root)

# todo adding xargsy etc
singularity_args = [
"singularity",
"exec",
"-B",
self.bind(task.cache_dir, "rw"),
]
singularity_args.extend(
" ".join(
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
singularity_args.extend(["--pwd", f"{root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]

values = execute(
singularity_args + [singularity_img] + task.command_args(root="/mnt/pydra"),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])

Check warning on line 117 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L117

Added line #L117 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 119 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L119

Added line #L119 was not covered by tests
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output
7 changes: 0 additions & 7 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,13 +693,6 @@ class ContainerSpec(ShellSpec):
)


@attr.s(auto_attribs=True, kw_only=True)
class SingularitySpec(ContainerSpec):
"""Particularize container specifications to Singularity."""

container: str = attr.ib("singularity", metadata={"help_string": "container type"})


@attr.s
class LazyInterface:
_task: "core.TaskBase" = attr.ib()
Expand Down
81 changes: 0 additions & 81 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
ShellSpec,
ShellOutSpec,
ContainerSpec,
SingularitySpec,
attr_fields,
)
from .helpers import (
Expand Down Expand Up @@ -707,86 +706,6 @@ def _prepare_bindings(self):
SUPPORTED_COPY_MODES = FileSet.CopyMode.any - FileSet.CopyMode.symlink


class SingularityTask(ContainerTask):
"""Extend shell command task for containerized execution with Singularity."""

init = False

def __init__(
self,
name=None,
audit_flags: AuditFlag = AuditFlag.NONE,
cache_dir=None,
input_spec: ty.Optional[SpecInfo] = None,
messenger_args=None,
messengers=None,
output_spec: ty.Optional[SpecInfo] = None,
rerun=False,
strip=False,
**kwargs,
):
"""
Initialize this task.
Parameters
----------
name : :obj:`str`
Name of this task.
audit_flags : :obj:`pydra.utils.messenger.AuditFlag`
Auditing configuration
cache_dir : :obj:`os.pathlike`
Cache directory
input_spec : :obj:`pydra.engine.specs.SpecInfo`
Specification of inputs.
messenger_args :
TODO
messengers :
TODO
output_spec : :obj:`pydra.engine.specs.BaseSpec`
Specification of inputs.
strip : :obj:`bool`
TODO
"""
if not self.init:
if input_spec is None:
input_spec = SpecInfo(
name="Inputs", fields=[], bases=(SingularitySpec,)
)
super().__init__(
name=name,
input_spec=input_spec,
output_spec=output_spec,
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
cache_dir=cache_dir,
strip=strip,
rerun=rerun,
**kwargs,
)
self.init = True

@property
def container_args(self):
"""Get container-specific CLI arguments."""
if is_lazy(self.inputs):
raise Exception("can't return container_args, self.inputs has LazyFields")
self.container_check("singularity")
if self.state:
raise NotImplementedError

cargs = ["singularity", "exec"]

if self.inputs.container_xargs is not None:
cargs.extend(self.inputs.container_xargs)

cargs.extend(self.binds("-B"))
cargs.extend(["--pwd", str(self.output_cpath)])
cargs.append(self.inputs.image)
return cargs


def split_cmd(cmd: str):
"""Splits a shell command line into separate arguments respecting quotes
Expand Down
70 changes: 68 additions & 2 deletions pydra/engine/tests/test_environments.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from pathlib import Path

from ..environments import Native, Docker
from ..environments import Native, Docker, Singularity
from ..task import ShellCommandTask
from ..submitter import Submitter
from ..specs import (
ShellSpec,
SpecInfo,
File,
)
from .utils import no_win, need_docker
from .utils import no_win, need_docker, need_singularity

import attr

Expand Down Expand Up @@ -109,6 +109,72 @@ def test_docker_1_subm(tmp_path, plugin):
assert env_res == shelly_call.result().output.__dict__


@no_win
@need_singularity
def test_singularity_1(tmp_path):
"""singularity env: simple command, no arguments"""
newcache = lambda x: makedir(tmp_path, x)

cmd = ["whoami"]
sing = Singularity(image="docker://alpine")
shelly = ShellCommandTask(
name="shelly", executable=cmd, cache_dir=newcache("shelly")
)
assert shelly.cmdline == " ".join(cmd)
env_res = sing.execute(shelly)

shelly_env = ShellCommandTask(
name="shelly",
executable=cmd,
cache_dir=newcache("shelly_env"),
environment=sing,
)
shelly_env()
assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__

shelly_call = ShellCommandTask(
name="shelly", executable=cmd, cache_dir=newcache("shelly_call")
)
shelly_call(environment=sing)
assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__


@no_win
@need_singularity
def test_singularity_1_subm(tmp_path, plugin):
"""docker env with submitter: simple command, no arguments"""
newcache = lambda x: makedir(tmp_path, x)

cmd = ["whoami"]
sing = Singularity(image="docker://alpine")
shelly = ShellCommandTask(
name="shelly", executable=cmd, cache_dir=newcache("shelly")
)
assert shelly.cmdline == " ".join(cmd)
env_res = sing.execute(shelly)

shelly_env = ShellCommandTask(
name="shelly",
executable=cmd,
cache_dir=newcache("shelly_env"),
environment=sing,
)
with Submitter(plugin=plugin) as sub:
shelly_env(submitter=sub)
assert env_res == shelly_env.result().output.__dict__

shelly_call = ShellCommandTask(
name="shelly", executable=cmd, cache_dir=newcache("shelly_call")
)
with Submitter(plugin=plugin) as sub:
shelly_call(submitter=sub, environment=sing)
for key in [
"stdout",
"return_code",
]: # singularity gives info about cashed image in stderr
assert env_res[key] == shelly_call.result().output.__dict__[key]


def create_shelly_inputfile(tempdir, filename, name, executable):
"""creating a task with a simple input_spec"""
my_input_spec = SpecInfo(
Expand Down
Loading

0 comments on commit 9cc0904

Please sign in to comment.