Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for sequences into exp.start(), and unpack iterables #712

Merged
merged 11 commits into from
Oct 4, 2024
20 changes: 18 additions & 2 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
logger = get_logger(__name__)


def _unpack(jobs: t.Sequence[t.Tuple[Job]] | t.Sequence[str]) -> t.Iterator[str]:
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
"""Unpack any iterable input into exp.start in order to obtain a
single sequence of jobs that can be launched"""
for item in jobs:

if isinstance(item, t.Iterable):
yield from _unpack(item)
else:
yield item
juliaputko marked this conversation as resolved.
Show resolved Hide resolved


def _exp_path_map(exp: "Experiment") -> str:
"""Mapping function for use by method contextualizer to place the path of
the currently-executing experiment into context for log enrichment"""
Expand Down Expand Up @@ -151,14 +162,19 @@ def __init__(self, name: str, exp_path: str | None = None):
experiment
"""

def start(self, *jobs: Job) -> tuple[LaunchedJobID, ...]:
def start(
self, *jobs: Job | t.Sequence[t.Tuple[Job]] | t.Sequence[Job]
) -> tuple[LaunchedJobID, ...]:
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
"""Execute a collection of `Job` instances.

:param jobs: A collection of other job instances to start
:returns: A sequence of ids with order corresponding to the sequence of
jobs that can be used to query or alter the status of that
particular execution of the job.
"""
# If item is instance iterable then unpack and extend the list
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(jobs, t.Iterable):
jobs = list(tuple(_unpack(jobs)))
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
# Create the run id
run_id = datetime.datetime.now().replace(microsecond=0).isoformat()
# Generate the root path
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -170,7 +186,7 @@ def _dispatch(
generator: Generator,
dispatcher: dispatch.Dispatcher,
job: Job,
*jobs: Job,
*jobs: Job | t.Sequence[t.Tuple[Job]],
) -> tuple[LaunchedJobID, ...]:
"""Dispatch a series of jobs with a particular dispatcher

Expand Down
155 changes: 154 additions & 1 deletion tests/test_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
import time
import typing as t
import uuid
from os import path as osp

import pytest

from smartsim._core import dispatch
from smartsim._core.control.interval import SynchronousTimeInterval
from smartsim._core.control.launch_history import LaunchHistory
from smartsim._core.utils.launcher import LauncherProtocol, create_job_id
from smartsim.entity import entity
from smartsim.entity.application import Application
from smartsim.entity.ensemble import Ensemble
from smartsim.error import errors
from smartsim.experiment import Experiment
from smartsim.launchable import job
Expand All @@ -51,6 +53,17 @@

pytestmark = pytest.mark.group_a

_ID_GENERATOR = (str(i) for i in itertools.count())


@pytest.fixture
def get_gen_symlink_dir(fileutils):
yield fileutils.get_test_conf_path(osp.join("generator_files", "to_symlink_dir"))
juliaputko marked this conversation as resolved.
Show resolved Hide resolved


def random_id():
return next(_ID_GENERATOR)


@pytest.fixture
def experiment(monkeypatch, test_dir, dispatcher):
Expand Down Expand Up @@ -611,3 +624,143 @@ def test_experiment_stop_does_not_raise_on_unknown_job_id(
assert stat == InvalidJobStatus.NEVER_STARTED
after_cancel = exp.get_status(*all_known_ids)
assert before_cancel == after_cancel


def test_start_sequence_5(
test_dir,
wlmutils,
monkeypatch,
):
monkeypatch.setattr(
"smartsim._core.dispatch._LauncherAdapter.start",
lambda launch, exe, job_execution_path, env, out, err: random_id(),
)
"""Test the unpacking of a tuple of tuples
exp.start(job1, (job2, job3))
"""
juliaputko marked this conversation as resolved.
Show resolved Hide resolved
ensemble = Ensemble("ensemble-name", "echo", replicas=2)

launch_settings = launchSettings.LaunchSettings(wlmutils.get_test_launcher())
job_list = ensemble.as_jobs(launch_settings)

exp = Experiment(name="exp_name", exp_path=test_dir)

exp.start(job_list)


def test_start_with_two_element_tuple(
test_dir,
wlmutils,
monkeypatch,
):
"""Test unpacking a tuple of two jobs"""

monkeypatch.setattr(
"smartsim._core.dispatch._LauncherAdapter.start",
lambda launch, exe, job_execution_path, env, out, err: random_id(),
)
ensemble = Ensemble("ensemble-name", "echo", replicas=2)

application = Application(
"test_name",
exe="echo",
exe_args=["spam", "eggs"],
)

launch_settings = launchSettings.LaunchSettings(wlmutils.get_test_launcher())
job_list = ensemble.as_jobs(launch_settings)

job2 = job.Job(application, launch_settings)
exp = Experiment(name="exp_name", exp_path=test_dir)

exp.start((job2, job_list))


def test_start_with_nested_tuple(
test_dir,
wlmutils,
monkeypatch,
):
"""Test unpacking a nested tuple"""
monkeypatch.setattr(
"smartsim._core.dispatch._LauncherAdapter.start",
lambda launch, exe, job_execution_path, env, out, err: random_id(),
)
ensemble = Ensemble("ensemble-name", "echo", replicas=2)

launch_settings = launchSettings.LaunchSettings(wlmutils.get_test_launcher())
job_list = ensemble.as_jobs(launch_settings)

application = Application(
"test_name",
exe="echo",
exe_args=["spam", "eggs"],
)

job2 = job.Job(application, launch_settings)

application_2 = Application(
"test_name_2",
exe="echo",
exe_args=["spam", "eggs"],
)
job_3 = job.Job(application_2, launch_settings)

exp = Experiment(name="exp_name", exp_path=test_dir)

exp.start((job2, (job_list, job_3)))


def test_start_with_one_element_tuple(
test_dir,
wlmutils,
monkeypatch,
):
"""Test unpacking a tuple of one job"""
monkeypatch.setattr(
"smartsim._core.dispatch._LauncherAdapter.start",
lambda launch, exe, job_execution_path, env, out, err: random_id(),
)
ensemble = Ensemble("ensemble-name", "echo", replicas=2)

launch_settings = launchSettings.LaunchSettings(wlmutils.get_test_launcher())
job_list = ensemble.as_jobs(launch_settings)

exp = Experiment(name="exp_name", exp_path=test_dir)

exp.start((job_list))
juliaputko marked this conversation as resolved.
Show resolved Hide resolved


def test_start_list_of_jobs(
test_dir,
wlmutils,
monkeypatch,
):
"""Test unpacking a list of tuples"""
monkeypatch.setattr(
"smartsim._core.dispatch._LauncherAdapter.start",
lambda launch, exe, job_execution_path, env, out, err: random_id(),
)
ensemble = Ensemble("ensemble-name", "echo", replicas=2)

launch_settings = launchSettings.LaunchSettings(wlmutils.get_test_launcher())
job_list = ensemble.as_jobs(launch_settings)

application = Application(
"test_name",
exe="echo",
exe_args=["spam", "eggs"],
)

job2 = job.Job(application, launch_settings)

application_2 = Application(
"test_name_2",
exe="echo",
exe_args=["spam", "eggs"],
)
job_3 = job.Job(application_2, launch_settings)

exp = Experiment(name="exp_name", exp_path=test_dir)

exp.start([job2, (job_list, job_3)])
Loading