Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mli feature #18

Closed
wants to merge 26 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d2fd6a7
Initial MLI schemas and MessageHandler class (#607)
AlyssaCote Jun 11, 2024
3c9915c
Merge branch 'develop' into mli-feature
ankona Jun 14, 2024
38081da
ML Worker Manager MVP (#608)
ankona Jun 20, 2024
ab900b8
Remove device attribute from schemas (#619)
AlyssaCote Jun 25, 2024
a9ffb14
Merge branch 'develop' into mli-feature
ankona Jul 2, 2024
ee2c110
Merge branch 'develop' into mli-feature
ankona Jul 2, 2024
8a2f173
Add model metadata to request schema (#624)
AlyssaCote Jul 3, 2024
52abd32
Enable environment variable based configuration for ML Worker Manager…
AlyssaCote Jul 10, 2024
eace71e
FLI-based Worker Manager (#622)
al-rigazzi Jul 15, 2024
5fac3e2
Add ability to specify hardware policies on dragon run requests (#631)
ankona Jul 17, 2024
0030a4a
Revert "Add ability to specify hardware policies on dragon run reques…
ankona Jul 17, 2024
b6c2f2b
Merge latest develop into mli-feature (#640)
ankona Jul 18, 2024
272a1d7
Improve error handling in worker manager (#629)
AlyssaCote Jul 18, 2024
7169f1c
Schema performance improvements (#632)
AlyssaCote Jul 18, 2024
84101b3
New develop merger (#645)
al-rigazzi Jul 19, 2024
e225c07
merging develop
ankona Jul 26, 2024
9f482b1
Merge branch 'develop' into mli-feature
ankona Jul 31, 2024
263e3c7
Fix dragon installation issues (#652)
ankona Aug 2, 2024
0453b8b
Add FeatureStore descriptor to tensor & model keys (#633)
ankona Aug 7, 2024
99ed41c
Merge branch 'develop' into mli-feature
ankona Aug 8, 2024
74d6e78
Use `torch.from_numpy` instead of `torch.tensor` to reduce a copy (#661)
AlyssaCote Aug 8, 2024
391784c
MLI environment variables updated using new naming convention (#665)
AlyssaCote Aug 14, 2024
f7ef49b
Remove pydantic dependency from MLI code (#667)
AlyssaCote Aug 20, 2024
ef034d5
Enable specification of target hostname for a dragon task (#660)
ankona Aug 26, 2024
6d5518b
fix init reordering bug (#675)
ankona Aug 26, 2024
5d85995
Queue-based Worker Manager (#647)
al-rigazzi Aug 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Enable environment variable based configuration for ML Worker Manager (
…CrayLabs#621)

EnvironmentConfigLoader added for ML Worker Manager.
  • Loading branch information
AlyssaCote authored Jul 10, 2024
commit 52abd324457bf2fc4762346bd0a2acee9e999fe5
19 changes: 16 additions & 3 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: false
matrix:
subset: [backends, slow_tests, group_a, group_b]
subset: [backends, slow_tests, group_a, group_b, dragon]
os: [macos-12, macos-14, ubuntu-22.04] # Operating systems
compiler: [8] # GNU compiler version
rai: [1.2.7] # Redis AI versions
@@ -112,9 +112,15 @@ jobs:
python -m pip install .[dev,ml]

- name: Install ML Runtimes with Smart (with pt, tf, and onnx support)
if: contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')
if: (contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')) && ( matrix.subset != 'dragon' )
run: smart build --device cpu --onnx -v

- name: Install ML Runtimes with Smart (with pt, tf, dragon, and onnx support)
if: (contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')) && ( matrix.subset == 'dragon' )
run: |
smart build --device cpu --onnx --dragon -v
echo "LD_LIBRARY_PATH=$(python -c 'import site; print(site.getsitepackages()[0])')/smartsim/_core/.dragon/dragon-0.9/lib:$LD_LIBRARY_PATH" >> $GITHUB_ENV

- name: Install ML Runtimes with Smart (no ONNX,TF on Apple Silicon)
if: contains( matrix.os, 'macos-14' )
run: smart build --device cpu --no_tf -v
@@ -143,9 +149,16 @@ jobs:
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ ./tests/backends

# Run pytest (dragon subtests)
- name: Run Dragon Pytest
if: (matrix.subset == 'dragon' && matrix.os == 'ubuntu-22.04')
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
dragon -s py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests

# Run pytest (test subsets)
- name: Run Pytest
if: "!contains(matrix.subset, 'backends')" # if not running backend tests
if: (matrix.subset != 'backends' && matrix.subset != 'dragon') # if not running backend tests or dragon tests
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ tutorials-prod:
# help: test - Run all tests
.PHONY: test
test:
@python -m pytest --ignore=tests/full_wlm/
@python -m pytest --ignore=tests/full_wlm/ --ignore=tests/dragon

# help: test-verbose - Run all tests verbosely
.PHONY: test-verbose
@@ -192,3 +192,8 @@ test-full:
.PHONY: test-wlm
test-wlm:
@python -m pytest -vv tests/full_wlm/ tests/on_wlm

# help: test-dragon - Run dragon-specific tests
.PHONY: test-dragon
test-dragon:
@dragon pytest tests/dragon
3 changes: 2 additions & 1 deletion doc/changelog.md
Original file line number Diff line number Diff line change
@@ -13,10 +13,11 @@ Jump to:

Description

- Add EnvironmentConfigLoader for ML Worker Manager
- Add Model schema with model metadata included
- Removed device from schemas, MessageHandler and tests
- Add ML worker manager, sample worker, and feature store
- Added schemas and MessageHandler class for de/serialization of
- Add schemas and MessageHandler class for de/serialization of
inference requests and response messages


1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ markers = [
"group_a: fast test subset a",
"group_b: fast test subset b",
"slow_tests: tests that take a long duration to complete",
"dragon: tests that must be executed in a dragon runtime",
]

[tool.isort]
18 changes: 11 additions & 7 deletions smartsim/_core/mli/infrastructure/control/workermanager.py
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
from smartsim._core.entrypoints.service import Service
from smartsim._core.mli.comm.channel.channel import CommChannelBase
from smartsim._core.mli.comm.channel.dragonchannel import DragonCommChannel
from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore
from smartsim._core.mli.infrastructure.worker.worker import (
InferenceReply,
@@ -43,6 +44,8 @@
from smartsim.log import get_logger

if t.TYPE_CHECKING:
from dragon.fli import FLInterface

from smartsim._core.mli.mli_schemas.model.model_capnp import Model
from smartsim._core.mli.mli_schemas.response.response_capnp import StatusEnum

@@ -162,28 +165,29 @@ class WorkerManager(Service):

def __init__(
self,
task_queue: "mp.Queue[bytes]",
config_loader: EnvironmentConfigLoader,
worker: MachineLearningWorkerBase,
feature_store: t.Optional[FeatureStore] = None,
as_service: bool = False,
cooldown: int = 0,
comm_channel_type: t.Type[CommChannelBase] = DragonCommChannel,
) -> None:
"""Initialize the WorkerManager
:param task_queue: The queue to monitor for new tasks
:param config_loader: Environment config loader that loads the task queue and
feature store
:param workers: A worker to manage
:param feature_store: The persistence mechanism
:param as_service: Specifies run-once or run-until-complete behavior of service
:param cooldown: Number of seconds to wait before shutting down afer
:param cooldown: Number of seconds to wait before shutting down after
shutdown criteria are met
:param comm_channel_type: The type of communication channel used for callbacks
"""
super().__init__(as_service, cooldown)

"""a collection of workers the manager is controlling"""
self._task_queue: "mp.Queue[bytes]" = task_queue
self._task_queue: t.Optional["FLInterface"] = config_loader.get_queue()
"""the queue the manager monitors for new tasks"""
self._feature_store: t.Optional[FeatureStore] = feature_store
self._feature_store: t.Optional[FeatureStore] = (
config_loader.get_feature_store()
)
"""a feature store to retrieve models from"""
self._worker = worker
"""The ML Worker implementation"""
61 changes: 61 additions & 0 deletions smartsim/_core/mli/infrastructure/environmentloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import os
import pickle
import typing as t

from dragon.fli import FLInterface # pylint: disable=all

from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore


class EnvironmentConfigLoader:
"""
Facilitates the loading of a FeatureStore and Queue
into the WorkerManager.
"""

def __init__(self) -> None:
self._feature_store_descriptor = os.getenv("SSFeatureStore", None)
self._queue_descriptor = os.getenv("SSQueue", None)
self.feature_store: t.Optional[FeatureStore] = None
self.queue: t.Optional["FLInterface"] = None

def get_feature_store(self) -> t.Optional[FeatureStore]:
"""Loads the Feature Store previously set in SSFeatureStore"""
if self._feature_store_descriptor is not None:
self.feature_store = pickle.loads(
base64.b64decode(self._feature_store_descriptor)
)
return self.feature_store

def get_queue(self) -> t.Optional["FLInterface"]:
"""Returns the Queue previously set in SSQueue"""
if self._queue_descriptor is not None:
self.queue = FLInterface.attach(base64.b64decode(self._queue_descriptor))
return self.queue
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
from smartsim.log import get_logger

if t.TYPE_CHECKING:
from dragon.data.distdictionary.dragon_dict import DragonDict
from dragon.data.ddict.ddict import DDict


logger = get_logger(__name__)
@@ -40,7 +40,7 @@
class DragonFeatureStore(FeatureStore):
"""A feature store backed by a dragon distributed dictionary"""

def __init__(self, storage: "DragonDict") -> None:
def __init__(self, storage: "DDict") -> None:
"""Initialize the DragonFeatureStore instance"""
self._storage = storage

Empty file added tests/dragon/__init__.py
Empty file.
152 changes: 152 additions & 0 deletions tests/dragon/test_environment_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import os
import pickle

import pytest

dragon = pytest.importorskip("dragon")

import dragon.utils as du
from dragon.channels import Channel
from dragon.data.ddict.ddict import DDict
from dragon.fli import DragonFLIError, FLInterface

from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import (
DragonFeatureStore,
)

from .utils.featurestore import MemoryFeatureStore

# The tests in this file belong to the dragon group
pytestmark = pytest.mark.dragon


@pytest.mark.parametrize(
"content",
[
pytest.param(b"a"),
pytest.param(b"new byte string"),
],
)
def test_environment_loader_attach_FLI(content, monkeypatch):
"""A descriptor can be stored, loaded, and reattached"""
chan = Channel.make_process_local()
queue = FLInterface(main_ch=chan)
monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize()))

config = EnvironmentConfigLoader()
config_queue = config.get_queue()

new_sender = config_queue.sendh(use_main_as_stream_channel=True)
new_sender.send_bytes(content)

old_recv = queue.recvh(use_main_as_stream_channel=True)
result, _ = old_recv.recv_bytes()
assert result == content


def test_environment_loader_serialize_FLI(monkeypatch):
"""The serialized descriptors of a loaded and unloaded
queue are the same"""
chan = Channel.make_process_local()
queue = FLInterface(main_ch=chan)
monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize()))

config = EnvironmentConfigLoader()
config_queue = config.get_queue()
assert config_queue.serialize() == queue.serialize()


def test_environment_loader_FLI_fails(monkeypatch):
"""An incorrect serialized descriptor will fails to attach"""
monkeypatch.setenv("SSQueue", "randomstring")
config = EnvironmentConfigLoader()

with pytest.raises(DragonFLIError):
config_queue = config.get_queue()


@pytest.mark.parametrize(
"expected_keys, expected_values",
[
pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]),
pytest.param(["another key"], ["another value"]),
],
)
def test_environment_loader_memory_featurestore(
expected_keys, expected_values, monkeypatch
):
"""MemoryFeatureStores can be correctly serialized and deserialized"""
feature_store = MemoryFeatureStore()
key_value_pairs = zip(expected_keys, expected_values)
for k, v in key_value_pairs:
feature_store[k] = v
monkeypatch.setenv(
"SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8")
)
config = EnvironmentConfigLoader()
config_feature_store = config.get_feature_store()

for k, _ in key_value_pairs:
assert config_feature_store[k] == feature_store[k]


@pytest.mark.parametrize(
"expected_keys, expected_values",
[
pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]),
pytest.param(["another key"], ["another value"]),
],
)
def test_environment_loader_dragon_featurestore(
expected_keys, expected_values, monkeypatch
):
"""DragonFeatureStores can be correctly serialized and deserialized"""
storage = DDict()
feature_store = DragonFeatureStore(storage)
key_value_pairs = zip(expected_keys, expected_values)
for k, v in key_value_pairs:
feature_store[k] = v
monkeypatch.setenv(
"SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8")
)
config = EnvironmentConfigLoader()
config_feature_store = config.get_feature_store()

for k, _ in key_value_pairs:
assert config_feature_store[k] == feature_store[k]


def test_environment_variables_not_set():
"""EnvironmentConfigLoader getters return None when environment
variables are not set"""
config = EnvironmentConfigLoader()
assert config.get_feature_store() == None
assert config.get_queue() == None
Empty file added tests/dragon/utils/__init__.py
Empty file.
Loading