Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/smartsim-refactor' into wait-f…
Browse files Browse the repository at this point in the history
…or-job-end
  • Loading branch information
MattToast committed Aug 22, 2024
2 parents cc12ebb + 0569c05 commit a6c5799
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 192 deletions.
2 changes: 1 addition & 1 deletion smartsim/_core/control/launch_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from smartsim._core.utils import helpers as _helpers

if t.TYPE_CHECKING:
from smartsim._core.dispatch import LauncherProtocol
from smartsim._core.utils.launcher import LauncherProtocol
from smartsim.types import LaunchedJobID


Expand Down
172 changes: 4 additions & 168 deletions smartsim/_core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,23 @@

from __future__ import annotations

import abc
import collections.abc
import dataclasses
import os
import subprocess as sp
import typing as t
import uuid

import psutil
from typing_extensions import Self, TypeAlias, TypeVarTuple, Unpack

from smartsim._core.utils import helpers
from smartsim.error import errors
from smartsim.status import JobStatus
from smartsim.types import LaunchedJobID

if t.TYPE_CHECKING:
from smartsim._core.utils.launcher import ExecutableProtocol, LauncherProtocol
from smartsim.experiment import Experiment
from smartsim.settings.arguments import LaunchArguments

# Variadic type variable: captures the positional argument types that are
# forwarded through `start(self, *args: Unpack[_Ts])`.
_Ts = TypeVarTuple("_Ts")
# Contravariant type variable: parameterizes `LauncherProtocol` by the type of
# launchable input accepted by its `start` method.
_T_contra = t.TypeVar("_T_contra", contravariant=True)


_WorkingDirectory: TypeAlias = t.Union[str, os.PathLike[str]]
"""A working directory represented as a string or PathLike object"""
Expand Down Expand Up @@ -75,6 +70,7 @@
_LaunchConfigType: TypeAlias = (
"_LauncherAdapter[ExecutableProtocol, _WorkingDirectory, _EnvironMappingType]"
)

