Skip to content

Commit

Permalink
merge v1.0 into mli-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Oct 21, 2024
2 parents 86a23d9 + 8a19dee commit 54ca522
Show file tree
Hide file tree
Showing 29 changed files with 6,111 additions and 2 deletions.
4 changes: 2 additions & 2 deletions smartsim/_core/launcher/dragon/dragon_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(self, path: str | os.PathLike[str]) -> None:
# but process was started by another connector
self._dragon_head_pid: t.Optional[int] = None
"""Process ID of the process executing the DragonBackend"""
self._dragon_server_path = _resolve_dragon_path(path)
self._dragon_server_path = config.dragon_server_path
"""Path to a dragon installation"""
logger.debug(f"Dragon Server path was set to {self._dragon_server_path}")
self._env_vars: t.Dict[str, str] = {}
Expand Down Expand Up @@ -125,7 +125,7 @@ def _handshake(self, address: str) -> None:
:param address: The address of the head node socket to initiate a
handhake with
"""
self._dragon_head_socket = dragon_sockets.get_secure_socket(
self._dragon_head_socket = dragonSockets.get_secure_socket(
self._context, zmq.REQ, False
)
self._dragon_head_socket.connect(address)
Expand Down
20 changes: 20 additions & 0 deletions smartsim/settings/arguments/launch/dragon.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None:

self.run_args["host-list"] = ",".join(cleaned_list)

@override
def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None:
"""Specify the hostlist for this job
:param host_list: hosts to launch on
:raises ValueError: if an empty host list is supplied
"""
if not host_list:
raise ValueError("empty hostlist provided")

if isinstance(host_list, str):
host_list = host_list.replace(" ", "").split(",")

# strip out all whitespace-only values
cleaned_list = [host.strip() for host in host_list if host and host.strip()]
if not len(cleaned_list) == len(host_list):
raise ValueError(f"invalid names found in hostlist: {host_list}")

self.run_args["host-list"] = ",".join(cleaned_list)

def set_cpu_affinity(self, devices: t.List[int]) -> None:
"""Set the CPU affinity for this job
Expand Down
Empty file added tests/dragon/__init__.py
Empty file.
125 changes: 125 additions & 0 deletions tests/dragon/channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import pathlib
import threading
import typing as t

from smartsim._core.mli.comm.channel.channel import CommChannelBase
from smartsim.error.errors import SmartSimError
from smartsim.log import get_logger

logger = get_logger(__name__)


class FileSystemCommChannel(CommChannelBase):
"""Passes messages by writing to a file"""

def __init__(self, key: pathlib.Path) -> None:
"""Initialize the FileSystemCommChannel instance.
:param key: a path to the root directory of the feature store
"""
self._lock = threading.RLock()

super().__init__(key.as_posix())
self._file_path = key

if not self._file_path.parent.exists():
self._file_path.parent.mkdir(parents=True)

self._file_path.touch()

def send(self, value: bytes, timeout: float = 0) -> None:
"""Send a message throuh the underlying communication channel.
:param value: The value to send
:param timeout: maximum time to wait (in seconds) for messages to send
"""
with self._lock:
# write as text so we can add newlines as delimiters
with open(self._file_path, "a") as fp:
encoded_value = base64.b64encode(value).decode("utf-8")
fp.write(f"{encoded_value}\n")
logger.debug(f"FileSystemCommChannel {self._file_path} sent message")

def recv(self, timeout: float = 0) -> t.List[bytes]:
"""Receives message(s) through the underlying communication channel.
:param timeout: maximum time to wait (in seconds) for messages to arrive
:returns: the received message
:raises SmartSimError: if the descriptor points to a missing file
"""
with self._lock:
messages: t.List[bytes] = []
if not self._file_path.exists():
raise SmartSimError("Empty channel")

# read as text so we can split on newlines
with open(self._file_path, "r") as fp:
lines = fp.readlines()

