
Mli feature #18

Closed — wants to merge 26 commits
Changes from 1 commit

Commits (26)
d2fd6a7
Initial MLI schemas and MessageHandler class (#607)
AlyssaCote Jun 11, 2024
3c9915c
Merge branch 'develop' into mli-feature
ankona Jun 14, 2024
38081da
ML Worker Manager MVP (#608)
ankona Jun 20, 2024
ab900b8
Remove device attribute from schemas (#619)
AlyssaCote Jun 25, 2024
a9ffb14
Merge branch 'develop' into mli-feature
ankona Jul 2, 2024
ee2c110
Merge branch 'develop' into mli-feature
ankona Jul 2, 2024
8a2f173
Add model metadata to request schema (#624)
AlyssaCote Jul 3, 2024
52abd32
Enable environment variable based configuration for ML Worker Manager…
AlyssaCote Jul 10, 2024
eace71e
FLI-based Worker Manager (#622)
al-rigazzi Jul 15, 2024
5fac3e2
Add ability to specify hardware policies on dragon run requests (#631)
ankona Jul 17, 2024
0030a4a
Revert "Add ability to specify hardware policies on dragon run requests" (#637)
ankona Jul 17, 2024
b6c2f2b
Merge latest develop into mli-feature (#640)
ankona Jul 18, 2024
272a1d7
Improve error handling in worker manager (#629)
AlyssaCote Jul 18, 2024
7169f1c
Schema performance improvements (#632)
AlyssaCote Jul 18, 2024
84101b3
New develop merger (#645)
al-rigazzi Jul 19, 2024
e225c07
merging develop
ankona Jul 26, 2024
9f482b1
Merge branch 'develop' into mli-feature
ankona Jul 31, 2024
263e3c7
Fix dragon installation issues (#652)
ankona Aug 2, 2024
0453b8b
Add FeatureStore descriptor to tensor & model keys (#633)
ankona Aug 7, 2024
99ed41c
Merge branch 'develop' into mli-feature
ankona Aug 8, 2024
74d6e78
Use `torch.from_numpy` instead of `torch.tensor` to reduce a copy (#661)
AlyssaCote Aug 8, 2024
391784c
MLI environment variables updated using new naming convention (#665)
AlyssaCote Aug 14, 2024
f7ef49b
Remove pydantic dependency from MLI code (#667)
AlyssaCote Aug 20, 2024
ef034d5
Enable specification of target hostname for a dragon task (#660)
ankona Aug 26, 2024
6d5518b
fix init reordering bug (#675)
ankona Aug 26, 2024
5d85995
Queue-based Worker Manager (#647)
al-rigazzi Aug 28, 2024
Revert "Add ability to specify hardware policies on dragon run requests" (CrayLabs#637)

Reverts CrayLabs#631
ankona authored Jul 17, 2024
commit 0030a4af2edbba211bf8f898456f3f20389f428c
1 change: 0 additions & 1 deletion doc/changelog.md
@@ -13,7 +13,6 @@ Jump to:

Description

- Add hardware pinning capability when using dragon
- Add TorchWorker first implementation and mock inference app example
- Add EnvironmentConfigLoader for ML Worker Manager
- Add Model schema with model metadata included
28 changes: 0 additions & 28 deletions doc/dragon.rst
@@ -65,34 +65,6 @@ In the next sections, we detail how Dragon is integrated into SmartSim.

For more information on HPC launchers, visit the :ref:`Run Settings<run_settings_hpc_ex>` page.

Hardware Pinning
================

Dragon also enables users to specify hardware constraints using ``DragonRunSettings``. CPU
and GPU affinity can be specified using the ``DragonRunSettings`` object. The following
example demonstrates how to specify CPU affinity and GPU affinities simultaneously. Note
that affinities are passed as a list of device indices.

.. code-block:: python

    # Because "dragon" was specified as the launcher during Experiment initialization,
    # create_run_settings will return a DragonRunSettings object
    rs = exp.create_run_settings(exe="mpi_app",
                                 exe_args=["--option", "value"],
                                 env_vars={"MYVAR": "VALUE"})

    # Request the first 8 CPUs for this job
    rs.set_cpu_affinity(list(range(8)))

    # Request the first two GPUs on the node for this job
    rs.set_gpu_affinity([0, 1])

.. note::

    SmartSim launches jobs in the order they are received on the first available
    host in a round-robin pattern. To ensure a process is launched on a node with
    specific features, configure a hostname constraint.

=================
The Dragon Server
=================
6 changes: 0 additions & 6 deletions doc/tutorials/online_analysis/lattice/online_analysis.ipynb
@@ -378,7 +378,6 @@
},
{
"cell_type": "code",
"id": "6f3ed63d-e324-443d-9b68-b2cf618d31c7",
"execution_count": 7,
"metadata": {},
"outputs": [
@@ -400,15 +399,13 @@
},
{
"cell_type": "markdown",
"id": "96c154fe-5ca8-4d89-91f8-8fd4e75cb80e",
"metadata": {},
"source": [
"We then apply the function `probe_points` to the `ux` and `uy` tensors computed in the last time step of the previous simulation. Note that all tensors are already on the DB, thus we can reference them by name. Finally, we download and plot the output (a 2D velocity field), which is stored as `probe_u` on the DB."
]
},
{
"cell_type": "code",
"id": "36e3b415-dcc1-4d25-9cce-52388146a4bb",
"execution_count": 8,
"metadata": {},
"outputs": [
@@ -435,7 +432,6 @@
},
{
"cell_type": "markdown",
"id": "9d7e4966-a0de-480c-9556-936197a5a5d2",
"metadata": {},
"source": [
"### Uploading a function inline\n",
@@ -457,7 +453,6 @@
},
{
"cell_type": "markdown",
"id": "1c4daf43-34d0-482a-b9b5-b3b6f1e173c4",
"metadata": {},
"source": [
"We then store the function on the DB under the key `norm_function`."
@@ -475,7 +470,6 @@
},
{
"cell_type": "markdown",
"id": "19409ac6-e118-44db-a847-2d905fdf0331",
"metadata": {},
"source": [
"Note that the key we used identifies a functional unit containing the function itself: this is similar to the key used to store the `probe` script above. When we want to run the function, we just call it with `run_script`, by indicating the `script` key as `\"norm_function\"` and the name of the function itself as `\"compute_norm\"`."
85 changes: 6 additions & 79 deletions smartsim/_core/launcher/dragon/dragonBackend.py
@@ -214,12 +214,9 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
def _initialize_hosts(self) -> None:
with self._queue_lock:
self._hosts: t.List[str] = sorted(
node for node in dragon_machine.System().nodes
dragon_machine.Node(node).hostname
for node in dragon_machine.System().nodes
)
self._nodes = [dragon_machine.Node(node) for node in self._hosts]
self._cpus = [node.num_cpus for node in self._nodes]
self._gpus = [node.num_gpus for node in self._nodes]

"""List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
"""List of hosts on which steps can be launched"""
@@ -291,34 +288,6 @@ def current_time(self) -> float:
"""Current time for DragonBackend object, in seconds since the Epoch"""
return time.time()

def _can_honor_policy(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the policy can be honored with resources available
in the allocation.
:param request: DragonRunRequest containing policy information
:returns: Tuple indicating if the policy can be honored and
an optional error message"""
# ensure the policy can be honored
if request.policy:
if request.policy.cpu_affinity:
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.

@@ -333,11 +302,6 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

honorable, err = self._can_honor_policy(request)
if not honorable:
return False, err

return True, None

def _allocate_step(
@@ -446,46 +410,6 @@ def infra_ddict(self) -> str:

return str(self._infra_ddict.serialize())

@staticmethod
def create_run_policy(
request: DragonRequest, node_name: str
) -> "dragon_policy.Policy":
"""Create a dragon Policy from the request and node name
:param request: DragonRunRequest containing policy information
:param node_name: Name of the node on which the process will run
:returns: dragon_policy.Policy object mapped from request properties"""
if isinstance(request, DragonRunRequest):
run_request: DragonRunRequest = request

affinity = dragon_policy.Policy.Affinity.DEFAULT
cpu_affinity: t.List[int] = []
gpu_affinity: t.List[int] = []

# Customize policy only if the client requested it, otherwise use default
if run_request.policy is not None:
# Affinities are not mutually exclusive. If specified, both are used
if run_request.policy.cpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
cpu_affinity = run_request.policy.cpu_affinity

if run_request.policy.gpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
gpu_affinity = run_request.policy.gpu_affinity

if affinity != dragon_policy.Policy.Affinity.DEFAULT:
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
affinity=affinity,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)

return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
)

def _start_steps(self) -> None:
self._heartbeat()
with self._queue_lock:
@@ -508,7 +432,10 @@ def _start_steps(self) -> None:

policies = []
for node_name in hosts:
local_policy = self.create_run_policy(request, node_name)
local_policy = dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
)
policies.extend([local_policy] * request.tasks_per_node)
tmp_proc = dragon_process.ProcessTemplate(
target=request.exe,
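The `_can_honor_policy` check deleted above can be summarized outside of Dragon with a minimal sketch. Note that `can_honor_affinity` and its parameters are hypothetical names for illustration only; the logic mirrors the removed code: a request is satisfiable only if the highest requested device index is strictly below the largest per-node device count.

```python
import typing as t


def can_honor_affinity(
    node_cpu_counts: t.List[int],
    node_gpu_counts: t.List[int],
    cpu_affinity: t.List[int],
    gpu_affinity: t.List[int],
) -> t.Tuple[bool, t.Optional[str]]:
    """Mirror of the removed policy check: the largest requested device
    index must be strictly less than the best per-node device count."""
    if cpu_affinity and max(cpu_affinity) >= max(node_cpu_counts):
        return False, "Cannot satisfy request, not enough CPUs available"
    if gpu_affinity and max(gpu_affinity) >= max(node_gpu_counts):
        return False, "Cannot satisfy request, not enough GPUs available"
    return True, None


# Two nodes with 8 CPUs and 2 GPUs each can honor pinning to CPUs 0-7
# and GPUs 0-1, but not pinning to CPU index 8
print(can_honor_affinity([8, 8], [2, 2], [0, 7], [0, 1]))  # (True, None)
print(can_honor_affinity([8, 8], [2, 2], [8], []))
```

The strict inequality reflects that affinity values are zero-based device indices, while the per-node counts are sizes.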
6 changes: 0 additions & 6 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
@@ -29,8 +29,6 @@
import os
import typing as t

from smartsim._core.schemas.dragonRequests import DragonRunPolicy

from ...._core.launcher.stepMapping import StepMap
from ....error import LauncherError, SmartSimError
from ....log import get_logger
@@ -170,9 +168,6 @@ def run(self, step: Step) -> t.Optional[str]:
merged_env = self._connector.merge_persisted_env(os.environ.copy())
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)

policy = DragonRunPolicy.from_run_args(run_args)

response = _assert_schema_type(
self._connector.send_request(
DragonRunRequest(
@@ -186,7 +181,6 @@ def run(self, step: Step) -> t.Optional[str]:
current_env=merged_env,
output_file=out,
error_file=err,
policy=policy,
)
),
DragonRunResponse,
10 changes: 1 addition & 9 deletions smartsim/_core/launcher/step/dragonStep.py
@@ -30,11 +30,7 @@
import sys
import typing as t

from ...._core.schemas.dragonRequests import (
DragonRunPolicy,
DragonRunRequest,
request_registry,
)
from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry
from ....error.errors import SSUnsupportedError
from ....log import get_logger
from ....settings import (
@@ -170,11 +166,8 @@ def _write_request_file(self) -> str:
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)

policy = DragonRunPolicy.from_run_args(run_args)

cmd = step.get_launch_cmd()
out, err = step.get_output_files()

request = DragonRunRequest(
exe=cmd[0],
exe_args=cmd[1:],
@@ -186,7 +179,6 @@ def _write_request_file(self) -> str:
current_env=os.environ,
output_file=out,
error_file=err,
policy=policy,
)
requests.append(request_registry.to_string(request))
with open(request_file, "w", encoding="utf-8") as script_file:
3 changes: 1 addition & 2 deletions smartsim/_core/launcher/step/step.py
@@ -26,7 +26,6 @@

from __future__ import annotations

import copy
import functools
import os.path as osp
import pathlib
@@ -52,7 +51,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None:
self.entity_name = name
self.cwd = cwd
self.managed = False
self.step_settings = copy.deepcopy(step_settings)
self.step_settings = step_settings
self.meta: t.Dict[str, str] = {}

@property
41 changes: 1 addition & 40 deletions smartsim/_core/schemas/dragonRequests.py
@@ -26,10 +26,9 @@

import typing as t

from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, ValidationError
from pydantic import BaseModel, Field, PositiveInt

import smartsim._core.schemas.utils as _utils
from smartsim.error.errors import SmartSimError

# Black and Pylint disagree about where to put the `...`
# pylint: disable=multiple-statements
@@ -40,43 +39,6 @@
class DragonRequest(BaseModel): ...


class DragonRunPolicy(BaseModel):
"""Policy specifying hardware constraints when running a Dragon job"""

cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
"""List of CPU indices to which the job should be pinned"""
gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
"""List of GPU indices to which the job should be pinned"""

@staticmethod
def from_run_args(
run_args: t.Dict[str, t.Union[int, str, float, None]]
) -> "DragonRunPolicy":
"""Create a DragonRunPolicy with hardware constraints passed from
a dictionary of run arguments
:param run_args: Dictionary of run arguments
:returns: DragonRunPolicy instance created from the run arguments"""
gpu_args = ""
if gpu_arg_value := run_args.get("gpu-affinity", None):
gpu_args = str(gpu_arg_value)

cpu_args = ""
if cpu_arg_value := run_args.get("cpu-affinity", None):
cpu_args = str(cpu_arg_value)

# run args converted to a string must be split back into a list[int]
gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x]
cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x]

try:
return DragonRunPolicy(
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
except ValidationError as ex:
raise SmartSimError("Unable to build DragonRunPolicy") from ex


class DragonRunRequestView(DragonRequest):
exe: t.Annotated[str, Field(min_length=1)]
exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = []
@@ -95,7 +57,6 @@ class DragonRunRequestView(DragonRequest):
@request_registry.register("run")
class DragonRunRequest(DragonRunRequestView):
current_env: t.Dict[str, t.Optional[str]] = {}
policy: t.Optional[DragonRunPolicy] = None

def __str__(self) -> str:
return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"})))
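For reference, the parsing behavior of the removed `DragonRunPolicy.from_run_args` — splitting comma-separated `cpu-affinity`/`gpu-affinity` run arguments back into integer lists — can be sketched without pydantic as follows. `parse_affinity` is a hypothetical helper for illustration, not part of the SmartSim API.

```python
import typing as t


def parse_affinity(
    run_args: t.Dict[str, t.Union[int, str, float, None]], key: str
) -> t.List[int]:
    """Split a comma-separated run argument (e.g. "0,1,2") into a
    list of ints, mirroring the removed from_run_args parsing."""
    raw = run_args.get(key, None)
    if not raw:
        return []
    return [int(x.strip()) for x in str(raw).split(",") if x.strip()]


run_args = {"cpu-affinity": "0, 1, 2", "gpu-affinity": None}
print(parse_affinity(run_args, "cpu-affinity"))  # [0, 1, 2]
print(parse_affinity(run_args, "gpu-affinity"))  # []
```

As in the removed code, a missing or empty argument yields an empty affinity list, which downstream code treats as "no constraint".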