Merge MLI feature branch into v1.0 branch (#754)
Create a v1.0 branch to combine ongoing efforts in `mli-feature` and
`smartsim-refactor` feature branches

---------

Co-authored-by: Alyssa Cote <[email protected]>
Co-authored-by: Al Rigazzi <[email protected]>
3 people authored Oct 19, 2024
1 parent 9f40322 commit 8a19dee
Showing 131 changed files with 18,278 additions and 427 deletions.
27 changes: 25 additions & 2 deletions .github/workflows/run_tests.yml
@@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: false
matrix:
subset: [backends, slow_tests, group_a, group_b]
subset: [backends, slow_tests, group_a, group_b, dragon]
os: [macos-12, macos-14, ubuntu-22.04] # Operating systems
compiler: [8] # GNU compiler version
rai: [1.2.7] # Redis AI versions
@@ -109,8 +109,24 @@ jobs:
python -m pip install .[dev,mypy]
- name: Install ML Runtimes
if: matrix.subset != 'dragon'
run: smart build --device cpu -v


- name: Install ML Runtimes (with dragon)
if: matrix.subset == 'dragon'
env:
SMARTSIM_DRAGON_TOKEN: ${{ secrets.DRAGON_TOKEN }}
run: |
if [ -n "${SMARTSIM_DRAGON_TOKEN}" ]; then
smart build --device cpu -v --dragon-repo dragonhpc/dragon-nightly --dragon-version 0.10
else
smart build --device cpu -v --dragon
fi
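# Make Dragon's shared libraries visible to later steps: read LD_LIBRARY_PATH from the dragon .env generated in site-packages and append it to GITHUB_ENV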
SP=$(python -c 'import site; print(site.getsitepackages()[0])')/smartsim/_core/config/dragon/.env
LLP=$(cat $SP | grep LD_LIBRARY_PATH | awk '{split($0, array, "="); print array[2]}')
echo "LD_LIBRARY_PATH=$LLP:$LD_LIBRARY_PATH" >> $GITHUB_ENV
- name: Run mypy
run: |
make check-mypy
@@ -134,9 +150,16 @@ jobs:
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ ./tests/backends
# Run pytest (dragon subtests)
- name: Run Dragon Pytest
if: (matrix.subset == 'dragon' && matrix.os == 'ubuntu-22.04')
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
dragon -s py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests
# Run pytest (test subsets)
- name: Run Pytest
if: "!contains(matrix.subset, 'backends')" # if not running backend tests
if: (matrix.subset != 'backends' && matrix.subset != 'dragon') # if not running backend tests or dragon tests
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests
13 changes: 9 additions & 4 deletions Makefile
@@ -164,22 +164,22 @@ tutorials-prod:
# help: test - Run all tests
.PHONY: test
test:
@python -m pytest --ignore=tests/full_wlm/
@python -m pytest --ignore=tests/full_wlm/ --ignore=tests/dragon

# help: test-verbose - Run all tests verbosely
.PHONY: test-verbose
test-verbose:
@python -m pytest -vv --ignore=tests/full_wlm/
@python -m pytest -vv --ignore=tests/full_wlm/ --ignore=tests/dragon

# help: test-debug - Run all tests with debug output
.PHONY: test-debug
test-debug:
@SMARTSIM_LOG_LEVEL=developer python -m pytest -s -o log_cli=true -vv --ignore=tests/full_wlm/
@SMARTSIM_LOG_LEVEL=developer python -m pytest -s -o log_cli=true -vv --ignore=tests/full_wlm/ --ignore=tests/dragon

# help: test-cov - Run all tests with coverage
.PHONY: test-cov
test-cov:
@python -m pytest -vv --cov=./smartsim --cov-config=${COV_FILE} --ignore=tests/full_wlm/
@python -m pytest -vv --cov=./smartsim --cov-config=${COV_FILE} --ignore=tests/full_wlm/ --ignore=tests/dragon


# help: test-full - Run all WLM tests with Python coverage (full test suite)
@@ -192,3 +192,8 @@ test-full:
.PHONY: test-wlm
test-wlm:
@python -m pytest -vv tests/full_wlm/ tests/on_wlm

# help: test-dragon - Run dragon-specific tests
.PHONY: test-dragon
test-dragon:
@dragon pytest tests/dragon
2 changes: 1 addition & 1 deletion conftest.py
@@ -93,6 +93,7 @@
test_hostlist = None
has_aprun = shutil.which("aprun") is not None


def get_account() -> str:
return test_account

@@ -227,7 +228,6 @@ def kill_all_test_spawned_processes() -> None:
print("Not all processes were killed after test")



def get_hostlist() -> t.Optional[t.List[str]]:
global test_hostlist
if not test_hostlist:
33 changes: 33 additions & 0 deletions doc/changelog.md
@@ -9,6 +9,39 @@ Jump to:

## SmartSim

### MLI branch

Description

- Implement asynchronous notifications for shared data
- Quick bug fix in _validate
- Add helper methods to MLI classes
- Update error handling for consistency
- Parameterize installation of dragon package with `smart build`
- Update docstrings
- Filenames conform to snake case
- Update SmartSim environment variables using new naming convention
- Refactor `exception_handler`
- Add RequestDispatcher and the possibility of batching inference requests
- Enable hostname selection for dragon tasks
- Remove pydantic dependency from MLI code
- Update MLI environment variables using new naming convention
- Reduce a copy by using torch.from_numpy instead of torch.tensor (see the sketch after this list)
- Enable dynamic feature store selection
- Fix dragon package installation bug
- Adjust schemas for better performance
- Add TorchWorker first implementation and mock inference app example
- Add error handling in Worker Manager pipeline
- Add EnvironmentConfigLoader for ML Worker Manager
- Add Model schema with model metadata included
- Removed device from schemas, MessageHandler and tests
- Add ML worker manager, sample worker, and feature store
- Add schemas and MessageHandler class for de/serialization of
inference requests and response messages

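The copy reduction mentioned in the entry above relies on documented PyTorch behavior: `torch.from_numpy` wraps the existing NumPy buffer, while `torch.tensor` always allocates and copies. A minimal standalone sketch (illustrative only, not taken from the MLI code):

```python
import numpy as np
import torch

arr = np.ones((2, 3, 224, 224), dtype=np.float32)

copied = torch.tensor(arr)       # allocates new storage and copies the data
shared = torch.from_numpy(arr)   # wraps the existing NumPy buffer, no copy

arr[0, 0, 0, 0] = 42.0
assert shared[0, 0, 0, 0].item() == 42.0  # shares memory with arr
assert copied[0, 0, 0, 0].item() == 1.0   # unaffected, independent copy
```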

### Develop

To be released at some point in the future

Description
14 changes: 14 additions & 0 deletions doc/installation_instructions/basic.rst
@@ -305,6 +305,20 @@ For example, to install dragon alongside the RedisAI CPU backends, you can run
smart build --device cpu --dragon # install Dragon, PT and TF for cpu
``smart build`` supports installing a specific version of dragon. It exposes the
parameters ``--dragon-repo`` and ``--dragon-version``, which can be used alone or
in combination to customize the Dragon installation. For example:

.. code-block:: bash
# using the --dragon-repo and --dragon-version flags to customize the Dragon installation
smart build --device cpu --dragon-repo userfork/dragon # install Dragon from a specific repo
smart build --device cpu --dragon-version 0.10 # install a specific Dragon release
# combining both flags
smart build --device cpu --dragon-repo userfork/dragon --dragon-version 0.91
.. note::
Dragon is only supported on Linux systems. For further information, you
can read :ref:`the dedicated documentation page <dragon>`.
77 changes: 77 additions & 0 deletions ex/high_throughput_inference/mli_driver.py
@@ -0,0 +1,77 @@
import os
import base64
import cloudpickle
import sys
from smartsim import Experiment
from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker
from smartsim.status import TERMINAL_STATUSES
from smartsim.settings import DragonRunSettings
import time
import typing as t

DEVICE = "gpu"
NUM_RANKS = 4
NUM_WORKERS = 1
filedir = os.path.dirname(__file__)
worker_manager_script_name = os.path.join(filedir, "standalone_worker_manager.py")
app_script_name = os.path.join(filedir, "mock_app.py")
model_name = os.path.join(filedir, f"resnet50.{DEVICE}.pt")

transport: t.Literal["hsta", "tcp"] = "hsta"

os.environ["SMARTSIM_DRAGON_TRANSPORT"] = transport

exp_path = os.path.join(filedir, f"MLI_proto_{transport.upper()}")
os.makedirs(exp_path, exist_ok=True)
exp = Experiment("MLI_proto", launcher="dragon", exp_path=exp_path)

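# Serialize the TorchWorker class with cloudpickle and base64-encode it so it can be passed to the worker manager as a command-line argument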
torch_worker_str = base64.b64encode(cloudpickle.dumps(TorchWorker)).decode("ascii")

worker_manager_rs: DragonRunSettings = exp.create_run_settings(
sys.executable,
[
worker_manager_script_name,
"--device",
DEVICE,
"--worker_class",
torch_worker_str,
"--batch_size",
str(NUM_RANKS//NUM_WORKERS),
"--batch_timeout",
str(0.00),
"--num_workers",
str(NUM_WORKERS)
],
)

aff = []

worker_manager_rs.set_cpu_affinity(aff)

worker_manager = exp.create_model("worker_manager", run_settings=worker_manager_rs)
worker_manager.attach_generator_files(to_copy=[worker_manager_script_name])

app_rs: DragonRunSettings = exp.create_run_settings(
sys.executable,
exe_args=[app_script_name, "--device", DEVICE, "--log_max_batchsize", str(6)],
)
app_rs.set_tasks_per_node(NUM_RANKS)


app = exp.create_model("app", run_settings=app_rs)
app.attach_generator_files(to_copy=[app_script_name], to_symlink=[model_name])

exp.generate(worker_manager, app, overwrite=True)
exp.start(worker_manager, app, block=False)

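# Poll both jobs: when either reaches a terminal status, stop the other and exit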
while True:
if exp.get_status(app)[0] in TERMINAL_STATUSES:
time.sleep(10)
exp.stop(worker_manager)
break
if exp.get_status(worker_manager)[0] in TERMINAL_STATUSES:
time.sleep(10)
exp.stop(app)
break

print("Exiting.")
142 changes: 142 additions & 0 deletions ex/high_throughput_inference/mock_app.py
@@ -0,0 +1,142 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# isort: off
import dragon
from dragon import fli
from dragon.channels import Channel
import dragon.channels
from dragon.data.ddict.ddict import DDict
from dragon.globalservices.api_setup import connect_to_infrastructure
from dragon.utils import b64decode, b64encode

# isort: on

import argparse
import io

import torch

from smartsim.log import get_logger

torch.set_num_interop_threads(16)
torch.set_num_threads(1)

logger = get_logger("App")
logger.info("Started app")

from collections import OrderedDict

from smartsim.log import get_logger, log_to_file
from smartsim._core.mli.client.protoclient import ProtoClient

logger = get_logger("App")


CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False


class ResNetWrapper:
"""Wrapper around a pre-rained ResNet model."""
def __init__(self, name: str, model: str):
"""Initialize the instance.
:param name: The name to use for the model
:param model: The path to the pre-trained PyTorch model"""
self._model = torch.jit.load(model)
self._name = name
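# Trace the model on a sample batch and keep the serialized TorchScript bytes so they can be sent with set_model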
buffer = io.BytesIO()
scripted = torch.jit.trace(self._model, self.get_batch())
torch.jit.save(scripted, buffer)
self._serialized_model = buffer.getvalue()

def get_batch(self, batch_size: int = 32):
"""Create a random batch of data with the correct dimensions to
invoke a ResNet model.
:param batch_size: The desired number of samples to produce
:returns: A PyTorch tensor"""
return torch.randn((batch_size, 3, 224, 224), dtype=torch.float32)

@property
def model(self) -> bytes:
"""The content of a model file.
:returns: The model bytes"""
return self._serialized_model

@property
def name(self) -> str:
"""The name applied to the model.
:returns: The name"""
return self._name


if __name__ == "__main__":

parser = argparse.ArgumentParser("Mock application")
parser.add_argument("--device", default="cpu", type=str)
parser.add_argument("--log_max_batchsize", default=8, type=int)
args = parser.parse_args()

resnet = ResNetWrapper("resnet50", f"resnet50.{args.device}.pt")

client = ProtoClient(timing_on=True)
client.set_model(resnet.name, resnet.model)

if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
# TODO: adapt to non-Nvidia devices
torch_device = args.device.replace("gpu", "cuda")
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(
torch_device
)

TOTAL_ITERATIONS = 100

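# Sweep power-of-two batch sizes up to 2**log_max_batchsize and time TOTAL_ITERATIONS remote inferences for each (one extra warm-up pass when the batch size is 1)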
for log2_bsize in range(args.log_max_batchsize + 1):
b_size: int = 2**log2_bsize
logger.info(f"Batch size: {b_size}")
for iteration_number in range(TOTAL_ITERATIONS + int(b_size == 1)):
logger.info(f"Iteration: {iteration_number}")
sample_batch = resnet.get_batch(b_size)
remote_result = client.run_model(resnet.name, sample_batch)
logger.info(client.perf_timer.get_last("total_time"))
if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
local_res = pt_model(sample_batch.to(torch_device))
err_norm = torch.linalg.vector_norm(
torch.flatten(remote_result).to(torch_device)
- torch.flatten(local_res),
ord=1,
).cpu()
res_norm = torch.linalg.vector_norm(remote_result, ord=1).item()
local_res_norm = torch.linalg.vector_norm(local_res, ord=1).item()
logger.info(
f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}"
)
torch.cuda.synchronize()

client.perf_timer.print_timings(to_file=True)