if lines:
line = lines.pop(0)
event_bytes = base64.b64decode(line.encode("utf-8"))
messages.append(event_bytes)

self.clear()

# remove the first message only, write remainder back...
if len(lines) > 0:
with open(self._file_path, "w") as fp:
fp.writelines(lines)

logger.debug(
f"FileSystemCommChannel {self._file_path} received message"
)

return messages

def clear(self) -> None:
"""Create an empty file for events."""
if self._file_path.exists():
self._file_path.unlink()
self._file_path.touch()

@classmethod
def from_descriptor(
cls,
descriptor: str,
) -> "FileSystemCommChannel":
"""A factory method that creates an instance from a descriptor string.
:param descriptor: The descriptor that uniquely identifies the resource
:returns: An attached FileSystemCommChannel
"""
try:
path = pathlib.Path(descriptor)
return FileSystemCommChannel(path)
except:
logger.warning(f"failed to create fs comm channel: {descriptor}")
raise
129 changes: 129 additions & 0 deletions tests/dragon/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import os
import pathlib
import socket
import subprocess
import sys
import typing as t

import pytest

dragon = pytest.importorskip("dragon")

# isort: off
import dragon.data.ddict.ddict as dragon_ddict
import dragon.infrastructure.policy as dragon_policy
import dragon.infrastructure.process_desc as dragon_process_desc
import dragon.native.process as dragon_process

from dragon.fli import FLInterface

# isort: on

from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel
from smartsim._core.mli.comm.channel.dragon_util import create_local
from smartsim._core.mli.infrastructure.storage import dragon_util
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
BackboneFeatureStore,
)
from smartsim.log import get_logger

logger = get_logger(__name__)


@pytest.fixture(scope="module")
def the_storage() -> dragon_ddict.DDict:
"""Fixture to instantiate a dragon distributed dictionary."""
return dragon_util.create_ddict(1, 2, 32 * 1024**2)


@pytest.fixture(scope="module")
def the_worker_channel() -> DragonFLIChannel:
"""Fixture to create a valid descriptor for a worker channel
that can be attached to."""
channel_ = create_local()
fli_ = FLInterface(main_ch=channel_, manager_ch=None)
comm_channel = DragonFLIChannel(fli_)
return comm_channel


@pytest.fixture(scope="module")
def the_backbone(
the_storage: t.Any, the_worker_channel: DragonFLIChannel
) -> BackboneFeatureStore:
"""Fixture to create a distributed dragon dictionary and wrap it
in a BackboneFeatureStore.
:param the_storage: The dragon storage engine to use
:param the_worker_channel: Pre-configured worker channel
"""

backbone = BackboneFeatureStore(the_storage, allow_reserved_writes=True)
backbone[BackboneFeatureStore.MLI_WORKER_QUEUE] = the_worker_channel.descriptor

return backbone


@pytest.fixture(scope="module")
def backbone_descriptor(the_backbone: BackboneFeatureStore) -> str:
# create a shared backbone featurestore
return the_backbone.descriptor


def function_as_dragon_proc(
entrypoint_fn: t.Callable[[t.Any], None],
args: t.List[t.Any],
cpu_affinity: t.List[int],
gpu_affinity: t.List[int],
) -> dragon_process.Process:
"""Execute a function as an independent dragon process.
:param entrypoint_fn: The function to execute
:param args: The arguments for the entrypoint function
:param cpu_affinity: The cpu affinity for the process
:param gpu_affinity: The gpu affinity for the process
:returns: The dragon process handle
"""
options = dragon_process_desc.ProcessOptions(make_inf_channels=True)
local_policy = dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=socket.gethostname(),
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
return dragon_process.Process(
target=entrypoint_fn,
args=args,
cwd=os.getcwd(),
policy=local_policy,
options=options,
stderr=dragon_process.Popen.STDOUT,
stdout=dragon_process.Popen.STDOUT,
)
Loading

0 comments on commit 54ca522

Please sign in to comment.