Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SmartSim to use Dragon V0.10 #753

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
7 changes: 7 additions & 0 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ To be released at some point in the future

Description

- Update the `DragonBackend` to use
[Dragon V0.10](https://github.com/DragonHPC/dragon/releases/tag/v0.10-beta)
- Implement workaround for Tensorflow that allows RedisAI to build with GCC-14
- Add instructions for installing SmartSim on PML's Scylla

Detailed Notes

- Dragon V0.10 introduced support for infiniband networks and largely
overhauled the ``ProcessGroup`` API, used widely throughout SmartSim's
``DragonBackend``, for better readability and debugging. SmartSim has has
adopted this new version of Dragon to take advantage of these improvements.
([SmartSim-PR753](https://github.com/CrayLabs/SmartSim/pull/753))
- In libtensorflow, the input argument to TF_SessionRun seems to be mistyped to
TF_Output instead of TF_Input. These two types differ only in name. GCC-14
catches this and throws an error, even though earlier versions allow this. To
Expand Down
11 changes: 5 additions & 6 deletions smartsim/_core/_cli/scripts/dragon_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from smartsim._core._cli.utils import pip
from smartsim._core._install.utils import retrieve
from smartsim._core.config import CONFIG
from smartsim._core.utils.helpers import check_platform, is_crayex_platform
from smartsim._core.utils.helpers import check_platform, is_hsn_platform
from smartsim.error.errors import SmartSimCLIActionCancelled
from smartsim.log import get_logger

Expand Down Expand Up @@ -51,7 +51,7 @@ def python_version() -> str:
def dragon_pin() -> str:
"""Return a string indicating the pinned major/minor version of the dragon
package to install"""
return "0.9"
return "0.10"


def _platform_filter(asset_name: str) -> bool:
Expand All @@ -60,9 +60,9 @@ def _platform_filter(asset_name: str) -> bool:

:param asset_name: A value to inspect for keywords indicating a Cray EX asset
:returns: True if supplied value is correct for current platform"""
key = "crayex"
key = "hsn"
is_cray = key in asset_name.lower()
if is_crayex_platform():
if is_hsn_platform():
return is_cray
return not is_cray

Expand Down Expand Up @@ -132,13 +132,12 @@ def filter_assets(assets: t.Collection[GitReleaseAsset]) -> t.Optional[GitReleas
def retrieve_asset_info() -> GitReleaseAsset:
"""Find a release asset that meets all necessary filtering criteria

:param dragon_pin: identify the dragon version to install (e.g. dragon-0.8)
:returns: A GitHub release asset"""
assets = _get_release_assets()
asset = filter_assets(assets)

platform_result = check_platform()
if not platform_result.is_cray:
if not platform_result.is_hsn:
logger.warning("Installing Dragon without HSTA support")
for msg in platform_result.failures:
logger.warning(msg)
Expand Down
167 changes: 94 additions & 73 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections
import functools
import itertools
import time
import typing as t
from dataclasses import dataclass, field
from enum import Enum
from threading import RLock

from tabulate import tabulate
Expand All @@ -38,7 +38,6 @@
# isort: off
import dragon.infrastructure.connection as dragon_connection
import dragon.infrastructure.policy as dragon_policy
import dragon.native.group_state as dragon_group_state
import dragon.native.process as dragon_process
import dragon.native.process_group as dragon_process_group
import dragon.native.machine as dragon_machine
Expand Down Expand Up @@ -67,12 +66,7 @@
logger = get_logger(__name__)


class DragonStatus(str, Enum):
ERROR = str(dragon_group_state.Error())
RUNNING = str(dragon_group_state.Running())

def __str__(self) -> str:
return self.value
_RETURN_CODES_NO_PROCESS_GROUP: t.Final = [-1]


@dataclass
Expand All @@ -81,20 +75,60 @@
"""Status of step"""
process_group: t.Optional[dragon_process_group.ProcessGroup] = None
"""Internal Process Group object, None for finished or not started steps"""
puids: t.Optional[t.List[t.Optional[int]]] = None # puids can be None
"""List of Process UIDS belonging to the ProcessGroup"""
return_codes: t.Optional[t.List[int]] = None
"""List of return codes of completed processes"""
hosts: t.List[str] = field(default_factory=list)
"""List of hosts on which the Process Group """
redir_workers: t.Optional[dragon_process_group.ProcessGroup] = None
"""Workers used to redirect stdout and stderr to file"""
_final_return_codes: t.Optional[t.List[int]] = field(default=None, init=False)
"""Field to cache final statuses when a process group info is marked as
completed so that the underlying process group can be released.
"""

@property
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.Optional[t.List[int]]]:
def smartsim_info(self) -> t.Tuple[SmartSimStatus, t.List[int]]:
"""Information needed by SmartSim Launcher and Job Manager"""
return (self.status, self.return_codes)

@property
MattToast marked this conversation as resolved.
Show resolved Hide resolved
def puids(self) -> t.List[int]:
"""List of Process IDs belonging to the ProcessGroup.

:returns: List of Process IDs belonging to the ProcessGroup.
"""
return list(set(itertools.chain(self.active_puids, self.inactive_puids)))

Check warning on line 98 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L98

Added line #L98 was not covered by tests
ankona marked this conversation as resolved.
Show resolved Hide resolved

@property
def active_puids(self) -> t.List[int]:
"""List of process IDs that are running.

:returns: List of process IDs that are running.
"""
if self.process_group is None:
MattToast marked this conversation as resolved.
Show resolved Hide resolved
return []
return list(self.process_group.puids)

Check warning on line 108 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L106-L108

Added lines #L106 - L108 were not covered by tests

@property
def inactive_puids(self) -> t.List[int]:
"""List of process IDs that have completed.

:returns: List of process IDs that have completed.
"""
if self.process_group is None:
MattToast marked this conversation as resolved.
Show resolved Hide resolved
return []
return [puid for puid, _ in self.process_group.inactive_puids]

Check warning on line 118 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L116-L118

Added lines #L116 - L118 were not covered by tests

@property
def return_codes(self) -> t.List[int]:
"""List of return codes of completed processes.

:returns: List of return codes of completed processes.
"""
if self._final_return_codes is not None:
return self._final_return_codes
if self.process_group is None:
return _RETURN_CODES_NO_PROCESS_GROUP
return [ret for _, ret in self.process_group.inactive_puids]

Check warning on line 130 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L126-L130

Added lines #L126 - L130 were not covered by tests

def __str__(self) -> str:
if self.process_group is not None and self.redir_workers is not None:
msg = [f"Active Group ({self.status})"]
Expand All @@ -105,11 +139,25 @@

if self.hosts is not None:
msg.append(f"Hosts: {','.join(self.hosts)}")
if self.return_codes is not None:
if self.return_codes:

Check warning on line 142 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L142

Added line #L142 was not covered by tests
msg.append(f"{self.return_codes}")

return ", ".join(msg)

def mark_complete(self) -> None:
"""Cache the final return codes and release any underlying dragon
process groups.
"""
if self.process_group is not None:
self.process_group.join()
self._final_return_codes = self.return_codes
self.process_group.close()
self.process_group = None
if self.redir_workers is not None:
self.redir_workers.join()
self.redir_workers.close()
self.redir_workers = None

Check warning on line 159 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L151-L159

Added lines #L151 - L159 were not covered by tests


# Thanks to Colin Wahl from HPE HPC Dragon Team
def redir_worker(io_conn: dragon_connection.Connection, file_path: str) -> None:
Expand Down Expand Up @@ -361,7 +409,10 @@
err_file: t.Optional[str],
) -> dragon_process_group.ProcessGroup:
grp_redir = dragon_process_group.ProcessGroup(
restart=False, policy=global_policy, pmi_enabled=False
restart=False,
ignore_error_on_exit=True,
policy=global_policy,
pmi_enabled=False,
)
for pol, puid in zip(policies, puids):
proc = dragon_process.Process(None, ident=puid)
Expand Down Expand Up @@ -404,10 +455,10 @@
else:
# Technically we could just terminate, but what if
# the application intercepts that and ignores it?
proc_group = self._group_infos[step_id].process_group
group_info = self._group_infos[step_id]

Check warning on line 458 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L458

Added line #L458 was not covered by tests
if (
proc_group is not None
and proc_group.status == DragonStatus.RUNNING
group_info.active_puids
and (proc_group := group_info.process_group) is not None
):
try:
proc_group.kill()
Expand All @@ -416,7 +467,7 @@
proc_group.stop()
except dragon_process_group.DragonProcessGroupError:
logger.error("Process group already stopped")
redir_group = self._group_infos[step_id].redir_workers
redir_group = group_info.redir_workers

Check warning on line 470 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L470

Added line #L470 was not covered by tests
if redir_group is not None:
try:
redir_group.join(0.1)
Expand All @@ -425,7 +476,6 @@
logger.error(e)

self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
self._group_infos[step_id].return_codes = [-9]

@staticmethod
def create_run_policy(
Expand All @@ -438,33 +488,27 @@
if isinstance(request, DragonRunRequest):
run_request: DragonRunRequest = request

affinity = dragon_policy.Policy.Affinity.DEFAULT
cpu_affinity: t.List[int] = []
gpu_affinity: t.List[int] = []

# Customize policy only if the client requested it, otherwise use default
if run_request.policy is not None:
# Affinities are not mutually exclusive. If specified, both are used
if run_request.policy.cpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
cpu_affinity = run_request.policy.cpu_affinity

if run_request.policy.gpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
gpu_affinity = run_request.policy.gpu_affinity
logger.debug(
ankona marked this conversation as resolved.
Show resolved Hide resolved
f"Affinity strategy: {affinity}, "
f"CPU affinity mask: {cpu_affinity}, "
f"GPU affinity mask: {gpu_affinity}"
)
if affinity != dragon_policy.Policy.Affinity.DEFAULT:
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
affinity=affinity,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
return dragon_policy.Policy(

Check warning on line 506 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L506

Added line #L506 was not covered by tests
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)

return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
Expand All @@ -487,7 +531,10 @@
host_name=hosts[0],
)
grp = dragon_process_group.ProcessGroup(
restart=False, pmi_enabled=request.pmi_enabled, policy=global_policy
restart=False,
ignore_error_on_exit=True,
pmi_enabled=request.pmi_enabled,
policy=global_policy,
)

policies = []
Expand All @@ -513,22 +560,19 @@
logger.error(e)
grp_status = SmartSimStatus.STATUS_FAILED

puids = None
try:
puids = list(
set(grp.puids + [puid for puid, retcode in grp.inactive_puids])
)
self._group_infos[step_id] = ProcessGroupInfo(
grp_info = ProcessGroupInfo(

Check warning on line 564 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L564

Added line #L564 was not covered by tests
process_group=grp,
puids=puids,
return_codes=[],
status=grp_status,
hosts=hosts,
)
puids = grp_info.puids
self._group_infos[step_id] = grp_info

Check warning on line 570 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L569-L570

Added lines #L569 - L570 were not covered by tests
self._running_steps.append(step_id)
started.append(step_id)
except Exception as e:
logger.error(e)
puids = None

Check warning on line 575 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L575

Added line #L575 was not covered by tests

if (
puids is not None
Expand Down Expand Up @@ -575,32 +619,15 @@
grp = group_info.process_group
if grp is None:
group_info.status = SmartSimStatus.STATUS_FAILED
group_info.return_codes = [-1]
elif group_info.status not in TERMINAL_STATUSES:
if grp.status == str(DragonStatus.RUNNING):
if group_info.active_puids:

Check warning on line 623 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L623

Added line #L623 was not covered by tests
group_info.status = SmartSimStatus.STATUS_RUNNING
else:
puids = group_info.puids
if puids is not None and all(
puid is not None for puid in puids
):
try:
group_info.return_codes = [
dragon_process.Process(None, ident=puid).returncode
for puid in puids
]
except (ValueError, TypeError) as e:
logger.error(e)
group_info.return_codes = [-1 for _ in puids]
else:
group_info.return_codes = [0]
if not group_info.status == SmartSimStatus.STATUS_CANCELLED:
group_info.status = (
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
or grp.status == DragonStatus.ERROR
else SmartSimStatus.STATUS_COMPLETED
)
elif group_info.status != SmartSimStatus.STATUS_CANCELLED:
group_info.status = (

Check warning on line 626 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L625-L626

Added lines #L625 - L626 were not covered by tests
SmartSimStatus.STATUS_FAILED
if any(group_info.return_codes)
else SmartSimStatus.STATUS_COMPLETED
)

if group_info.status in TERMINAL_STATUSES:
terminated.append(step_id)
Expand All @@ -620,8 +647,7 @@
except KeyError:
logger.error(f"Tried to free a non-allocated host: {host}")
self._free_hosts.append(host)
group_info.process_group = None
group_info.redir_workers = None
group_info.mark_complete()

Check warning on line 650 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L650

Added line #L650 was not covered by tests

def _update_shutdown_status(self) -> None:
self._heartbeat()
Expand Down Expand Up @@ -685,7 +711,7 @@
honorable, err = self._can_honor(request)
if not honorable:
self._group_infos[step_id] = ProcessGroupInfo(
status=SmartSimStatus.STATUS_FAILED, return_codes=[-1]
status=SmartSimStatus.STATUS_FAILED
)
else:
self._queued_steps[step_id] = request
Expand Down Expand Up @@ -751,12 +777,7 @@
else:
table_line.append("")

if proc_group_info.return_codes is not None:
table_line.append(
f"{','.join(str(ret) for ret in proc_group_info.return_codes)}"
)
else:
table_line.append("")
table_line.append(",".join(str(ret) for ret in proc_group_info.return_codes))

Check warning on line 780 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L780

Added line #L780 was not covered by tests

if proc_group_info.puids is not None:
table_line.append(f"{len(proc_group_info.puids)}")
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@
execute_platform_cmd,
expand_exe_path,
installed_redisai_backends,
is_crayex_platform,
is_hsn_platform,
)
from .redis import check_cluster_status, create_cluster, db_is_active
Loading
Loading