diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index a7f7be7f5..c0d2c43f3 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -135,9 +135,9 @@ jobs: LLP=$(cat $SP | grep LD_LIBRARY_PATH | awk '{split($0, array, "="); print array[2]}') echo "LD_LIBRARY_PATH=$LLP:$LD_LIBRARY_PATH" >> $GITHUB_ENV - # - name: Run mypy - # run: | - # make check-mypy + - name: Run mypy + run: | + make check-mypy # TODO: Re-enable static analysis once API is firmed up # - name: Run Pylint diff --git a/smartsim/_core/utils/__init__.py b/smartsim/_core/utils/__init__.py index 2954d4561..4159c9042 100644 --- a/smartsim/_core/utils/__init__.py +++ b/smartsim/_core/utils/__init__.py @@ -30,6 +30,5 @@ delete_elements, execute_platform_cmd, expand_exe_path, - installed_redisai_backends, is_crayex_platform, ) diff --git a/smartsim/_core/utils/helpers.py b/smartsim/_core/utils/helpers.py index 79a9b4f31..265205bef 100644 --- a/smartsim/_core/utils/helpers.py +++ b/smartsim/_core/utils/helpers.py @@ -41,10 +41,8 @@ import uuid import warnings from datetime import datetime -from pathlib import Path from shutil import which -from deprecated import deprecated from typing_extensions import TypeAlias if t.TYPE_CHECKING: @@ -269,57 +267,6 @@ def cat_arg_and_value(arg_name: str, value: str) -> str: return f"--{arg_name}={value}" -@deprecated("Remove after completing fixes in MLI tests post-merge of refactor") -def _installed(base_path: Path, backend: str) -> bool: - """ - Check if a backend is available for the RedisAI module. - """ - backend_key = f"redisai_{backend}" - backend_path = base_path / backend_key / f"{backend_key}.so" - backend_so = Path(os.environ.get("SMARTSIM_RAI_LIB", backend_path)).resolve() - - return backend_so.is_file() - - -@deprecated("Remove after completing fixes in MLI tests post-merge of refactor") -def redis_install_base(backends_path: t.Optional[str] = None) -> Path: - # pylint: disable-next=import-outside-toplevel - from ..._core.config import CONFIG - - base_path: Path = ( - Path(backends_path) if backends_path else CONFIG.lib_path / "backends" - ) - return base_path - - -@deprecated("Remove after completing fixes in MLI tests post-merge of refactor") -def installed_redisai_backends( - backends_path: t.Optional[str] = None, -) -> t.Set[_TRedisAIBackendStr]: - """Check which ML backends are available for the RedisAI module. - - The optional argument ``backends_path`` is needed if the backends - have not been built as part of the SmartSim building process (i.e. - they have not been built by invoking `smart build`). In that case - ``backends_path`` should point to the directory containing e.g. - the backend directories (`redisai_tensorflow`, `redisai_torch`, - `redisai_onnxruntime`, or `redisai_tflite`). - - :param backends_path: path containing backends - :return: list of installed RedisAI backends - """ - # import here to avoid circular import - base_path = redis_install_base(backends_path) - backends: t.Set[_TRedisAIBackendStr] = { - "tensorflow", - "torch", - "onnxruntime", - } - - installed = {backend for backend in backends if _installed(base_path, backend)} - return installed - - def get_ts_ms() -> int: """Return the current timestamp (accurate to milliseconds) cast to an integer""" return int(datetime.now().timestamp() * 1000) diff --git a/smartsim/entity/ensemble.py b/smartsim/entity/ensemble.py deleted file mode 100644 index 965b10db7..000000000 --- a/smartsim/entity/ensemble.py +++ /dev/null @@ -1,573 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import os.path as osp -import typing as t -from copy import deepcopy -from os import getcwd - -from tabulate import tabulate - -from smartsim._core.types import Device - -from ..error import ( - EntityExistsError, - SmartSimError, - SSUnsupportedError, - UserStrategyError, -) -from ..log import get_logger -from ..settings.base import BatchSettings, RunSettings -from .dbobject import DBModel, DBScript -from .entity import SmartSimEntity -from .entityList import EntityList -from .model import Model -from .strategies import create_all_permutations, random_permutations, step_values - -logger = get_logger(__name__) - -StrategyFunction = t.Callable[ - [t.List[str], t.List[t.List[str]], int], t.List[t.Dict[str, str]] -] - - -class Ensemble(EntityList[Model]): - """``Ensemble`` is a group of ``Model`` instances that can - be treated as a reference to a single instance. - """ - - def __init__( - self, - name: str, - params: t.Dict[str, t.Any], - path: t.Optional[str] = getcwd(), - params_as_args: t.Optional[t.List[str]] = None, - batch_settings: t.Optional[BatchSettings] = None, - run_settings: t.Optional[RunSettings] = None, - perm_strat: str = "all_perm", - **kwargs: t.Any, - ) -> None: - """Initialize an Ensemble of Model instances. - - The kwargs argument can be used to pass custom input - parameters to the permutation strategy. - - :param name: name of the ensemble - :param params: parameters to expand into ``Model`` members - :param params_as_args: list of params that should be used as command - line arguments to the ``Model`` member executables and not written - to generator files - :param batch_settings: describes settings for ``Ensemble`` as batch workload - :param run_settings: describes how each ``Model`` should be executed - :param replicas: number of ``Model`` replicas to create - a keyword - argument of kwargs - :param perm_strategy: strategy for expanding ``params`` into - ``Model`` instances from params argument - options are "all_perm", "step", "random" - or a callable function. - :return: ``Ensemble`` instance - """ - self.params = params or {} - self.params_as_args = params_as_args or [] - self._key_prefixing_enabled = True - self.batch_settings = batch_settings - self.run_settings = run_settings - self.replicas: str - - super().__init__(name, str(path), perm_strat=perm_strat, **kwargs) - - @property - def models(self) -> t.Collection[Model]: - """An alias for a shallow copy of the ``entities`` attribute""" - return list(self.entities) - - def _initialize_entities(self, **kwargs: t.Any) -> None: - """Initialize all the models within the ensemble based - on the parameters passed to the ensemble and the permutation - strategy given at init. - - :raises UserStrategyError: if user generation strategy fails - """ - strategy = self._set_strategy(kwargs.pop("perm_strat")) - replicas = kwargs.pop("replicas", None) - self.replicas = replicas - - # if a ensemble has parameters and run settings, create - # the ensemble and assign run_settings to each member - if self.params: - if self.run_settings: - param_names, params = self._read_model_parameters() - - # Compute all combinations of model parameters and arguments - n_models = kwargs.get("n_models", 0) - all_model_params = strategy(param_names, params, n_models) - if not isinstance(all_model_params, list): - raise UserStrategyError(strategy) - - for i, param_set in enumerate(all_model_params): - if not isinstance(param_set, dict): - raise UserStrategyError(strategy) - run_settings = deepcopy(self.run_settings) - model_name = "_".join((self.name, str(i))) - model = Model( - name=model_name, - params=param_set, - path=osp.join(self.path, model_name), - run_settings=run_settings, - params_as_args=self.params_as_args, - ) - model.enable_key_prefixing() - model.params_to_args() - logger.debug( - f"Created ensemble member: {model_name} in {self.name}" - ) - self.add_model(model) - # cannot generate models without run settings - else: - raise SmartSimError( - "Ensembles without 'params' or 'replicas' argument to " - "expand into members cannot be given run settings" - ) - else: - if self.run_settings: - if replicas: - for i in range(replicas): - model_name = "_".join((self.name, str(i))) - model = Model( - name=model_name, - params={}, - path=osp.join(self.path, model_name), - run_settings=deepcopy(self.run_settings), - ) - model.enable_key_prefixing() - logger.debug( - f"Created ensemble member: {model_name} in {self.name}" - ) - self.add_model(model) - else: - raise SmartSimError( - "Ensembles without 'params' or 'replicas' argument to " - "expand into members cannot be given run settings" - ) - # if no params, no run settings and no batch settings, error because we - # don't know how to run the ensemble - elif not self.batch_settings: - raise SmartSimError( - "Ensemble must be provided batch settings or run settings" - ) - else: - logger.info("Empty ensemble created for batch launch") - - def add_model(self, model: Model) -> None: - """Add a model to this ensemble - - :param model: model instance to be added - :raises TypeError: if model is not an instance of ``Model`` - :raises EntityExistsError: if model already exists in this ensemble - """ - if not isinstance(model, Model): - raise TypeError( - f"Argument to add_model was of type {type(model)}, not Model" - ) - # "in" operator uses model name for __eq__ - if model in self.entities: - raise EntityExistsError( - f"Model {model.name} already exists in ensemble {self.name}" - ) - - if self._db_models: - self._extend_entity_db_models(model, self._db_models) - if self._db_scripts: - self._extend_entity_db_scripts(model, self._db_scripts) - - self.entities.append(model) - - def register_incoming_entity(self, incoming_entity: SmartSimEntity) -> None: - """Register future communication between entities. - - Registers the named data sources that this entity - has access to by storing the key_prefix associated - with that entity - - Only python clients can have multiple incoming connections - - :param incoming_entity: The entity that data will be received from - """ - for model in self.models: - model.register_incoming_entity(incoming_entity) - - def enable_key_prefixing(self) -> None: - """If called, each model within this ensemble will prefix its key with its - own model name. - """ - for model in self.models: - model.enable_key_prefixing() - - def query_key_prefixing(self) -> bool: - """Inquire as to whether each model within the ensemble will prefix their keys - - :returns: True if all models have key prefixing enabled, False otherwise - """ - return all(model.query_key_prefixing() for model in self.models) - - def attach_generator_files( - self, - to_copy: t.Optional[t.List[str]] = None, - to_symlink: t.Optional[t.List[str]] = None, - to_configure: t.Optional[t.List[str]] = None, - ) -> None: - """Attach files to each model within the ensemble for generation - - Attach files needed for the entity that, upon generation, - will be located in the path of the entity. - - During generation, files "to_copy" are copied into - the path of the entity, and files "to_symlink" are - symlinked into the path of the entity. - - Files "to_configure" are text based model input files where - parameters for the model are set. Note that only models - support the "to_configure" field. These files must have - fields tagged that correspond to the values the user - would like to change. The tag is settable but defaults - to a semicolon e.g. THERMO = ;10; - - :param to_copy: files to copy - :param to_symlink: files to symlink - :param to_configure: input files with tagged parameters - """ - for model in self.models: - model.attach_generator_files( - to_copy=to_copy, to_symlink=to_symlink, to_configure=to_configure - ) - - @property - def attached_files_table(self) -> str: - """Return a plain-text table with information about files - attached to models belonging to this ensemble. - - :returns: A table of all files attached to all models - """ - if not self.models: - return "The ensemble is empty, no files to show." - - table = tabulate( - [[model.name, model.attached_files_table] for model in self.models], - headers=["Model name", "Files"], - tablefmt="grid", - ) - - return table - - def print_attached_files(self) -> None: - """Print table of attached files to std out""" - print(self.attached_files_table) - - @staticmethod - def _set_strategy(strategy: str) -> StrategyFunction: - """Set the permutation strategy for generating models within - the ensemble - - :param strategy: name of the strategy or callable function - :raises SSUnsupportedError: if str name is not supported - :return: strategy function - """ - if strategy == "all_perm": - return create_all_permutations - if strategy == "step": - return step_values - if strategy == "random": - return random_permutations - if callable(strategy): - return strategy - raise SSUnsupportedError( - f"Permutation strategy given is not supported: {strategy}" - ) - - def _read_model_parameters(self) -> t.Tuple[t.List[str], t.List[t.List[str]]]: - """Take in the parameters given to the ensemble and prepare to - create models for the ensemble - - :raises TypeError: if params are of the wrong type - :return: param names and values for permutation strategy - """ - - if not isinstance(self.params, dict): - raise TypeError( - "Ensemble initialization argument 'params' must be of type dict" - ) - - param_names: t.List[str] = [] - parameters: t.List[t.List[str]] = [] - for name, val in self.params.items(): - param_names.append(name) - - if isinstance(val, list): - val = [str(v) for v in val] - parameters.append(val) - elif isinstance(val, (int, str)): - parameters.append([str(val)]) - else: - raise TypeError( - "Incorrect type for ensemble parameters\n" - + "Must be list, int, or string." - ) - return param_names, parameters - - def add_ml_model( - self, - name: str, - backend: str, - model: t.Optional[bytes] = None, - model_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - batch_size: int = 0, - min_batch_size: int = 0, - min_batch_timeout: int = 0, - tag: str = "", - inputs: t.Optional[t.List[str]] = None, - outputs: t.Optional[t.List[str]] = None, - ) -> None: - """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime - - Each ML Model added will be loaded into an - orchestrator (converged or not) prior to the execution - of every entity belonging to this ensemble - - One of either model (in memory representation) or model_path (file) - must be provided - - :param name: key to store model under - :param model: model in memory - :param model_path: serialized model - :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) - :param device: name of device for execution - :param devices_per_node: number of GPUs per node in multiGPU nodes - :param first_device: first device in multi-GPU nodes to use for execution, - defaults to 0; ignored if devices_per_node is 1 - :param batch_size: batch size for execution - :param min_batch_size: minimum batch size for model execution - :param min_batch_timeout: time to wait for minimum batch size - :param tag: additional tag for model information - :param inputs: model inputs (TF only) - :param outputs: model outupts (TF only) - """ - db_model = DBModel( - name=name, - backend=backend, - model=model, - model_file=model_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - batch_size=batch_size, - min_batch_size=min_batch_size, - min_batch_timeout=min_batch_timeout, - tag=tag, - inputs=inputs, - outputs=outputs, - ) - dupe = next( - ( - db_model.name - for ensemble_ml_model in self._db_models - if ensemble_ml_model.name == db_model.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'An ML Model with name "{db_model.name}" already exists' - ) - self._db_models.append(db_model) - for entity in self.models: - self._extend_entity_db_models(entity, [db_model]) - - def add_script( - self, - name: str, - script: t.Optional[str] = None, - script_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript to launch with every entity belonging to this ensemble - - Each script added to the model will be loaded into an - orchestrator (converged or not) prior to the execution - of every entity belonging to this ensemble - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the model being stored in the first N devices of type ``device``. - - One of either script (in memory string representation) or script_path (file) - must be provided - - :param name: key to store script under - :param script: TorchScript code - :param script_path: path to TorchScript code - :param device: device for script execution - :param devices_per_node: number of devices on each host - :param first_device: first device to use on each host - """ - db_script = DBScript( - name=name, - script=script, - script_path=script_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - ) - dupe = next( - ( - db_script.name - for ensemble_script in self._db_scripts - if ensemble_script.name == db_script.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{db_script.name}" already exists' - ) - self._db_scripts.append(db_script) - for entity in self.models: - self._extend_entity_db_scripts(entity, [db_script]) - - def add_function( - self, - name: str, - function: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript function to launch with every entity belonging to this ensemble - - Each script function to the model will be loaded into a - non-converged orchestrator prior to the execution - of every entity belonging to this ensemble. - - For converged orchestrators, the :meth:`add_script` method should be used. - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the script being stored in the first N devices of type ``device``; - alternatively, setting ``first_device=M`` will result in the script - being stored on nodes M through M + N - 1. - - :param name: key to store function under - :param function: TorchScript code - :param device: device for script execution - :param devices_per_node: number of devices on each host - :param first_device: first device to use on each host - """ - db_script = DBScript( - name=name, - script=function, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - ) - dupe = next( - ( - db_script.name - for ensemble_script in self._db_scripts - if ensemble_script.name == db_script.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{db_script.name}" already exists' - ) - self._db_scripts.append(db_script) - for entity in self.models: - self._extend_entity_db_scripts(entity, [db_script]) - - @staticmethod - def _extend_entity_db_models(model: Model, db_models: t.List[DBModel]) -> None: - """ - Ensures that the Machine Learning model names being added to the Ensemble - are unique. - - This static method checks if the provided ML model names already exist in - the Ensemble. An SSUnsupportedError is raised if any duplicate names are - found. Otherwise, it appends the given list of DBModels to the Ensemble. - - :param model: SmartSim Model object. - :param db_models: List of DBModels to append to the Ensemble. - """ - for add_ml_model in db_models: - dupe = next( - ( - db_model.name - for db_model in model.db_models - if db_model.name == add_ml_model.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'An ML Model with name "{add_ml_model.name}" already exists' - ) - model.add_ml_model_object(add_ml_model) - - @staticmethod - def _extend_entity_db_scripts(model: Model, db_scripts: t.List[DBScript]) -> None: - """ - Ensures that the script/function names being added to the Ensemble are unique. - - This static method checks if the provided script/function names already exist - in the Ensemble. An SSUnsupportedError is raised if any duplicate names - are found. Otherwise, it appends the given list of DBScripts to the - Ensemble. - - :param model: SmartSim Model object. - :param db_scripts: List of DBScripts to append to the Ensemble. - """ - for add_script in db_scripts: - dupe = next( - ( - add_script.name - for db_script in model.db_scripts - if db_script.name == add_script.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{add_script.name}" already exists' - ) - model.add_script_object(add_script) diff --git a/smartsim/entity/model.py b/smartsim/entity/model.py deleted file mode 100644 index 3e8baad5c..000000000 --- a/smartsim/entity/model.py +++ /dev/null @@ -1,701 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from __future__ import annotations - -import itertools -import numbers -import re -import sys -import typing as t -import warnings -from os import getcwd -from os import path as osp - -from smartsim._core.types import Device - -from .._core.utils.helpers import cat_arg_and_value -from ..error import EntityExistsError, SSUnsupportedError -from ..log import get_logger -from ..settings.base import BatchSettings, RunSettings -from .dbobject import DBModel, DBScript -from .entity import SmartSimEntity -from .files import EntityFiles - -logger = get_logger(__name__) - - -def _parse_model_parameters(params_dict: t.Dict[str, t.Any]) -> t.Dict[str, str]: - """Convert the values in a params dict to strings - :raises TypeError: if params are of the wrong type - :return: param dictionary with values and keys cast as strings - """ - param_names: t.List[str] = [] - parameters: t.List[str] = [] - for name, val in params_dict.items(): - param_names.append(name) - if isinstance(val, (str, numbers.Number)): - parameters.append(str(val)) - else: - raise TypeError( - "Incorrect type for model parameters\n" - + "Must be numeric value or string." - ) - return dict(zip(param_names, parameters)) - - -class Model(SmartSimEntity): - def __init__( - self, - name: str, - params: t.Dict[str, str], - run_settings: RunSettings, - path: t.Optional[str] = getcwd(), - params_as_args: t.Optional[t.List[str]] = None, - batch_settings: t.Optional[BatchSettings] = None, - ): - """Initialize a ``Model`` - - :param name: name of the model - :param params: model parameters for writing into configuration files or - to be passed as command line arguments to executable. - :param path: path to output, error, and configuration files - :param run_settings: launcher settings specified in the experiment - :param params_as_args: list of parameters which have to be - interpreted as command line arguments to - be added to run_settings - :param batch_settings: Launcher settings for running the individual - model as a batch job - """ - super().__init__(name, str(path), run_settings) - self.params = _parse_model_parameters(params) - self.params_as_args = params_as_args - self.incoming_entities: t.List[SmartSimEntity] = [] - self._key_prefixing_enabled = False - self.batch_settings = batch_settings - self._db_models: t.List[DBModel] = [] - self._db_scripts: t.List[DBScript] = [] - self.files: t.Optional[EntityFiles] = None - - @property - def db_models(self) -> t.Iterable[DBModel]: - """Retrieve an immutable collection of attached models - - :return: Return an immutable collection of attached models - """ - return (model for model in self._db_models) - - @property - def db_scripts(self) -> t.Iterable[DBScript]: - """Retrieve an immutable collection attached of scripts - - :return: Return an immutable collection of attached scripts - """ - return (script for script in self._db_scripts) - - @property - def colocated(self) -> bool: - """Return True if this Model will run with a colocated Orchestrator - - :return: Return True of the Model will run with a colocated Orchestrator - """ - return bool(self.run_settings.colocated_db_settings) - - def register_incoming_entity(self, incoming_entity: SmartSimEntity) -> None: - """Register future communication between entities. - - Registers the named data sources that this entity - has access to by storing the key_prefix associated - with that entity - - :param incoming_entity: The entity that data will be received from - :raises SmartSimError: if incoming entity has already been registered - """ - if incoming_entity.name in [ - in_entity.name for in_entity in self.incoming_entities - ]: - raise EntityExistsError( - f"'{incoming_entity.name}' has already " - + "been registered as an incoming entity" - ) - - self.incoming_entities.append(incoming_entity) - - def enable_key_prefixing(self) -> None: - """If called, the entity will prefix its keys with its own model name""" - self._key_prefixing_enabled = True - - def disable_key_prefixing(self) -> None: - """If called, the entity will not prefix its keys with its own model name""" - self._key_prefixing_enabled = False - - def query_key_prefixing(self) -> bool: - """Inquire as to whether this entity will prefix its keys with its name - - :return: Return True if entity will prefix its keys with its name - """ - return self._key_prefixing_enabled - - def attach_generator_files( - self, - to_copy: t.Optional[t.List[str]] = None, - to_symlink: t.Optional[t.List[str]] = None, - to_configure: t.Optional[t.List[str]] = None, - ) -> None: - """Attach files to an entity for generation - - Attach files needed for the entity that, upon generation, - will be located in the path of the entity. Invoking this method - after files have already been attached will overwrite - the previous list of entity files. - - During generation, files "to_copy" are copied into - the path of the entity, and files "to_symlink" are - symlinked into the path of the entity. - - Files "to_configure" are text based model input files where - parameters for the model are set. Note that only models - support the "to_configure" field. These files must have - fields tagged that correspond to the values the user - would like to change. The tag is settable but defaults - to a semicolon e.g. THERMO = ;10; - - :param to_copy: files to copy - :param to_symlink: files to symlink - :param to_configure: input files with tagged parameters - """ - to_copy = to_copy or [] - to_symlink = to_symlink or [] - to_configure = to_configure or [] - - # Check that no file collides with the parameter file written - # by Generator. We check the basename, even though it is more - # restrictive than what we need (but it avoids relative path issues) - for strategy in [to_copy, to_symlink, to_configure]: - if strategy is not None and any( - osp.basename(filename) == "smartsim_params.txt" for filename in strategy - ): - raise ValueError( - "`smartsim_params.txt` is a file automatically " - + "generated by SmartSim and cannot be ovewritten." - ) - - self.files = EntityFiles(to_configure, to_copy, to_symlink) - - @property - def attached_files_table(self) -> str: - """Return a list of attached files as a plain text table - - :returns: String version of table - """ - if not self.files: - return "No file attached to this model." - return str(self.files) - - def print_attached_files(self) -> None: - """Print a table of the attached files on std out""" - print(self.attached_files_table) - - def colocate_db(self, *args: t.Any, **kwargs: t.Any) -> None: - """An alias for ``Model.colocate_db_tcp``""" - warnings.warn( - ( - "`colocate_db` has been deprecated and will be removed in a \n" - "future release. Please use `colocate_db_tcp` or `colocate_db_uds`." - ), - FutureWarning, - ) - self.colocate_db_tcp(*args, **kwargs) - - def colocate_db_uds( - self, - unix_socket: str = "/tmp/redis.socket", - socket_permissions: int = 755, - db_cpus: int = 1, - custom_pinning: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]] = None, - debug: bool = False, - db_identifier: str = "", - **kwargs: t.Any, - ) -> None: - """Colocate an Orchestrator instance with this Model over UDS. - - This method will initialize settings which add an unsharded - database to this Model instance. Only this Model will be able to communicate - with this colocated database by using Unix Domain sockets. - - Extra parameters for the db can be passed through kwargs. This includes - many performance, caching and inference settings. - - .. highlight:: python - .. code-block:: python - - example_kwargs = { - "maxclients": 100000, - "threads_per_queue": 1, - "inter_op_threads": 1, - "intra_op_threads": 1, - "server_threads": 2 # keydb only - } - - Generally these don't need to be changed. - - :param unix_socket: path to where the socket file will be created - :param socket_permissions: permissions for the socketfile - :param db_cpus: number of cpus to use for orchestrator - :param custom_pinning: CPUs to pin the orchestrator to. Passing an empty - iterable disables pinning - :param debug: launch Model with extra debug information about the colocated db - :param kwargs: additional keyword arguments to pass to the orchestrator database - """ - - if not re.match(r"^[a-zA-Z0-9.:\,_\-/]*$", unix_socket): - raise ValueError( - f"Invalid name for unix socket: {unix_socket}. Must only " - "contain alphanumeric characters or . : _ - /" - ) - uds_options: t.Dict[str, t.Union[int, str]] = { - "unix_socket": unix_socket, - "socket_permissions": socket_permissions, - # This is hardcoded to 0 as recommended by redis for UDS - "port": 0, - } - - common_options = { - "cpus": db_cpus, - "custom_pinning": custom_pinning, - "debug": debug, - "db_identifier": db_identifier, - } - self._set_colocated_db_settings(uds_options, common_options, **kwargs) - - def colocate_db_tcp( - self, - port: int = 6379, - ifname: t.Union[str, list[str]] = "lo", - db_cpus: int = 1, - custom_pinning: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]] = None, - debug: bool = False, - db_identifier: str = "", - **kwargs: t.Any, - ) -> None: - """Colocate an Orchestrator instance with this Model over TCP/IP. - - This method will initialize settings which add an unsharded - database to this Model instance. Only this Model will be able to communicate - with this colocated database by using the loopback TCP interface. - - Extra parameters for the db can be passed through kwargs. This includes - many performance, caching and inference settings. - - .. highlight:: python - .. code-block:: python - - ex. kwargs = { - maxclients: 100000, - threads_per_queue: 1, - inter_op_threads: 1, - intra_op_threads: 1, - server_threads: 2 # keydb only - } - - Generally these don't need to be changed. - - :param port: port to use for orchestrator database - :param ifname: interface to use for orchestrator - :param db_cpus: number of cpus to use for orchestrator - :param custom_pinning: CPUs to pin the orchestrator to. Passing an empty - iterable disables pinning - :param debug: launch Model with extra debug information about the colocated db - :param kwargs: additional keyword arguments to pass to the orchestrator database - """ - - tcp_options = {"port": port, "ifname": ifname} - common_options = { - "cpus": db_cpus, - "custom_pinning": custom_pinning, - "debug": debug, - "db_identifier": db_identifier, - } - self._set_colocated_db_settings(tcp_options, common_options, **kwargs) - - def _set_colocated_db_settings( - self, - connection_options: t.Mapping[str, t.Union[int, t.List[str], str]], - common_options: t.Dict[ - str, - t.Union[ - t.Union[t.Iterable[t.Union[int, t.Iterable[int]]], None], - bool, - int, - str, - None, - ], - ], - **kwargs: t.Union[int, None], - ) -> None: - """ - Ingest the connection-specific options (UDS/TCP) and set the final settings - for the colocated database - """ - - if hasattr(self.run_settings, "mpmd") and len(self.run_settings.mpmd) > 0: - raise SSUnsupportedError( - "Models colocated with databases cannot be run as a mpmd workload" - ) - - if hasattr(self.run_settings, "_prep_colocated_db"): - # pylint: disable-next=protected-access - self.run_settings._prep_colocated_db(common_options["cpus"]) - - if "limit_app_cpus" in kwargs: - raise SSUnsupportedError( - "Pinning app CPUs via limit_app_cpus is not supported. Modify " - "RunSettings using the correct binding option for your launcher." - ) - - # TODO list which db settings can be extras - custom_pinning_ = t.cast( - t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]], - common_options.get("custom_pinning"), - ) - cpus_ = t.cast(int, common_options.get("cpus")) - common_options["custom_pinning"] = self._create_pinning_string( - custom_pinning_, cpus_ - ) - - colo_db_config: t.Dict[ - str, - t.Union[ - bool, - int, - str, - None, - t.List[str], - t.Iterable[t.Union[int, t.Iterable[int]]], - t.List[DBModel], - t.List[DBScript], - t.Dict[str, t.Union[int, None]], - t.Dict[str, str], - ], - ] = {} - colo_db_config.update(connection_options) - colo_db_config.update(common_options) - - redis_ai_temp = { - "threads_per_queue": kwargs.get("threads_per_queue", None), - "inter_op_parallelism": kwargs.get("inter_op_parallelism", None), - "intra_op_parallelism": kwargs.get("intra_op_parallelism", None), - } - # redisai arguments for inference settings - colo_db_config["rai_args"] = redis_ai_temp - colo_db_config["extra_db_args"] = { - k: str(v) for k, v in kwargs.items() if k not in redis_ai_temp - } - - self._check_db_objects_colo() - colo_db_config["db_models"] = self._db_models - colo_db_config["db_scripts"] = self._db_scripts - - self.run_settings.colocated_db_settings = colo_db_config - - @staticmethod - def _create_pinning_string( - pin_ids: t.Optional[t.Iterable[t.Union[int, t.Iterable[int]]]], cpus: int - ) -> t.Optional[str]: - """Create a comma-separated string of CPU ids. By default, ``None`` - returns 0,1,...,cpus-1; an empty iterable will disable pinning - altogether, and an iterable constructs a comma separated string of - integers (e.g. ``[0, 2, 5]`` -> ``"0,2,5"``) - """ - - def _stringify_id(_id: int) -> str: - """Return the cPU id as a string if an int, otherwise raise a ValueError""" - if isinstance(_id, int): - if _id < 0: - raise ValueError("CPU id must be a nonnegative number") - return str(_id) - - raise TypeError(f"Argument is of type '{type(_id)}' not 'int'") - - try: - pin_ids = tuple(pin_ids) if pin_ids is not None else None - except TypeError: - raise TypeError( - "Expected a cpu pinning specification of type iterable of ints or " - f"iterables of ints. Instead got type `{type(pin_ids)}`" - ) from None - - # Deal with MacOSX limitations first. The "None" (default) disables pinning - # and is equivalent to []. The only invalid option is a non-empty pinning - if sys.platform == "darwin": - if pin_ids: - warnings.warn( - "CPU pinning is not supported on MacOSX. Ignoring pinning " - "specification.", - RuntimeWarning, - ) - return None - - # Flatten the iterable into a list and check to make sure that the resulting - # elements are all ints - if pin_ids is None: - return ",".join(_stringify_id(i) for i in range(cpus)) - if not pin_ids: - return None - pin_ids = ((x,) if isinstance(x, int) else x for x in pin_ids) - to_fmt = itertools.chain.from_iterable(pin_ids) - return ",".join(sorted({_stringify_id(x) for x in to_fmt})) - - def params_to_args(self) -> None: - """Convert parameters to command line arguments and update run settings.""" - if self.params_as_args is not None: - for param in self.params_as_args: - if not param in self.params: - raise ValueError( - f"Tried to convert {param} to command line argument for Model " - f"{self.name}, but its value was not found in model params" - ) - if self.run_settings is None: - raise ValueError( - "Tried to configure command line parameter for Model " - f"{self.name}, but no RunSettings are set." - ) - self.run_settings.add_exe_args( - cat_arg_and_value(param, self.params[param]) - ) - - def add_ml_model( - self, - name: str, - backend: str, - model: t.Optional[bytes] = None, - model_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - batch_size: int = 0, - min_batch_size: int = 0, - min_batch_timeout: int = 0, - tag: str = "", - inputs: t.Optional[t.List[str]] = None, - outputs: t.Optional[t.List[str]] = None, - ) -> None: - """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime - - Each ML Model added will be loaded into an - orchestrator (converged or not) prior to the execution - of this Model instance - - One of either model (in memory representation) or model_path (file) - must be provided - - :param name: key to store model under - :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) - :param model: A model in memory (only supported for non-colocated orchestrators) - :param model_path: serialized model - :param device: name of device for execution - :param devices_per_node: The number of GPU devices available on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - :param first_device: The first GPU device to use on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - :param batch_size: batch size for execution - :param min_batch_size: minimum batch size for model execution - :param min_batch_timeout: time to wait for minimum batch size - :param tag: additional tag for model information - :param inputs: model inputs (TF only) - :param outputs: model outupts (TF only) - """ - db_model = DBModel( - name=name, - backend=backend, - model=model, - model_file=model_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - batch_size=batch_size, - min_batch_size=min_batch_size, - min_batch_timeout=min_batch_timeout, - tag=tag, - inputs=inputs, - outputs=outputs, - ) - self.add_ml_model_object(db_model) - - def add_script( - self, - name: str, - script: t.Optional[str] = None, - script_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript to launch with this Model instance - - Each script added to the model will be loaded into an - orchestrator (converged or not) prior to the execution - of this Model instance - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the script being stored in the first N devices of type ``device``; - alternatively, setting ``first_device=M`` will result in the script - being stored on nodes M through M + N - 1. - - One of either script (in memory string representation) or script_path (file) - must be provided - - :param name: key to store script under - :param script: TorchScript code (only supported for non-colocated orchestrators) - :param script_path: path to TorchScript code - :param device: device for script execution - :param devices_per_node: The number of GPU devices available on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - :param first_device: The first GPU device to use on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - """ - db_script = DBScript( - name=name, - script=script, - script_path=script_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - ) - self.add_script_object(db_script) - - def add_function( - self, - name: str, - function: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript function to launch with this Model instance - - Each script function to the model will be loaded into a - non-converged orchestrator prior to the execution - of this Model instance. - - For converged orchestrators, the :meth:`add_script` method should be used. - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the model being stored in the first N devices of type ``device``. - - :param name: key to store function under - :param function: TorchScript function code - :param device: device for script execution - :param devices_per_node: The number of GPU devices available on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - :param first_device: The first GPU device to use on the host. - This parameter only applies to GPU devices and will be ignored if device - is specified as CPU. - """ - db_script = DBScript( - name=name, - script=function, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - ) - self.add_script_object(db_script) - - def __hash__(self) -> int: - return hash(self.name) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Model): - return False - - if self.name == other.name: - return True - return False - - def __str__(self) -> str: # pragma: no cover - entity_str = "Name: " + self.name + "\n" - entity_str += "Type: " + self.type + "\n" - entity_str += str(self.run_settings) + "\n" - if self._db_models: - entity_str += "DB Models: \n" + str(len(self._db_models)) + "\n" - if self._db_scripts: - entity_str += "DB Scripts: \n" + str(len(self._db_scripts)) + "\n" - return entity_str - - def add_ml_model_object(self, db_model: DBModel) -> None: - if not db_model.is_file and self.colocated: - err_msg = "ML model can not be set from memory for colocated databases.\n" - err_msg += ( - f"Please store the ML model named {db_model.name} in binary format " - ) - err_msg += "and add it to the SmartSim Model as file." - raise SSUnsupportedError(err_msg) - - self._db_models.append(db_model) - - def add_script_object(self, db_script: DBScript) -> None: - if db_script.func and self.colocated: - if not isinstance(db_script.func, str): - err_msg = ( - "Functions can not be set from memory for colocated databases.\n" - f"Please convert the function named {db_script.name} " - "to a string or store it as a text file and add it to the " - "SmartSim Model with add_script." - ) - raise SSUnsupportedError(err_msg) - self._db_scripts.append(db_script) - - def _check_db_objects_colo(self) -> None: - for db_model in self._db_models: - if not db_model.is_file: - err_msg = ( - "ML model can not be set from memory for colocated databases.\n" - f"Please store the ML model named {db_model.name} in binary " - "format and add it to the SmartSim Model as file." - ) - raise SSUnsupportedError(err_msg) - - for db_script in self._db_scripts: - if db_script.func: - if not isinstance(db_script.func, str): - err_msg = ( - "Functions can not be set from memory for colocated " - "databases.\nPlease convert the function named " - f"{db_script.name} to a string or store it as a text" - "file and add it to the SmartSim Model with add_script." - ) - raise SSUnsupportedError(err_msg) diff --git a/smartsim/settings/arguments/launch/dragon.py b/smartsim/settings/arguments/launch/dragon.py index ecbff4706..d8044267e 100644 --- a/smartsim/settings/arguments/launch/dragon.py +++ b/smartsim/settings/arguments/launch/dragon.py @@ -86,7 +86,6 @@ def set_node_feature(self, feature_list: t.Union[str, t.List[str]]) -> None: raise TypeError("feature_list must be string or list of strings") self.set("node-feature", ",".join(feature_list)) - @override def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: """Specify the hostlist for this job @@ -103,28 +102,7 @@ def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: cleaned_list = [host.strip() for host in host_list if host and host.strip()] if not len(cleaned_list) == len(host_list): raise ValueError(f"invalid names found in hostlist: {host_list}") - - self.run_args["host-list"] = ",".join(cleaned_list) - - @override - def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: - """Specify the hostlist for this job - - :param host_list: hosts to launch on - :raises ValueError: if an empty host list is supplied - """ - if not host_list: - raise ValueError("empty hostlist provided") - - if isinstance(host_list, str): - host_list = host_list.replace(" ", "").split(",") - - # strip out all whitespace-only values - cleaned_list = [host.strip() for host in host_list if host and host.strip()] - if not len(cleaned_list) == len(host_list): - raise ValueError(f"invalid names found in hostlist: {host_list}") - - self.run_args["host-list"] = ",".join(cleaned_list) + self.set("host-list", ",".join(cleaned_list)) def set_cpu_affinity(self, devices: t.List[int]) -> None: """Set the CPU affinity for this job