Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rationaizing Resource Specification errors acorss executors. #3627

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions parsl/executors/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Exceptions raise by Executors."""
from typing import Set

from parsl.errors import ParslError
from parsl.executors.base import ParslExecutor

Expand Down Expand Up @@ -44,6 +46,17 @@ def __str__(self):
self.current_executor)


class InvalidResourceSpecificationError(ExecutorError):
"""Error raised when Invalid input is supplied via resource Specification"""

def __init__(self, invalid_keys: Set[str], message: str = ''):
self.invalid_keys = invalid_keys
self.message = message

def __str__(self):
return f"Invalid Resource Specification Supplied: {self.invalid_keys}. {self.message}"


class ScalingFailed(ExecutorError):
"""Scaling failed due to error in Execution provider."""

Expand Down
13 changes: 7 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
from parsl.addresses import get_all_addresses
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.staging import Staging
from parsl.executors.errors import BadMessage, ScalingFailed
from parsl.executors.errors import (
BadMessage,
InvalidResourceSpecificationError,
ScalingFailed,
)
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput.errors import CommandClientTimeoutError
from parsl.executors.high_throughput.manager_selector import (
ManagerSelector,
RandomManagerSelector,
)
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -341,9 +342,9 @@ def worker_logdir(self):

def validate_resource_spec(self, resource_specification: dict):
"""HTEX does not support *any* resource_specification options and
will raise InvalidResourceSpecification is any are passed to it"""
will raise InvalidResourceSpecificationError is any are passed to it"""
if resource_specification:
raise InvalidResourceSpecification(
raise InvalidResourceSpecificationError(
set(resource_specification.keys()),
("HTEX does not support the supplied resource_specifications."
"For MPI applications consider using the MPIExecutor. "
Expand Down
32 changes: 7 additions & 25 deletions parsl/executors/high_throughput/mpi_prefix_composer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from typing import Dict, List, Set, Tuple
from typing import Dict, List, Tuple

from parsl.executors.errors import InvalidResourceSpecificationError

logger = logging.getLogger(__name__)

Expand All @@ -8,39 +10,19 @@
'mpiexec')


class MissingResourceSpecification(Exception):
"""Exception raised when input is not supplied a resource specification"""

def __init__(self, reason: str):
self.reason = reason

def __str__(self):
return f"Missing resource specification: {self.reason}"


class InvalidResourceSpecification(Exception):
"""Exception raised when Invalid input is supplied via resource specification"""

def __init__(self, invalid_keys: Set[str], message: str = ''):
self.invalid_keys = invalid_keys
self.message = message

def __str__(self):
return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}"


def validate_resource_spec(resource_spec: Dict[str, str]):
"""Basic validation of keys in the resource_spec

Raises: InvalidResourceSpecification if the resource_spec
Raises: InvalidResourceSpecificationError if the resource_spec
is invalid (e.g, contains invalid keys)
"""
user_keys = set(resource_spec.keys())

# empty resource_spec when mpi_mode is set causes parsl to hang
# ref issue #3427
if len(user_keys) == 0:
raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured')
raise InvalidResourceSpecificationError(user_keys,
'MPI mode requires optional parsl_resource_specification keyword argument to be configured')

legal_keys = set(("ranks_per_node",
"num_nodes",
Expand All @@ -49,7 +31,7 @@ def validate_resource_spec(resource_spec: Dict[str, str]):
))
invalid_keys = user_keys - legal_keys
if invalid_keys:
raise InvalidResourceSpecification(invalid_keys)
raise InvalidResourceSpecificationError(invalid_keys=invalid_keys)
if "num_nodes" in resource_spec:
if not resource_spec.get("num_ranks") and resource_spec.get("ranks_per_node"):
resource_spec["num_ranks"] = str(int(resource_spec["num_nodes"]) * int(resource_spec["ranks_per_node"]))
Expand Down
5 changes: 3 additions & 2 deletions parsl/executors/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from parsl.data_provider.staging import Staging
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import UnsupportedFeatureError
from parsl.executors.errors import InvalidResourceSpecificationError
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -54,7 +54,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
if resource_specification:
logger.error("Ignoring the resource specification. "
"Parsl resource specification is not supported in ThreadPool Executor.")
raise UnsupportedFeatureError('resource specification', 'ThreadPool Executor', None)
raise InvalidResourceSpecificationError(set(resource_specification.keys()),
"Parsl resource specification is not supported in ThreadPool Executor.")

return self.executor.submit(func, *args, **kwargs)

Expand Down
11 changes: 6 additions & 5 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.errors import OptionalModuleMissing
from parsl.executors.errors import ExecutorError
from parsl.executors.errors import ExecutorError, InvalidResourceSpecificationError
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.executors.workqueue import exec_parsl_function
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -419,7 +419,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
message = "Task resource specification only accepts these types of resources: {}".format(
', '.join(acceptable_fields))
logger.error(message)
raise ExecutorError(self, message)
raise InvalidResourceSpecificationError(keys, message)

# this checks that either all of the required resource types are specified, or
# that none of them are: the `required_resource_types` are not actually required,
Expand All @@ -430,9 +430,10 @@ def submit(self, func, resource_specification, *args, **kwargs):
logger.error("Running with `autolabel=False`. In this mode, "
"task resource specification requires "
"three resources to be specified simultaneously: cores, memory, and disk")
raise ExecutorError(self, "Task resource specification requires "
"three resources to be specified simultaneously: cores, memory, and disk. "
"Try setting autolabel=True if you are unsure of the resource usage")
raise InvalidResourceSpecificationError(keys,
"Task resource specification requires "
"three resources to be specified simultaneously: cores, memory, and disk. "
"Try setting autolabel=True if you are unsure of the resource usage")

for k in keys:
if k == 'cores':
Expand Down
27 changes: 15 additions & 12 deletions parsl/tests/test_error_handling/test_resource_spec.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import parsl
from parsl.app.app import python_app
from parsl.executors import WorkQueueExecutor
from parsl.executors.errors import ExecutorError, UnsupportedFeatureError
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
from parsl.executors.errors import (
InvalidResourceSpecificationError,
UnsupportedFeatureError,
)
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.threads import ThreadPoolExecutor


@python_app
Expand All @@ -26,22 +27,24 @@ def test_resource(n=2):
fut = double(n, parsl_resource_specification=spec)
try:
fut.result()
except InvalidResourceSpecification:
assert isinstance(executor, HighThroughputExecutor)
except InvalidResourceSpecificationError:
assert (
isinstance(executor, HighThroughputExecutor) or
isinstance(executor, WorkQueueExecutor) or
isinstance(executor, ThreadPoolExecutor))
except UnsupportedFeatureError:
assert not isinstance(executor, WorkQueueExecutor)
except Exception as e:
assert isinstance(e, ExecutorError)

# Specify resources with wrong types
# 'cpus' is incorrect, should be 'cores'
spec = {'cpus': 2, 'memory': 1000, 'disk': 1000}
fut = double(n, parsl_resource_specification=spec)
try:
fut.result()
except InvalidResourceSpecification:
assert isinstance(executor, HighThroughputExecutor)
except InvalidResourceSpecificationError:
assert (
isinstance(executor, HighThroughputExecutor) or
isinstance(executor, WorkQueueExecutor) or
isinstance(executor, ThreadPoolExecutor))
except UnsupportedFeatureError:
assert not isinstance(executor, WorkQueueExecutor)
except Exception as e:
assert isinstance(e, ExecutorError)
6 changes: 2 additions & 4 deletions parsl/tests/test_htex/test_resource_spec_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import pytest

from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
)
from parsl.executors.errors import InvalidResourceSpecificationError


