Commit d52c74d
implement can_honor & tests
ankona committed Aug 7, 2024
1 parent 577d58b commit d52c74d
Showing 3 changed files with 136 additions and 38 deletions.
64 changes: 32 additions & 32 deletions smartsim/_core/launcher/dragon/dragonBackend.py
@@ -262,7 +262,7 @@ def __str__(self) -> str:
     def status_message(self) -> str:
         """Message with status of available nodes and history of launched jobs.
-        :returns: Status message
+        :returns: a status message
         """
         return (
             "Dragon server backend update\n"
@@ -274,9 +274,8 @@ def _heartbeat(self) -> None:

     @property
     def cooldown_period(self) -> int:
-        """Time (in seconds) the server will wait before shutting down
-        when exit conditions are met (see ``should_shutdown()`` for further details).
+        """Time (in seconds) the server will wait before shutting down when
+        exit conditions are met (see ``should_shutdown()`` for further details).
         """
         return self._cooldown_period

@@ -310,21 +309,26 @@ def should_shutdown(self) -> bool:
         and it requested immediate shutdown, or if it did not request immediate
         shutdown, but all jobs have been executed.
         In both cases, a cooldown period may need to be waited before shutdown.
+        :returns: `True` if the server should terminate, otherwise `False`
         """
         if self._shutdown_requested and self._can_shutdown:
             return self._has_cooled_down
         return False

     @property
     def current_time(self) -> float:
-        """Current time for DragonBackend object, in seconds since the Epoch"""
+        """Current time for DragonBackend object, in seconds since the Epoch
+        :returns: the current timestamp"""
         return time.time()

     def _can_honor_policy(
         self, request: DragonRunRequest
     ) -> t.Tuple[bool, t.Optional[str]]:
         """Check if the policy can be honored with resources available
         in the allocation.
+        :param request: DragonRunRequest containing policy information
+        :returns: Tuple indicating if the policy can be honored and
+            an optional error message"""
@@ -350,32 +354,23 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
         in the future it will also look at other constraints
         such as memory, accelerators, and so on.
         """
-        if request.nodes > len(self._hosts):
-            message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
-            message += f"but only {len(self._hosts)} nodes are available."
-            return False, message
-        if self._shutdown_requested:
-            message = "Cannot satisfy request, server is shutting down."
-            return False, message
+        honorable, err = self._can_honor_state(request)
+        if not honorable:
+            return False, err

         honorable, err = self._can_honor_policy(request)
         if not honorable:
             return False, err

-        return True, None
-        # honorable, err = self._can_honor_hosts(request)
-        # if not honorable:
-        #     return False, err

-        # honorable, err = self._can_honor_state(request)
-        # if not honorable:
-        #     return False, err
+        honorable, err = self._can_honor_hosts(request)
+        if not honorable:
+            return False, err

-        # honorable, err = self._can_honor_affinities(request)
-        # if not honorable:
-        #     return False, err
+        honorable, err = self._can_honor_affinities(request)
+        if not honorable:
+            return False, err

-        # return True, None
+        return True, None

     def _can_honor_affinities(
         self, request: DragonRunRequest
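The net effect of this hunk: `_can_honor` becomes a short-circuiting chain of narrow validators (`_can_honor_state`, `_can_honor_policy`, `_can_honor_hosts`, `_can_honor_affinities`), each returning an `(ok, error)` pair, with the first failure winning. A minimal, self-contained sketch of the pattern (hypothetical names, not the backend's actual API):

import typing as t

CheckResult = t.Tuple[bool, t.Optional[str]]

def run_checks(
    request: object, checks: t.Sequence[t.Callable[[object], CheckResult]]
) -> CheckResult:
    """Apply each validator in order; the first failure short-circuits."""
    for check in checks:
        honorable, err = check(request)
        if not honorable:
            return False, err
    return True, None

# usage with a trivial hypothetical validator:
ok, err = run_checks("fake-request", [lambda req: (True, None)])
assert ok and err is None

Composing the checks this way keeps each failure message next to the constraint that produced it, and makes the future constraints the docstring mentions (memory, accelerators) easy to slot into the chain.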
@@ -409,24 +404,30 @@ def _can_honor_hosts(
     ) -> t.Tuple[bool, t.Optional[str]]:
         """Check if the hosts requested are available to execute
         the request.
+        :param request: the DragonRunRequest to verify
+        :returns: Tuple indicating if the request can be honored and
+            an optional error message"""
+        # fail if requesting more nodes than the total number available
         if request.nodes > len(self._hosts):
-            message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
-            message += f"but only {len(self._hosts)} nodes are available."
+            message = f"Cannot satisfy request. {request.nodes} requested nodes "
+            message += f"exceed the {len(self._hosts)} available."
             return False, message

-        requested_hosts: t.Set[str] = set()
+        requested_hosts: t.Set[str] = set(self._hosts)
         if request.hostlist:
-            requested_hosts = set(request.hostlist)
-        if not requested_hosts:
-            return True, None
+            requested_hosts = set(
+                [host.strip() for host in request.hostlist.split(",")]
+            )

         all_hosts = set(self._hosts)
         valid_hosts = all_hosts.intersection(requested_hosts)
         invalid_hosts = requested_hosts - valid_hosts

         if invalid_hosts:
             logger.warning(f"Some invalid hostnames were requested: {invalid_hosts}")

+        # don't worry about count when only hostnames are supplied (req.nodes == 0)
+        # fail if requesting specific hostnames and there aren't enough available
         if request.nodes > len(valid_hosts):
             message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
             message += f"but only {len(valid_hosts)} named hosts are available."
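At its core the new host check is set arithmetic: parse the comma-separated hostlist, intersect it with the allocation's hosts, warn about names that do not exist, and fail only when the surviving names cannot cover the requested node count. A standalone sketch of that logic (hypothetical function name, standard library only):

import logging
import typing as t

logger = logging.getLogger(__name__)

def check_hostlist(
    hostlist: str, available: t.Iterable[str], nodes: int
) -> t.Tuple[bool, t.Optional[str]]:
    """Illustrative mirror of the set arithmetic in _can_honor_hosts."""
    requested = {host.strip() for host in hostlist.split(",")}
    valid = set(available) & requested  # names present in the allocation
    invalid = requested - valid  # names that cannot be honored
    if invalid:
        logger.warning(f"Some invalid hostnames were requested: {invalid}")
    if nodes > len(valid):
        return False, (
            f"Cannot satisfy request. Requested {nodes} nodes, "
            f"but only {len(valid)} named hosts are available."
        )
    return True, None

# two valid names out of three requested still satisfy nodes=2
assert check_hostlist("n1,n2,bad", ["n1", "n2", "n3"], nodes=2) == (True, None)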
@@ -459,7 +460,6 @@ def _allocate_step(
         num_hosts: int = request.nodes  # or 1  # todo - make at least 1 again

         with self._queue_lock:
             # # ensure positive integers in valid range are accepted
             # if num_hosts <= 0 or num_hosts > len(self._hosts):
             #     return None

3 changes: 3 additions & 0 deletions smartsim/_core/launcher/dragon/pqueue.py
@@ -58,6 +58,7 @@ def __init__(self, nodes: t.List[Node], lock: threading.RLock) -> None:
             raise SmartSimError("Missing nodes to prioritize")

         self._lock = lock
+        """Lock used to ensure thread safe changes of the reference counters"""

         self._ref_map: t.Dict[str, _NodeRefCount] = {}
         """Map node names to a ref counter for direct access"""
@@ -134,6 +135,7 @@ def next_n_from(self, num_items: int, hosts: t.List[str]) -> t.List[_NodeRefCount]:
         sub_heap = self._create_sub_heap(hosts)
         return self._get_next_n_available_nodes(num_items, sub_heap)

+    @property
     def unassigned(
         self, heap: t.Optional[t.List[_NodeRefCount]] = None
     ) -> t.List[_NodeRefCount]:
@@ -143,6 +145,7 @@ def unassigned(

         return [node for node in heap if node[0] == 0]

+    @property
     def assigned(
         self, heap: t.Optional[t.List[_NodeRefCount]] = None
     ) -> t.List[_NodeRefCount]:
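Decorating `unassigned` and `assigned` with `@property` turns them into attribute reads; since attribute access cannot pass arguments, the optional `heap` parameter can only ever take its default, so property reads always filter the prioritizer's full heap. A toy sketch of the resulting call style (hypothetical class, not the real `NodePrioritizer`):

import typing as t

class PrioritizerSketch:
    """Toy stand-in showing the effect of the new @property decorators."""

    def __init__(self, ref_counts: t.Dict[str, int]) -> None:
        self._ref_counts = ref_counts

    @property
    def unassigned(self) -> t.List[str]:
        # ref count == 0: node is free
        return [name for name, count in self._ref_counts.items() if count == 0]

    @property
    def assigned(self) -> t.List[str]:
        # ref count > 0: node is in use
        return [name for name, count in self._ref_counts.items() if count > 0]

sketch = PrioritizerSketch({"n1": 0, "n2": 2})
assert sketch.unassigned == ["n1"]  # attribute access, no call parentheses
assert sketch.assigned == ["n2"]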
107 changes: 101 additions & 6 deletions tests/test_dragon_run_request.py
@@ -30,8 +30,8 @@
 import time
 from unittest.mock import MagicMock

+import pydantic.error_wrappers
 import pytest
-from pydantic import ValidationError

 from smartsim._core.launcher.dragon.pqueue import NodePrioritizer

@@ -552,9 +552,33 @@ def test_can_honor(monkeypatch: pytest.MonkeyPatch, num_nodes: int) -> None:
         pmi_enabled=False,
     )

-    assert dragon_backend._can_honor(run_req)[0] == (
-        num_nodes <= len(dragon_backend._hosts)
-    )
+    can_honor, error_msg = dragon_backend._can_honor(run_req)
+
+    nodes_in_range = num_nodes <= len(dragon_backend._hosts)
+    assert can_honor == nodes_in_range
+    assert error_msg is None if nodes_in_range else error_msg is not None
+
+
+@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems")
+@pytest.mark.parametrize("num_nodes", [-10, -1, 0])
+def test_can_honor_invalid_num_nodes(
+    monkeypatch: pytest.MonkeyPatch, num_nodes: int
+) -> None:
+    """Verify that requests for invalid numbers of nodes (negative, zero) are rejected"""
+    dragon_backend = get_mock_backend(monkeypatch)
+
+    with pytest.raises(pydantic.error_wrappers.ValidationError):
+        DragonRunRequest(
+            exe="sleep",
+            exe_args=["5"],
+            path="/a/fake/path",
+            nodes=num_nodes,
+            tasks=1,
+            tasks_per_node=1,
+            env={},
+            current_env={},
+            pmi_enabled=False,
+        )


 @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems")
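`test_can_honor_invalid_num_nodes` leans on pydantic validating the request at construction time, so non-positive node counts never reach `_can_honor` at all. A self-contained sketch of the mechanism (hypothetical model, not SmartSim's actual `DragonRunRequest` schema):

import pydantic

class RunRequestSketch(pydantic.BaseModel):
    exe: str
    nodes: pydantic.PositiveInt  # rejects -10, -1, and 0 at construction time

try:
    RunRequestSketch(exe="sleep", nodes=0)
except pydantic.ValidationError as err:
    print(err)  # the error reports that nodes must be greater than 0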
@@ -684,7 +708,8 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None:
     set_mock_group_infos(monkeypatch, dragon_backend)
     hosts = dragon_backend.hosts

-    expected_message = textwrap.dedent(f"""\
+    expected_message = textwrap.dedent(
+        f"""\
         Dragon server backend update
         | Host | Status |
         |---------|----------|
@@ -697,10 +722,80 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None:
         | del999-2 | Cancelled | {hosts[1]} | -9 | 1 |
         | c101vz-3 | Completed | {hosts[1]},{hosts[2]} | 0 | 2 |
         | 0ghjk1-4 | Failed | {hosts[2]} | -1 | 1 |
-        | ljace0-5 | NeverStarted | | | 0 |""")
+        | ljace0-5 | NeverStarted | | | 0 |"""
+    )

     # get rid of white space to make the comparison easier
     actual_msg = dragon_backend.status_message.replace(" ", "")
     expected_message = expected_message.replace(" ", "")

     assert actual_msg == expected_message


+def test_can_honor_hosts_unavailable_hosts(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Verify that a request naming invalid hosts fails when the number of
+    valid named hosts falls below the requested node count"""
+    dragon_backend = get_mock_backend(monkeypatch)
+
+    # supply 2 invalid and 1 valid hostname
+    actual_hosts = list(dragon_backend._hosts)
+    actual_hosts[0] = f"x{actual_hosts[0]}"
+    actual_hosts[1] = f"x{actual_hosts[1]}"
+
+    host_list = ",".join(actual_hosts)
+
+    run_req = DragonRunRequest(
+        exe="sleep",
+        exe_args=["5"],
+        path="/a/fake/path",
+        nodes=2,  # <----- requesting 2 of 3 available nodes
+        hostlist=host_list,  # <--- only one valid name available
+        tasks=1,
+        tasks_per_node=1,
+        env={},
+        current_env={},
+        pmi_enabled=False,
+        policy=DragonRunPolicy(),
+    )
+
+    can_honor, error_msg = dragon_backend._can_honor(run_req)
+
+    # confirm the failure is indicated
+    assert not can_honor
+    # confirm the failure message identifies the shortage of named hosts as the cause
+    assert "named hosts" in error_msg
+
+
+def test_can_honor_hosts_unavailable_hosts_ok(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Verify that a request naming some invalid hosts still passes when enough
+    valid named hosts remain to cover the requested node count"""
+    dragon_backend = get_mock_backend(monkeypatch)
+
+    # supply 2 valid and 1 invalid hostname
+    actual_hosts = list(dragon_backend._hosts)
+    actual_hosts[0] = f"x{actual_hosts[0]}"
+
+    host_list = ",".join(actual_hosts)
+
+    run_req = DragonRunRequest(
+        exe="sleep",
+        exe_args=["5"],
+        path="/a/fake/path",
+        nodes=2,  # <----- requesting 2 of 3 available nodes
+        hostlist=host_list,  # <--- two valid names are available
+        tasks=1,
+        tasks_per_node=1,
+        env={},
+        current_env={},
+        pmi_enabled=False,
+        policy=DragonRunPolicy(),
+    )
+
+    can_honor, error_msg = dragon_backend._can_honor(run_req)
+
+    # confirm success is indicated
+    assert can_honor, error_msg
+    # confirm no failure message is set
+    assert error_msg is None, error_msg