"""A launcher adapter that has configured a launcher to launch the components
of a job with some pre-determined launch settings
"""
Expand Down Expand Up @@ -232,7 +228,7 @@ def create_new_launcher_configuration(
self, for_experiment: Experiment, with_arguments: _DispatchableT
) -> _LaunchConfigType:
"""Create a new instance of a launcher for an experiment that the
provided settings where set to dispatch to, and configure it with the
provided settings were set to dispatch, and configure it with the
provided launch settings.
:param for_experiment: The experiment responsible for creating the launcher
Expand Down Expand Up @@ -376,163 +372,3 @@ def start(self, *args: Unpack[_Ts]) -> LaunchedJobID:
"""Function that can be used as a decorator to add a dispatch registration into
`DEFAULT_DISPATCHER`.
"""


# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# TODO: move these to a common module under `smartsim._core.launcher`
# -----------------------------------------------------------------------------


def create_job_id() -> LaunchedJobID:
    """Generate a fresh identifier for a newly launched job.

    :returns: A ``LaunchedJobID`` wrapping the string form of a randomly
        generated (version 4) UUID
    """
    unique_token = uuid.uuid4()
    return LaunchedJobID(str(unique_token))


class ExecutableProtocol(t.Protocol):
    """Structural type for any object that can be rendered as the sequence
    of strings making up a program invocation
    """

    def as_program_arguments(self) -> t.Sequence[str]:
        """Render this object as a sequence of program arguments.

        :returns: The strings that make up the command to execute
        """


class LauncherProtocol(collections.abc.Hashable, t.Protocol[_T_contra]):
    """Interface that a launcher must satisfy in order to be used by a
    SmartSim experiment. Launchers are hashable and are parameterized by the
    type of input that they know how to start.
    """

    @classmethod
    @abc.abstractmethod
    def create(cls, exp: Experiment, /) -> Self:
        """Construct a new launcher instance on behalf of the experiment that
        is going to use it.

        :param exp: The experiment that will use the newly created launcher
            instance
        :returns: The newly constructed launcher instance
        """

    @abc.abstractmethod
    def start(self, launchable: _T_contra, /) -> LaunchedJobID:
        """Start a new process from input that this launcher understands and
        hand back an id that can later be used to query the job's status.

        :param launchable: The input from which to start a new process
        :returns: The id with which to query the status of the process in
            future
        """

    @abc.abstractmethod
    def get_status(
        self, *launched_ids: LaunchedJobID
    ) -> t.Mapping[LaunchedJobID, JobStatus]:
        """Look up the current status of each of the given launched jobs.

        Implementations should raise a
        `smartsim.error.errors.LauncherJobNotFound` error if any of the ids
        is not recognized by the launcher.

        :param launched_ids: The ids of the launched jobs to query for
            current status
        :raises smartsim.error.errors.LauncherJobNotFound: If at least one of
            the ids in `launched_ids` is not recognized
        :returns: A mapping of launched id to current status
        """


def make_shell_format_fn(
    run_command: str | None,
) -> _FormatterType[LaunchArguments, tuple[str | os.PathLike[str], t.Sequence[str]]]:
    """Build a formatter that turns a `LaunchArguments` instance and an
    executable into a shell-launchable sequence of strings for a given
    launching utility.

    Example usage:

    .. highlight:: python
    .. code-block:: python

        echo_hello_world: ExecutableProtocol = ...
        env = {}
        slurm_args: SlurmLaunchArguments = ...
        slurm_args.set_nodes(3)

        as_srun_command = make_shell_format_fn("srun")
        path, fmt_cmd = as_srun_command(slurm_args, echo_hello_world, "/tmp", env)
        print(list(fmt_cmd))
        # prints: "['srun', '--nodes=3', '--', 'echo', 'Hello World!']"

    .. note::
        This function was/is a kind of slap-dash implementation, and is likely
        to change or be removed entirely as more functionality is added to the
        shell launcher. Use with caution and at your own risk!

    :param run_command: Name or path of the launching utility to invoke with
        the arguments. If ``None``, the executable's own program arguments
        are used as the command and the launch arguments are not consulted.
    :returns: A function that formats launch arguments, an executable, a
        working directory, and an environment as a pair of the working
        directory and a shell-launchable sequence of strings.
    """

    def impl(
        args: LaunchArguments,
        exe: ExecutableProtocol,
        path: str | os.PathLike[str],
        _env: _EnvironMappingType,
    ) -> t.Tuple[str | os.PathLike[str], t.Sequence[str]]:
        # No launching utility: the executable is run directly
        if run_command is None:
            return path, exe.as_program_arguments()

        # A formatter may yield nothing at all; treat that as "no flags"
        launch_flags = args.format_launch_args() or ()
        # "--" separates the utility's own flags from the wrapped program
        full_command = (
            run_command,
            *launch_flags,
            "--",
            *exe.as_program_arguments(),
        )
        return path, full_command

    return impl


class ShellLauncher:
    """Mock launcher that starts simple shell commands as subprocesses and
    tracks them by the launched job id issued for each one
    """

    def __init__(self) -> None:
        # Every process started by this launcher, keyed by its issued job id
        self._launched: dict[LaunchedJobID, sp.Popen[bytes]] = {}

    def start(
        self, command: tuple[str | os.PathLike[str], t.Sequence[str]]
    ) -> LaunchedJobID:
        """Start a new subprocess for a command and begin tracking it.

        :param command: A pair of the working directory to run in and the
            sequence of strings to execute; the first string is the
            executable and is passed through `helpers.expand_exe_path`
        :returns: The id under which the new process is tracked
        """
        id_ = create_job_id()
        path, (exe, *rest) = command
        resolved_exe = helpers.expand_exe_path(exe)
        # The process must outlive this call, so it is deliberately not
        # opened in a `with` block
        # pylint: disable-next=consider-using-with
        process = sp.Popen((resolved_exe, *rest), cwd=path)
        self._launched[id_] = process
        return id_

    def get_status(
        self, *launched_ids: LaunchedJobID
    ) -> t.Mapping[LaunchedJobID, JobStatus]:
        """Report the current status of each of the given launched jobs.

        :param launched_ids: The ids of the jobs to query
        :raises smartsim.error.errors.LauncherJobNotFound: If any id was not
            issued by this launcher
        :returns: A mapping of launched id to current status
        """
        statuses: dict[LaunchedJobID, JobStatus] = {}
        for id_ in launched_ids:
            statuses[id_] = self._get_status(id_)
        return statuses

    def _get_status(self, id_: LaunchedJobID, /) -> JobStatus:
        """Determine the status of a single tracked job.

        :param id_: The id of the job to query
        :raises smartsim.error.errors.LauncherJobNotFound: If this launcher
            has no process tracked under `id_`
        :returns: The status derived from the process exit code if it has
            one, otherwise from the process state reported by `psutil`
        """
        proc = self._launched.get(id_)
        if proc is None:
            msg = f"Launcher `{self}` has not launched a job with id `{id_}`"
            raise errors.LauncherJobNotFound(msg)

        ret_code = proc.poll()
        if ret_code is not None:
            # An exit code is available: zero is success, anything else a
            # failure
            return JobStatus.COMPLETED if ret_code == 0 else JobStatus.FAILED

        # No exit code yet: classify by the process state psutil reports
        state = psutil.Process(proc.pid).status()
        running_states = {
            psutil.STATUS_RUNNING,
            psutil.STATUS_SLEEPING,
            psutil.STATUS_WAKING,
            psutil.STATUS_DISK_SLEEP,
        }
        failed_states = {psutil.STATUS_DEAD}
        paused_states = {
            psutil.STATUS_TRACING_STOP,
            psutil.STATUS_WAITING,
            psutil.STATUS_STOPPED,
            psutil.STATUS_LOCKED,
            psutil.STATUS_PARKED,
            psutil.STATUS_IDLE,
        }
        completed_states = {psutil.STATUS_ZOMBIE}

        if state in running_states:
            return JobStatus.RUNNING
        if state in failed_states:
            return JobStatus.FAILED
        if state in paused_states:
            return JobStatus.PAUSED
        if state in completed_states:
            return JobStatus.COMPLETED
        return JobStatus.UNKNOWN

    @classmethod
    def create(cls, _: Experiment) -> Self:
        """Construct a new shell launcher; the experiment is not used.

        :param _: The experiment requesting the launcher (ignored)
        :returns: A new launcher instance tracking no processes
        """
        return cls()


# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
4 changes: 3 additions & 1 deletion smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@
if t.TYPE_CHECKING:
from typing_extensions import Self

from smartsim._core.utils.launcher import ExecutableProtocol
from smartsim.experiment import Experiment


# Logger for this module, obtained from `get_logger` using the module's name
logger = get_logger(__name__)


Expand Down Expand Up @@ -355,7 +357,7 @@ def _assert_schema_type(obj: object, typ: t.Type[_SchemaT], /) -> _SchemaT:
return obj


from smartsim._core.dispatch import ExecutableProtocol, dispatch
from smartsim._core.dispatch import dispatch

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# TODO: Remove this registry and move back to builder file after fixing
Expand Down
25 changes: 25 additions & 0 deletions smartsim/_core/shell/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Loading

0 comments on commit a6c5799

Please sign in to comment.