def double(x):
Expand Down Expand Up @@ -36,5 +34,5 @@ def test_resource_spec_validation():
def test_resource_spec_validation_bad_keys():
htex = HighThroughputExecutor()

with pytest.raises(InvalidResourceSpecification):
with pytest.raises(InvalidResourceSpecificationError):
htex.validate_resource_spec({"num_nodes": 2})
6 changes: 2 additions & 4 deletions parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import parsl
from parsl import Config, bash_app, python_app
from parsl.executors import MPIExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
MissingResourceSpecification,
)
from parsl.executors.errors import InvalidResourceSpecificationError
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider

Expand Down Expand Up @@ -185,6 +183,6 @@ def test_simulated_load(rounds: int = 100):
@pytest.mark.local
def test_missing_resource_spec():

with pytest.raises(MissingResourceSpecification):
with pytest.raises(InvalidResourceSpecificationError):
future = mock_app(sleep_dur=0.4)
future.result(timeout=10)
11 changes: 4 additions & 7 deletions parsl/tests/test_mpi_apps/test_resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
import pytest

from parsl.app.app import python_app
from parsl.executors.errors import InvalidResourceSpecificationError
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
MissingResourceSpecification,
)
from parsl.executors.high_throughput.mpi_resource_management import (
get_nodes_in_batchjob,
get_pbs_hosts_list,
Expand Down Expand Up @@ -104,8 +101,8 @@ def test_top_level():

({"num_nodes": 2, "ranks_per_node": 1}, None),
({"launcher_options": "--debug_foo"}, None),
({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification),
({}, MissingResourceSpecification),
({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecificationError),
({}, InvalidResourceSpecificationError),
)
)
def test_mpi_resource_spec(resource_spec: Dict, exception):
Expand Down Expand Up @@ -137,5 +134,5 @@ def test_mpi_resource_spec_passed_to_htex(resource_spec: dict):
htex = HighThroughputExecutor()
htex.outgoing_q = mock.Mock(spec=queue.Queue)

with pytest.raises(InvalidResourceSpecification):
with pytest.raises(InvalidResourceSpecificationError):
htex.validate_resource_spec(resource_spec)
Loading