Move MPI behavior from HTEX to MPIExecutor (#3582)
This PR moves the following MPI-related functionality and options from HTEX to MPIExecutor:

The kwarg options enable_mpi_mode and mpi_launcher are now removed from HTEX
Checks for the launcher being set to SimpleLauncher are now in MPIExecutor
Checks for a valid mpi_launcher are now in MPIExecutor
A new validate_resource_spec method is added to HTEX that currently rejects any resource_specification passed to it, since HTEX does not support any such options
MPIExecutor overrides validate_resource_spec to check for a valid MPI resource specification
These changes should make it easier to implement executor-specific resource validation.

Changed Behaviour
The HTEX kwargs enable_mpi_mode and mpi_launcher are no longer supported.
MPI functionality is now expected to be used only through the MPIExecutor.
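
For example, a configuration that previously enabled MPI mode on HighThroughputExecutor would now be written against MPIExecutor directly. The sketch below is illustrative only: the Slurm provider settings, executor label, and the resource-specification values are assumptions, not taken from this commit.

```python
import parsl
from parsl import Config, bash_app
from parsl.executors import MPIExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider

config = Config(
    executors=[
        MPIExecutor(
            label="mpi",                    # illustrative label
            mpi_launcher="mpiexec",         # must be one of "srun", "aprun", "mpiexec"
            max_workers_per_block=2,        # MPI applications run concurrently per block
            provider=SlurmProvider(
                nodes_per_block=4,          # illustrative value
                launcher=SimpleLauncher(),  # required; other launchers now raise TypeError
            ),
        )
    ]
)

parsl.load(config)


@bash_app
def mpi_hello(parsl_resource_specification=None):
    # The MPI launch prefix is composed by the executor from the resource specification
    return "$PARSL_MPI_PREFIX hostname"


# MPI tasks must supply a non-empty parsl_resource_specification
future = mpi_hello(parsl_resource_specification={"num_nodes": 2, "ranks_per_node": 2})
```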
yadudoc authored Aug 16, 2024
1 parent f135919 commit 123df51
Showing 10 changed files with 171 additions and 145 deletions.
42 changes: 16 additions & 26 deletions parsl/executors/high_throughput/executor.py
@@ -12,7 +12,6 @@

import typeguard

import parsl.launchers
from parsl import curvezmq
from parsl.addresses import get_all_addresses
from parsl.app.errors import RemoteExceptionWrapper
@@ -25,8 +24,7 @@
RandomManagerSelector,
)
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec,
InvalidResourceSpecification,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
@@ -224,17 +222,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
Parsl will create names as integers starting with 0.
default: empty list
enable_mpi_mode: bool
If enabled, MPI launch prefixes will be composed for the batch scheduler based on
the nodes available in each batch job and the resource_specification dict passed
from the app. This is an experimental feature, please refer to the following doc section
before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html
mpi_launcher: str
This field is only used if enable_mpi_mode is set. Select one from the
list of supported MPI launchers = ("srun", "aprun", "mpiexec").
default: "mpiexec"
"""

@typeguard.typechecked
@@ -263,8 +250,6 @@ def __init__(self,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
manager_selector: ManagerSelector = RandomManagerSelector(),
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
@@ -330,15 +315,6 @@ def __init__(self,
self.encrypted = encrypted
self.cert_dir = None

self.enable_mpi_mode = enable_mpi_mode
assert mpi_launcher in VALID_LAUNCHERS, \
f"mpi_launcher must be set to one of {VALID_LAUNCHERS}"
if self.enable_mpi_mode:
assert isinstance(self.provider.launcher, parsl.launchers.SimpleLauncher), \
"mpi_mode requires the provider to be configured to use a SimpleLauncher"

self.mpi_launcher = mpi_launcher

if not launch_cmd:
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd
@@ -348,6 +324,8 @@ def __init__(self,
self.interchange_launch_cmd = interchange_launch_cmd

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
@@ -377,6 +355,18 @@ def worker_logdir(self):
return "{}/{}".format(self.worker_logdir_root, self.label)
return self.logdir

def validate_resource_spec(self, resource_specification: dict):
"""HTEX does not support *any* resource_specification options and
will raise InvalidResourceSpecification is any are passed to it"""
if resource_specification:
raise InvalidResourceSpecification(
set(resource_specification.keys()),
("HTEX does not support the supplied resource_specifications."
"For MPI applications consider using the MPIExecutor. "
"For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ")
)
return

def initialize_scaling(self):
"""Compose the launch command and scale out the initial blocks.
"""
@@ -660,7 +650,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
Future
"""

validate_resource_spec(resource_specification, self.enable_mpi_mode)
self.validate_resource_spec(resource_specification)

if self.bad_state_is_set:
raise self.executor_exception
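As a result of the new validate_resource_spec method above, a plain HighThroughputExecutor now fails fast when a task carries a resource specification instead of silently ignoring it. A minimal sketch of the resulting behaviour (the executor is constructed with default arguments purely for illustration):

```python
from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
    InvalidResourceSpecification,
)

htex = HighThroughputExecutor()

# Any resource_specification key is now rejected by HTEX
try:
    htex.validate_resource_spec({"num_nodes": 2})
except InvalidResourceSpecification as exc:
    print(exc)  # names the offending keys and points at MPIExecutor / WorkQueueExecutor
```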
25 changes: 23 additions & 2 deletions parsl/executors/high_throughput/mpi_executor.py
@@ -8,8 +8,13 @@
GENERAL_HTEX_PARAM_DOCS,
HighThroughputExecutor,
)
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import JobStatus
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider

@@ -30,6 +35,11 @@ class MPIExecutor(HighThroughputExecutor):
max_workers_per_block: int
Maximum number of MPI applications to run at once per block
mpi_launcher: str
Select one from the list of supported MPI launchers:
("srun", "aprun", "mpiexec").
default: "mpiexec"
{GENERAL_HTEX_PARAM_DOCS}
"""

@@ -60,7 +70,6 @@ def __init__(self,
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
enable_mpi_mode=True,
max_workers_per_node=max_workers_per_block,

# Everything else
@@ -82,9 +91,21 @@ def __init__(self,
poll_period=poll_period,
address_probe_timeout=address_probe_timeout,
worker_logdir_root=worker_logdir_root,
mpi_launcher=mpi_launcher,
block_error_handler=block_error_handler,
encrypted=encrypted
)
self.enable_mpi_mode = True
self.mpi_launcher = mpi_launcher

self.max_workers_per_block = max_workers_per_block

if not isinstance(self.provider.launcher, SimpleLauncher):
raise TypeError("mpi_mode requires the provider to be configured to use a SimpleLauncher")

if mpi_launcher not in VALID_LAUNCHERS:
raise ValueError(f"mpi_launcher set to:{mpi_launcher} must be set to one of {VALID_LAUNCHERS}")

self.mpi_launcher = mpi_launcher

def validate_resource_spec(self, resource_specification: dict):
return validate_resource_spec(resource_specification)
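
Since the launcher and mpi_launcher checks now run in the MPIExecutor constructor, a misconfiguration surfaces as an ordinary exception when the executor is built, as the tests further down exercise. A rough sketch (the provider arguments are placeholders):

```python
from parsl.executors import MPIExecutor
from parsl.launchers import SimpleLauncher, SrunLauncher
from parsl.providers import SlurmProvider

# A provider launcher other than SimpleLauncher is rejected with TypeError
try:
    MPIExecutor(provider=SlurmProvider(launcher=SrunLauncher()))
except TypeError as exc:
    print(exc)

# An unsupported mpi_launcher value is rejected with ValueError
try:
    MPIExecutor(mpi_launcher="bad_launcher",
                provider=SlurmProvider(launcher=SimpleLauncher()))
except ValueError as exc:
    print(exc)
```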
9 changes: 5 additions & 4 deletions parsl/executors/high_throughput/mpi_prefix_composer.py
@@ -21,14 +21,15 @@ def __str__(self):
class InvalidResourceSpecification(Exception):
"""Exception raised when Invalid input is supplied via resource specification"""

def __init__(self, invalid_keys: Set[str]):
def __init__(self, invalid_keys: Set[str], message: str = ''):
self.invalid_keys = invalid_keys
self.message = message

def __str__(self):
return f"Invalid resource specification options supplied: {self.invalid_keys}"
return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}"


def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool):
def validate_resource_spec(resource_spec: Dict[str, str]):
"""Basic validation of keys in the resource_spec
Raises: InvalidResourceSpecification if the resource_spec
@@ -38,7 +39,7 @@ def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool):

# empty resource_spec when mpi_mode is set causes parsl to hang
# ref issue #3427
if is_mpi_enabled and len(user_keys) == 0:
if len(user_keys) == 0:
raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured')

legal_keys = set(("ranks_per_node",
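validate_resource_spec no longer takes an is_mpi_enabled flag: it is only reached through MPIExecutor, where an empty specification is now an immediate error rather than the hang reported in issue #3427. A short sketch of that behaviour:

```python
from parsl.executors.high_throughput.mpi_prefix_composer import (
    MissingResourceSpecification,
    validate_resource_spec,
)

# An empty spec is now rejected up front instead of hanging the workflow
try:
    validate_resource_spec({})
except MissingResourceSpecification as exc:
    print(exc)
```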
40 changes: 40 additions & 0 deletions parsl/tests/test_htex/test_resource_spec_validation.py
@@ -0,0 +1,40 @@
import queue
from unittest import mock

import pytest

from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
)


def double(x):
return x * 2


@pytest.mark.local
def test_submit_calls_validate():

htex = HighThroughputExecutor()
htex.outgoing_q = mock.Mock(spec=queue.Queue)
htex.validate_resource_spec = mock.Mock(spec=htex.validate_resource_spec)

res_spec = {}
htex.submit(double, res_spec, (5,), {})
htex.validate_resource_spec.assert_called()


@pytest.mark.local
def test_resource_spec_validation():
htex = HighThroughputExecutor()
ret_val = htex.validate_resource_spec({})
assert ret_val is None


@pytest.mark.local
def test_resource_spec_validation_bad_keys():
htex = HighThroughputExecutor()

with pytest.raises(InvalidResourceSpecification):
htex.validate_resource_spec({"num_nodes": 2})
43 changes: 29 additions & 14 deletions parsl/tests/test_mpi_apps/test_bad_mpi_config.py
@@ -1,33 +1,48 @@
import pytest

from parsl import Config
from parsl.executors import HighThroughputExecutor
from parsl.executors import MPIExecutor
from parsl.launchers import AprunLauncher, SimpleLauncher, SrunLauncher
from parsl.providers import SlurmProvider


@pytest.mark.local
def test_bad_launcher_with_mpi_mode():
"""AssertionError if a launcher other than SimpleLauncher is supplied"""
def test_bad_launcher():
"""TypeError if a launcher other than SimpleLauncher is supplied"""

for launcher in [SrunLauncher(), AprunLauncher()]:
with pytest.raises(AssertionError):
with pytest.raises(TypeError):
Config(executors=[
HighThroughputExecutor(
enable_mpi_mode=True,
MPIExecutor(
provider=SlurmProvider(launcher=launcher),
)
])


@pytest.mark.local
def test_correct_launcher_with_mpi_mode():
def test_bad_mpi_launcher():
"""ValueError if an unsupported mpi_launcher is specified"""

with pytest.raises(ValueError):
Config(executors=[
MPIExecutor(
mpi_launcher="bad_launcher",
provider=SlurmProvider(launcher=SimpleLauncher()),
)
])


@pytest.mark.local
@pytest.mark.parametrize(
"mpi_launcher",
["srun", "aprun", "mpiexec"]
)
def test_correct_launcher_with_mpi_mode(mpi_launcher: str):
"""Confirm that SimpleLauncher works with mpi_mode"""

config = Config(executors=[
HighThroughputExecutor(
enable_mpi_mode=True,
provider=SlurmProvider(launcher=SimpleLauncher()),
)
])
assert isinstance(config.executors[0].provider.launcher, SimpleLauncher)
executor = MPIExecutor(
mpi_launcher=mpi_launcher,
provider=SlurmProvider(launcher=SimpleLauncher()),
)

assert isinstance(executor.provider.launcher, SimpleLauncher)
47 changes: 0 additions & 47 deletions parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py

This file was deleted.

24 changes: 16 additions & 8 deletions parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py
@@ -6,26 +6,34 @@
import pytest

import parsl
from parsl import bash_app, python_app
from parsl import Config, bash_app, python_app
from parsl.executors import MPIExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
MissingResourceSpecification,
)
from parsl.tests.configs.htex_local import fresh_config
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider

EXECUTOR_LABEL = "MPI_TEST"


def local_setup():
config = fresh_config()
config.executors[0].label = EXECUTOR_LABEL
config.executors[0].max_workers_per_node = 2
config.executors[0].enable_mpi_mode = True
config.executors[0].mpi_launcher = "mpiexec"

cwd = os.path.abspath(os.path.dirname(__file__))
pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile")

config.executors[0].provider.worker_init = f"export PBS_NODEFILE={pbs_nodefile}"
config = Config(
executors=[
MPIExecutor(
label=EXECUTOR_LABEL,
max_workers_per_block=2,
mpi_launcher="mpiexec",
provider=LocalProvider(
worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
launcher=SimpleLauncher()
)
)
])

parsl.load(config)

5 changes: 2 additions & 3 deletions parsl/tests/test_mpi_apps/test_mpiex.py
@@ -4,7 +4,6 @@

import pytest

import parsl
from parsl import Config, HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.launchers import SimpleLauncher
@@ -42,8 +41,8 @@ def test_docstring():
def test_init():
"""Ensure all relevant kwargs are copied over from HTEx"""

new_kwargs = {'max_workers_per_block'}
excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node',
new_kwargs = {'max_workers_per_block', 'mpi_launcher'}
excluded_kwargs = {'available_accelerators', 'cores_per_worker', 'max_workers_per_node',
'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'}

# Get the kwargs from both HTEx and MPIEx

