diff --git a/smartsim/_core/_cli/build.py b/smartsim/_core/_cli/build.py index ea5f2177c..2cd3cddb7 100644 --- a/smartsim/_core/_cli/build.py +++ b/smartsim/_core/_cli/build.py @@ -38,7 +38,6 @@ from smartsim._core._install import builder from smartsim._core._install.buildenv import ( BuildEnv, - DbEngine, SetupError, Version_, VersionConflictError, @@ -46,7 +45,6 @@ ) from smartsim._core._install.builder import BuildError, Device from smartsim._core.config import CONFIG -from smartsim._core.utils.helpers import installed_redisai_backends from smartsim.error import SSConfigError from smartsim.log import get_logger @@ -63,9 +61,9 @@ def check_py_onnx_version(versions: Versioner) -> None: _check_packages_in_python_env( { "onnx": Version_(versions.ONNX), - "skl2onnx": Version_(versions.REDISAI.skl2onnx), - "onnxmltools": Version_(versions.REDISAI.onnxmltools), - "scikit-learn": Version_(getattr(versions.REDISAI, "scikit-learn")), + "skl2onnx": "1.16.0", + "onnxmltools": "1.12.0", + "scikit-learn": "1.3.2", }, ) @@ -75,43 +73,8 @@ def check_py_tf_version(versions: Versioner) -> None: _check_packages_in_python_env({"tensorflow": Version_(versions.TENSORFLOW)}) -def check_backends_install() -> bool: - """Checks if backends have already been installed. - Logs details on how to proceed forward - if the RAI_PATH environment variable is set or if - backends have already been installed. - """ - rai_path = os.environ.get("RAI_PATH", "") - installed = installed_redisai_backends() - msg = "" - - if rai_path and installed: - msg = ( - f"There is no need to build. backends are already built and " - f"specified in the environment at 'RAI_PATH': {CONFIG.redisai}" - ) - elif rai_path and not installed: - msg = ( - "Before running 'smart build', unset your RAI_PATH environment " - "variable with 'unset RAI_PATH'." - ) - elif not rai_path and installed: - msg = ( - "If you wish to re-run `smart build`, you must first run `smart clean`." - " The following backend(s) must be removed: " + ", ".join(installed) - ) - - if msg: - logger.error(msg) - - return not bool(msg) - - -def build_feature_store( - build_env: BuildEnv, versions: Versioner, keydb: bool, verbose: bool -) -> None: +def build_feature_store(build_env: BuildEnv, verbose: bool) -> None: # check feature store installation - feature_store_name = "KeyDB" if keydb else "Redis" feature_store_builder = builder.FeatureStoreBuilder( build_env(), jobs=build_env.JOBS, @@ -120,111 +83,12 @@ def build_feature_store( malloc=build_env.MALLOC, verbose=verbose, ) - if not feature_store_builder.is_built: - logger.info( - f"Building {feature_store_name} version {versions.REDIS} " - f"from {versions.REDIS_URL}" - ) - feature_store_builder.build_from_git(versions.REDIS_URL, versions.REDIS_BRANCH) - feature_store_builder.cleanup() - logger.info(f"{feature_store_name} build complete!") - - -def build_redis_ai( - build_env: BuildEnv, - versions: Versioner, - device: Device, - use_torch: bool = True, - use_tf: bool = True, - use_onnx: bool = False, - torch_dir: t.Union[str, Path, None] = None, - libtf_dir: t.Union[str, Path, None] = None, - verbose: bool = False, - torch_with_mkl: bool = True, -) -> None: - # make sure user isn't trying to do something silly on MacOS - if build_env.PLATFORM == "darwin" and device == Device.GPU: - raise BuildError("SmartSim does not support GPU on MacOS") - - # decide which runtimes to build - print("\nML Backends Requested") - backends_table = [ - ["PyTorch", versions.TORCH, color_bool(use_torch)], - ["TensorFlow", versions.TENSORFLOW, color_bool(use_tf)], - ["ONNX", versions.ONNX, color_bool(use_onnx)], - ] - print(tabulate(backends_table, tablefmt="fancy_outline"), end="\n\n") - print(f"Building for GPU support: {color_bool(device == Device.GPU)}\n") - - if not check_backends_install(): - sys.exit(1) - - # TORCH - if use_torch and torch_dir: - torch_dir = Path(torch_dir).resolve() - if not torch_dir.is_dir(): - raise SetupError( - f"Could not find requested user Torch installation: {torch_dir}" - ) - - # TF - if use_tf and libtf_dir: - libtf_dir = Path(libtf_dir).resolve() - if not libtf_dir.is_dir(): - raise SetupError( - f"Could not find requested user TF installation: {libtf_dir}" - ) - build_env_dict = build_env() - - rai_builder = builder.RedisAIBuilder( - build_env=build_env_dict, - jobs=build_env.JOBS, - _os=builder.OperatingSystem.from_str(platform.system()), - architecture=builder.Architecture.from_str(platform.machine()), - torch_dir=str(torch_dir) if torch_dir else "", - libtf_dir=str(libtf_dir) if libtf_dir else "", - build_torch=use_torch, - build_tf=use_tf, - build_onnx=use_onnx, - verbose=verbose, - torch_with_mkl=torch_with_mkl, - ) - - if rai_builder.is_built: - logger.info("RedisAI installed. Run `smart clean` to remove.") - else: - # get the build environment, update with CUDNN env vars - # if present and building for GPU, otherwise warn the user - if device == Device.GPU: - gpu_env = build_env.get_cudnn_env() - cudnn_env_vars = [ - "CUDNN_LIBRARY", - "CUDNN_INCLUDE_DIR", - "CUDNN_INCLUDE_PATH", - "CUDNN_LIBRARY_PATH", - ] - if not gpu_env: - logger.warning( - "CUDNN environment variables not found.\n" - f"Looked for {cudnn_env_vars}" - ) - else: - build_env_dict.update(gpu_env) - # update RAI build env with cudnn env vars - rai_builder.env = build_env_dict - - logger.info( - f"Building RedisAI version {versions.REDISAI}" - f" from {versions.REDISAI_URL}" - ) + if not feature_store_builder.is_built: + logger.info("No feature store is currently being built by 'smart build'") - # NOTE: have the option to add other builds here in the future - # like "from_tarball" - rai_builder.build_from_git( - versions.REDISAI_URL, versions.REDISAI_BRANCH, device - ) - logger.info("ML Backends and RedisAI build complete!") + feature_store_builder.cleanup() + logger.info("No feature store is currently being built by 'smart build'") def check_py_torch_version(versions: Versioner, device: Device = Device.CPU) -> None: @@ -359,25 +223,11 @@ def _format_incompatible_python_env_message( ) -def _configure_keydb_build(versions: Versioner) -> None: - """Configure the redis versions to be used during the build operation""" - versions.REDIS = Version_("6.2.0") - versions.REDIS_URL = "https://github.com/EQ-Alpha/KeyDB" - versions.REDIS_BRANCH = "v6.2.0" - - CONFIG.conf_path = Path(CONFIG.core_path, "config", "keydb.conf") - if not CONFIG.conf_path.resolve().is_file(): - raise SSConfigError( - "Database configuration file at REDIS_CONF could not be found" - ) - - # pylint: disable-next=too-many-statements def execute( args: argparse.Namespace, _unparsed_args: t.Optional[t.List[str]] = None, / ) -> int: verbose = args.v - keydb = args.keydb device = Device(args.device.lower()) is_dragon_requested = args.dragon # torch and tf build by default @@ -399,13 +249,9 @@ def execute( env_vars = list(env.keys()) print(tabulate(env, headers=env_vars, tablefmt="github"), "\n") - if keydb: - _configure_keydb_build(versions) - if verbose: - fs_name: DbEngine = "KEYDB" if keydb else "REDIS" logger.info("Version Information:") - vers = versions.as_dict(fs_name=fs_name) + vers = versions.as_dict() version_names = list(vers.keys()) print(tabulate(vers, headers=version_names, tablefmt="github"), "\n") @@ -422,32 +268,20 @@ def execute( try: if not args.only_python_packages: - # REDIS/KeyDB - build_feature_store(build_env, versions, keydb, verbose) - - # REDISAI - build_redis_ai( - build_env, - versions, - device, - pt, - tf, - onnx, - args.torch_dir, - args.libtensorflow_dir, - verbose=verbose, - torch_with_mkl=args.torch_with_mkl, - ) + ... + except (SetupError, BuildError) as e: logger.error(str(e)) return os.EX_SOFTWARE - backends = installed_redisai_backends() + backends = [] backends_str = ", ".join(s.capitalize() for s in backends) if backends else "No" logger.info(f"{backends_str} backend(s) built") try: - if "torch" in backends: + # TODO: always installing torch, otherwise tests will fail. + # Should revert once torch install has been revamped + if "torch" in backends or True: check_py_torch_version(versions, device) if "tensorflow" in backends: check_py_tf_version(versions) @@ -519,12 +353,6 @@ def configure_parser(parser: argparse.ArgumentParser) -> None: type=str, help=f"Path to custom libtensorflow directory {warn_usage}", ) - parser.add_argument( - "--keydb", - action="store_true", - default=False, - help="Build KeyDB instead of Redis", - ) parser.add_argument( "--no_torch_with_mkl", dest="torch_with_mkl", diff --git a/smartsim/_core/_cli/cli.py b/smartsim/_core/_cli/cli.py index 3d5c6e066..2195f008c 100644 --- a/smartsim/_core/_cli/cli.py +++ b/smartsim/_core/_cli/cli.py @@ -108,7 +108,7 @@ def default_cli() -> SmartCli: menu = [ MenuItemConfig( "build", - "Build SmartSim dependencies (Redis, RedisAI, Dragon, ML runtimes)", + "Build SmartSim dependencies (Dragon, ML runtimes)", build_execute, build_parser, ), @@ -118,11 +118,6 @@ def default_cli() -> SmartCli: clean_execute, clean_parser, ), - MenuItemConfig( - "dbcli", - "Print the path to the redis-cli binary", - dbcli_execute, - ), MenuItemConfig( "site", "Print the installation site of SmartSim", diff --git a/smartsim/_core/_cli/info.py b/smartsim/_core/_cli/info.py index 4f4137cd2..ec50e151a 100644 --- a/smartsim/_core/_cli/info.py +++ b/smartsim/_core/_cli/info.py @@ -8,6 +8,7 @@ import smartsim._core._cli.utils as _utils import smartsim._core.utils.helpers as _helpers +from smartsim._core._cli.scripts.dragon_install import dragon_pin from smartsim._core._install.buildenv import BuildEnv as _BuildEnv _MISSING_DEP = _helpers.colorize("Not Installed", "red") @@ -21,7 +22,6 @@ def execute( tabulate( [ ["SmartSim", _fmt_py_pkg_version("smartsim")], - ["SmartRedis", _fmt_py_pkg_version("smartredis")], ], headers=["Name", "Version"], tablefmt="fancy_outline", @@ -29,42 +29,30 @@ def execute( end="\n\n", ) - print("FeatureStore Configuration:") - fs_path = _utils.get_fs_path() - fs_table = [["Installed", _fmt_installed_fs(fs_path)]] - if fs_path: - fs_table.append(["Location", str(fs_path)]) - print(tabulate(fs_table, tablefmt="fancy_outline"), end="\n\n") + print("Dragon Installation:") + dragon_version = dragon_pin() - print("Redis AI Configuration:") - rai_path = _helpers.redis_install_base().parent / "redisai.so" - rai_table = [["Status", _fmt_installed_redis_ai(rai_path)]] - if rai_path.is_file(): - rai_table.append(["Location", str(rai_path)]) - print(tabulate(rai_table, tablefmt="fancy_outline"), end="\n\n") + fs_table = [["Version", str(dragon_version)]] + print(tabulate(fs_table, tablefmt="fancy_outline"), end="\n\n") - print("Machine Learning Backends:") - backends = _helpers.installed_redisai_backends() + print("Machine Learning Packages:") print( tabulate( [ [ "Tensorflow", - _utils.color_bool("tensorflow" in backends), _fmt_py_pkg_version("tensorflow"), ], [ "Torch", - _utils.color_bool("torch" in backends), _fmt_py_pkg_version("torch"), ], [ "ONNX", - _utils.color_bool("onnxruntime" in backends), _fmt_py_pkg_version("onnx"), ], ], - headers=["Name", "Backend Available", "Python Package"], + headers=["Name", "Python Package"], tablefmt="fancy_outline", ), end="\n\n", @@ -79,12 +67,6 @@ def _fmt_installed_fs(fs_path: t.Optional[pathlib.Path]) -> str: return _helpers.colorize(fs_name.upper(), "green") -def _fmt_installed_redis_ai(rai_path: pathlib.Path) -> str: - if not rai_path.is_file(): - return _MISSING_DEP - return _helpers.colorize("Installed", "green") - - def _fmt_py_pkg_version(pkg_name: str) -> str: try: return _helpers.colorize(_BuildEnv.get_py_package_version(pkg_name), "green") diff --git a/smartsim/_core/_cli/utils.py b/smartsim/_core/_cli/utils.py index 6c2a40911..ff6a2d257 100644 --- a/smartsim/_core/_cli/utils.py +++ b/smartsim/_core/_cli/utils.py @@ -91,38 +91,15 @@ def clean(core_path: Path, _all: bool = False) -> int: lib_path = core_path / "lib" if lib_path.is_dir(): - # remove RedisAI - rai_path = lib_path / "redisai.so" - if rai_path.is_file(): - rai_path.unlink() - logger.info("Successfully removed existing RedisAI installation") - backend_path = lib_path / "backends" if backend_path.is_dir(): shutil.rmtree(backend_path, ignore_errors=True) logger.info("Successfully removed ML runtimes") - bin_path = core_path / "bin" - if bin_path.is_dir() and _all: - files_to_remove = ["redis-server", "redis-cli", "keydb-server", "keydb-cli"] - removed = False - for _file in files_to_remove: - file_path = bin_path.joinpath(_file) - - if file_path.is_file(): - removed = True - file_path.unlink() - if removed: - logger.info("Successfully removed SmartSim feature store installation") - return os.EX_OK def get_fs_path() -> t.Optional[Path]: - bin_path = get_install_path() / "_core" / "bin" - for option in bin_path.iterdir(): - if option.name in ("redis-cli", "keydb-cli"): - return option return None diff --git a/smartsim/_core/_cli/validate.py b/smartsim/_core/_cli/validate.py index 78f60b50a..16b6ec4ea 100644 --- a/smartsim/_core/_cli/validate.py +++ b/smartsim/_core/_cli/validate.py @@ -26,22 +26,14 @@ import argparse import contextlib -import io -import multiprocessing as mp import os import os.path import tempfile import typing as t from types import TracebackType -import numpy as np -from smartredis import Client - -from smartsim import Experiment from smartsim._core._cli.utils import SMART_LOGGER_FORMAT from smartsim._core._install.builder import Device -from smartsim._core.utils.helpers import installed_redisai_backends -from smartsim._core.utils.network import find_free_port from smartsim.log import get_logger logger = get_logger("Smart", fmt=SMART_LOGGER_FORMAT) @@ -54,8 +46,6 @@ if t.TYPE_CHECKING: - from multiprocessing.connection import Connection - # pylint: disable-next=unsubscriptable-object _TemporaryDirectory = tempfile.TemporaryDirectory[str] else: @@ -79,13 +69,10 @@ def __exit__( self._finalizer.detach() # type: ignore[attr-defined] -def execute( - args: argparse.Namespace, _unparsed_args: t.Optional[t.List[str]] = None, / -) -> int: +def execute(args: argparse.Namespace) -> int: """Validate the SmartSim installation works as expected given a simple experiment """ - backends = installed_redisai_backends() temp_dir = "" device = Device(args.device) try: @@ -93,21 +80,10 @@ def execute( temp_dir = ctx.enter_context(_VerificationTempDir(dir=os.getcwd())) validate_env = { "SR_LOG_LEVEL": os.environ.get("SR_LOG_LEVEL", "INFO"), - "SR_LOG_FILE": os.environ.get( - "SR_LOG_FILE", os.path.join(temp_dir, "smartredis.log") - ), } if device == Device.GPU: validate_env["CUDA_VISIBLE_DEVICES"] = "0" ctx.enter_context(_env_vars_set_to(validate_env)) - test_install( - location=temp_dir, - port=args.port, - device=device, - with_tf="tensorflow" in backends, - with_pt="torch" in backends, - with_onnx="onnxruntime" in backends, - ) except Exception as e: logger.error( "SmartSim failed to run a simple experiment!\n" @@ -142,34 +118,6 @@ def configure_parser(parser: argparse.ArgumentParser) -> None: ) -def test_install( - location: str, - port: t.Optional[int], - device: Device, - with_tf: bool, - with_pt: bool, - with_onnx: bool, -) -> None: - exp = Experiment("ValidationExperiment", exp_path=location, launcher="local") - exp.telemetry.disable() - port = find_free_port() if port is None else port - - with _make_managed_local_feature_store(exp, port) as client: - logger.info("Verifying Tensor Transfer") - client.put_tensor("plain-tensor", np.ones((1, 1, 3, 3))) - client.get_tensor("plain-tensor") - if with_pt: - logger.info("Verifying Torch Backend") - _test_torch_install(client, device) - if with_onnx: - logger.info("Verifying ONNX Backend") - _test_onnx_install(client, device) - if with_tf: # Run last in case TF locks an entire GPU - logger.info("Verifying TensorFlow Backend") - _test_tf_install(client, location, device) - logger.info("Success!") - - @contextlib.contextmanager def _env_vars_set_to( evars: t.Mapping[str, t.Optional[str]] @@ -189,127 +137,3 @@ def _set_or_del_env_var(var: str, val: t.Optional[str]) -> None: os.environ[var] = val else: os.environ.pop(var, None) - - -@contextlib.contextmanager -def _make_managed_local_feature_store( - exp: Experiment, port: int -) -> t.Generator[Client, None, None]: - """Context managed feature store that will be stopped if an exception is raised""" - feature_store = exp.create_feature_store(fs_nodes=1, interface="lo", port=port) - exp.generate(feature_store) - exp.start(feature_store) - try: - (client_addr,) = feature_store.get_address() - yield Client(False, address=client_addr) - finally: - exp.stop(feature_store) - - -def _test_tf_install(client: Client, tmp_dir: str, device: Device) -> None: - recv_conn, send_conn = mp.Pipe(duplex=False) - # Build the model in a subproc so that keras does not hog the gpu - proc = mp.Process(target=_build_tf_frozen_model, args=(send_conn, tmp_dir)) - proc.start() - - # do not need the sending connection in this proc anymore - send_conn.close() - - proc.join(timeout=600) - if proc.is_alive(): - proc.terminate() - raise Exception("Failed to build a simple keras model within 2 minutes") - try: - model_path, inputs, outputs = recv_conn.recv() - except EOFError as e: - raise Exception( - "Failed to receive serialized model from subprocess. " - "Is the `tensorflow` python package installed?" - ) from e - - client.set_model_from_file( - "keras-fcn", - model_path, - "TF", - device=device.value.upper(), - inputs=inputs, - outputs=outputs, - ) - client.put_tensor("keras-input", np.random.rand(1, 28, 28).astype(np.float32)) - client.run_model("keras-fcn", inputs=["keras-input"], outputs=["keras-output"]) - client.get_tensor("keras-output") - - -def _build_tf_frozen_model(conn: "Connection", tmp_dir: str) -> None: - from tensorflow import keras - - from smartsim.ml.tf import freeze_model - - fcn = keras.Sequential( - layers=[ - keras.layers.InputLayer(input_shape=(28, 28), name="input"), - keras.layers.Flatten(input_shape=(28, 28), name="flatten"), - keras.layers.Dense(128, activation="relu", name="dense"), - keras.layers.Dense(10, activation="softmax", name="output"), - ], - name="FullyConnectedNetwork", - ) - fcn.compile( - optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"] - ) - model_path, inputs, outputs = freeze_model(fcn, tmp_dir, "keras_model.pb") - conn.send((model_path, inputs, outputs)) - - -def _test_torch_install(client: Client, device: Device) -> None: - import torch - from torch import nn - - class Net(nn.Module): - def __init__(self) -> None: - super().__init__() - self.conv: t.Callable[..., torch.Tensor] = nn.Conv2d(1, 1, 3) - - def forward(self, x: torch.Tensor) -> torch.Tensor: - return self.conv(x) - - if device == Device.GPU: - device_ = torch.device("cuda") - else: - device_ = torch.device("cpu") - - net = Net() - net.to(device_) - net.eval() - - forward_input = torch.rand(1, 1, 3, 3).to(device_) - traced = torch.jit.trace(net, forward_input) # type: ignore[no-untyped-call] - - buffer = io.BytesIO() - torch.jit.save(traced, buffer) # type: ignore[no-untyped-call] - model = buffer.getvalue() - - client.set_model("torch-nn", model, backend="TORCH", device=device.value.upper()) - client.put_tensor("torch-in", torch.rand(1, 1, 3, 3).numpy()) - client.run_model("torch-nn", inputs=["torch-in"], outputs=["torch-out"]) - client.get_tensor("torch-out") - - -def _test_onnx_install(client: Client, device: Device) -> None: - from skl2onnx import to_onnx - from sklearn.cluster import KMeans - - data = np.arange(20, dtype=np.float32).reshape(10, 2) - model = KMeans(n_clusters=2, n_init=10) - model.fit(data) - - kmeans = to_onnx(model, data, target_opset=11) - model = kmeans.SerializeToString() - sample = np.arange(20, dtype=np.float32).reshape(10, 2) - - client.put_tensor("onnx-input", sample) - client.set_model("onnx-kmeans", model, "ONNX", device=device.value.upper()) - client.run_model( - "onnx-kmeans", inputs=["onnx-input"], outputs=["onnx-labels", "onnx-transform"] - ) - client.get_tensor("onnx-labels") diff --git a/smartsim/_core/_install/buildenv.py b/smartsim/_core/_install/buildenv.py index 119313835..ca5252069 100644 --- a/smartsim/_core/_install/buildenv.py +++ b/smartsim/_core/_install/buildenv.py @@ -37,8 +37,6 @@ from packaging.version import InvalidVersion, Version, parse -DbEngine = t.Literal["REDIS", "KEYDB"] - class SetupError(Exception): """A simple exception class for errors in _install.buildenv file. @@ -156,72 +154,7 @@ def get_env(var: str, default: str) -> str: return os.environ.get(var, default) -class RedisAIVersion(Version_): - """A subclass of Version_ that holds the dependency sets for RedisAI - - this class serves two purposes: - - 1. It is used to populate the [ml] ``extras_require`` of the setup.py. - This is because the RedisAI version will determine which ML based - dependencies are required. - - 2. Used to set the default values for PyTorch, TF, and ONNX - given the SMARTSIM_REDISAI env var set by the user. - - NOTE: Torch requires additional information depending on whether - CPU or GPU support is requested - """ - - defaults = { - "1.2.7": { - "tensorflow": "2.13.1", - "onnx": "1.14.1", - "skl2onnx": "1.16.0", - "onnxmltools": "1.12.0", - "scikit-learn": "1.3.2", - "torch": "2.0.1", - "torch_cpu_suffix": "+cpu", - "torch_cuda_suffix": "+cu117", - "torchvision": "0.15.2", - }, - } - - def __init__(self, vers: str) -> None: # pylint: disable=super-init-not-called - min_rai_version = min(Version_(ver) for ver in self.defaults) - if min_rai_version > vers: - raise SetupError( - f"RedisAI version must be greater than or equal to {min_rai_version}" - ) - if vers not in self.defaults: - if vers.startswith("1.2"): - # resolve to latest version for 1.2.x - # the str representation will still be 1.2.x - self.version = "1.2.7" - else: - raise SetupError( - ( - f"Invalid RedisAI version {vers}. Options are " - f"{self.defaults.keys()}" - ) - ) - else: - self.version = vers - - def __getattr__(self, name: str) -> str: - try: - return self.defaults[self.version][name] - except KeyError: - raise AttributeError( - f"'{type(self).__name__}' object has no attribute '{name}'\n\n" - "This is likely a problem with the SmartSim build process;" - "if this problem persists please log a new issue at " - "https://github.com/CrayLabs/SmartSim/issues " - "or get in contact with us at " - "https://www.craylabs.org/docs/community.html" - ) from None - - def get_defaults(self) -> t.Dict[str, str]: - return self.defaults[self.version].copy() +# TODO Add A Version class for the new backend class Versioner: @@ -242,9 +175,8 @@ class Versioner: ``smart build`` command to determine which dependency versions to look for and download. - Default versions for SmartSim, Redis, and RedisAI are - all set here. Setting a default version for RedisAI also dictates - default versions of the machine learning libraries. + Default versions for SmartSim and its machine learning library dependencies + all defined here. """ # compatible Python version @@ -254,49 +186,41 @@ class Versioner: SMARTSIM = Version_(get_env("SMARTSIM_VERSION", "0.7.0")) SMARTSIM_SUFFIX = get_env("SMARTSIM_SUFFIX", "") - # Redis - REDIS = Version_(get_env("SMARTSIM_REDIS", "7.2.4")) - REDIS_URL = get_env("SMARTSIM_REDIS_URL", "https://github.com/redis/redis.git/") - REDIS_BRANCH = get_env("SMARTSIM_REDIS_BRANCH", REDIS) + # ML/DL + # torch can be set by the user because we download that for them + TORCH = Version_(get_env("SMARTSIM_TORCH", "2.0.1")) + TORCHVISION = Version_(get_env("SMARTSIM_TORCHVIS", "0.15.2")) + TORCH_CPU_SUFFIX = Version_(get_env("TORCH_CPU_SUFFIX", "+cpu")) + TORCH_CUDA_SUFFIX = Version_(get_env("TORCH_CUDA_SUFFIX", "+cu117")) + + # TensorFlow and ONNX only use the defaults - # RedisAI - REDISAI = RedisAIVersion(get_env("SMARTSIM_REDISAI", "1.2.7")) - REDISAI_URL = get_env( - "SMARTSIM_REDISAI_URL", "https://github.com/RedisAI/RedisAI.git/" - ) - REDISAI_BRANCH = get_env("SMARTSIM_REDISAI_BRANCH", f"v{REDISAI}") + TENSORFLOW = Version_("2.13.1") + ONNX = Version_("1.14.1") - # ML/DL (based on RedisAI version defaults) - # torch can be set by the user because we download that for them - TORCH = Version_(get_env("SMARTSIM_TORCH", REDISAI.torch)) - TORCHVISION = Version_(get_env("SMARTSIM_TORCHVIS", REDISAI.torchvision)) - TORCH_CPU_SUFFIX = Version_(get_env("TORCH_CPU_SUFFIX", REDISAI.torch_cpu_suffix)) - TORCH_CUDA_SUFFIX = Version_( - get_env("TORCH_CUDA_SUFFIX", REDISAI.torch_cuda_suffix) - ) - - # TensorFlow and ONNX only use the defaults, but these are not built into - # the RedisAI package and therefore the user is free to pick other versions. - TENSORFLOW = Version_(REDISAI.tensorflow) - ONNX = Version_(REDISAI.onnx) - - def as_dict(self, fs_name: DbEngine = "REDIS") -> t.Dict[str, t.Tuple[str, ...]]: + def as_dict(self) -> t.Dict[str, t.Tuple[str, ...]]: pkg_map = { "SMARTSIM": self.SMARTSIM, - fs_name: self.REDIS, - "REDISAI": self.REDISAI, "TORCH": self.TORCH, "TENSORFLOW": self.TENSORFLOW, "ONNX": self.ONNX, } return {"Packages": tuple(pkg_map), "Versions": tuple(pkg_map.values())} + # TODO add a backend for ml libraries def ml_extras_required(self) -> t.Dict[str, t.List[str]]: - """Optional ML/DL dependencies we suggest for the user. - - The defaults are based on the RedisAI version - """ - ml_defaults = self.REDISAI.get_defaults() + """Optional ML/DL dependencies we suggest for the user.""" + ml_defaults = { + "torch": self.TORCH, + "tensorflow": self.TENSORFLOW, + "onnx": self.ONNX, + "skl2onnx": "1.16.0", + "onnxmltools": "1.12.0", + "scikit-learn": "1.3.2", + "torchvision": "0.15.2", + "torch_cpu_suffix": "+cpu", + "torch_cuda_suffix": "+cu117", + } # remove torch-related fields as they are subject to change # by having the user change hardware (cpu/gpu) @@ -352,12 +276,7 @@ class BuildEnv: """Environment for building third-party dependencies BuildEnv provides a method for configuring how the third-party - dependencies within SmartSim are built, namely Redis/KeyDB - and RedisAI. - - The environment variables listed here can be set to control the - Redis build in the pip wheel build as well as the Redis and RedisAI - build executed by the CLI. + dependencies within SmartSim are built. Build tools are also checked for here and if they are not found then a SetupError is raised. diff --git a/smartsim/_core/_install/builder.py b/smartsim/_core/_install/builder.py index 20d025773..87800939c 100644 --- a/smartsim/_core/_install/builder.py +++ b/smartsim/_core/_install/builder.py @@ -53,7 +53,6 @@ # TODO: check cmake version and use system if possible to avoid conflicts -TRedisAIBackendStr = t.Literal["tensorflow", "torch", "onnxruntime", "tflite"] _PathLike = t.Union[str, "os.PathLike[str]"] _T = t.TypeVar("_T") _U = t.TypeVar("_U") @@ -243,506 +242,6 @@ def run_command( raise BuildError(e) from e -class FeatureStoreBuilder(Builder): - """Class to build Redis or KeyDB from Source - Supported build methods: - - from git - See buildenv.py for buildtime configuration of Redis/KeyDB - version and url. - """ - - def __init__( - self, - build_env: t.Optional[t.Dict[str, str]] = None, - malloc: str = "libc", - jobs: int = 1, - _os: OperatingSystem = OperatingSystem.from_str(platform.system()), - architecture: Architecture = Architecture.from_str(platform.machine()), - verbose: bool = False, - ) -> None: - super().__init__( - build_env or {}, - jobs=jobs, - _os=_os, - architecture=architecture, - verbose=verbose, - ) - self.malloc = malloc - - @property - def is_built(self) -> bool: - """Check if Redis or KeyDB is built""" - bin_files = {file.name for file in self.bin_path.iterdir()} - redis_files = {"redis-server", "redis-cli"} - keydb_files = {"keydb-server", "keydb-cli"} - return redis_files.issubset(bin_files) or keydb_files.issubset(bin_files) - - def build_from_git( - self, git_url: str, branch: str, device: Device = Device.CPU - ) -> None: - """Build Redis from git - :param git_url: url from which to retrieve Redis - :param branch: branch to checkout - """ - # pylint: disable=too-many-locals - feature_store_name = "keydb" if "KeyDB" in git_url else "redis" - feature_store_build_path = Path(self.build_dir, feature_store_name.lower()) - - # remove git directory if it exists as it should - # really never exist as we delete after build - redis_build_path = Path(self.build_dir, "redis") - keydb_build_path = Path(self.build_dir, "keydb") - if redis_build_path.is_dir(): - shutil.rmtree(str(redis_build_path)) - if keydb_build_path.is_dir(): - shutil.rmtree(str(keydb_build_path)) - - # Check feature store URL - if not self.is_valid_url(git_url): - raise BuildError(f"Malformed {feature_store_name} URL: {git_url}") - - clone_cmd = config_git_command( - self._platform, - [ - self.binary_path("git"), - "clone", - git_url, - "--branch", - branch, - "--depth", - "1", - feature_store_name, - ], - ) - - # clone Redis - self.run_command(clone_cmd, cwd=self.build_dir) - - # build Redis - build_cmd = [ - self.binary_path("make"), - "-j", - str(self.jobs), - f"MALLOC={self.malloc}", - ] - self.run_command(build_cmd, cwd=str(feature_store_build_path)) - - # move redis binaries to smartsim/smartsim/_core/bin - feature_store_src_dir = feature_store_build_path / "src" - server_source = feature_store_src_dir / (feature_store_name.lower() + "-server") - server_destination = self.bin_path / (feature_store_name.lower() + "-server") - cli_source = feature_store_src_dir / (feature_store_name.lower() + "-cli") - cli_destination = self.bin_path / (feature_store_name.lower() + "-cli") - self.copy_file(server_source, server_destination, set_exe=True) - self.copy_file(cli_source, cli_destination, set_exe=True) - - # validate install -- redis-server - core_path = Path(os.path.abspath(__file__)).parent.parent - dependency_path = os.environ.get("SMARTSIM_DEP_INSTALL_PATH", core_path) - bin_path = Path(dependency_path, "bin").resolve() - try: - database_exe = next(bin_path.glob("*-server")) - feature_store = Path(os.environ.get("REDIS_PATH", database_exe)).resolve() - _ = expand_exe_path(str(feature_store)) - except (TypeError, FileNotFoundError) as e: - raise BuildError("Installation of redis-server failed!") from e - - # validate install -- redis-cli - try: - redis_cli_exe = next(bin_path.glob("*-cli")) - redis_cli = Path(os.environ.get("REDIS_CLI_PATH", redis_cli_exe)).resolve() - _ = expand_exe_path(str(redis_cli)) - except (TypeError, FileNotFoundError) as e: - raise BuildError("Installation of redis-cli failed!") from e - - -class _RAIBuildDependency(ABC): - """An interface with a collection of magic methods so that - ``RedisAIBuilder`` can fetch and place its own dependencies - """ - - @property - @abstractmethod - def __rai_dependency_name__(self) -> str: ... - - @abstractmethod - def __place_for_rai__(self, target: _PathLike) -> Path: ... - - @staticmethod - @abstractmethod - def supported_platforms() -> t.Sequence[t.Tuple[OperatingSystem, Architecture]]: ... - - -def _place_rai_dep_at( - target: _PathLike, verbose: bool -) -> t.Callable[[_RAIBuildDependency], Path]: - def _place(dep: _RAIBuildDependency) -> Path: - if verbose: - print(f"Placing: '{dep.__rai_dependency_name__}'") - path = dep.__place_for_rai__(target) - if verbose: - print(f"Placed: '{dep.__rai_dependency_name__}' at '{path}'") - return path - - return _place - - -class RedisAIBuilder(Builder): - """Class to build RedisAI from Source - Supported build method: - - from git - See buildenv.py for buildtime configuration of RedisAI - version and url. - """ - - def __init__( - self, - _os: OperatingSystem = OperatingSystem.from_str(platform.system()), - architecture: Architecture = Architecture.from_str(platform.machine()), - build_env: t.Optional[t.Dict[str, str]] = None, - torch_dir: str = "", - libtf_dir: str = "", - build_torch: bool = True, - build_tf: bool = True, - build_onnx: bool = False, - jobs: int = 1, - verbose: bool = False, - torch_with_mkl: bool = True, - ) -> None: - super().__init__( - build_env or {}, - jobs=jobs, - _os=_os, - architecture=architecture, - verbose=verbose, - ) - - self.rai_install_path: t.Optional[Path] = None - - # convert to int for RAI build script - self._torch = build_torch - self._tf = build_tf - self._onnx = build_onnx - self.libtf_dir = libtf_dir - self.torch_dir = torch_dir - - # extra configuration options - self.torch_with_mkl = torch_with_mkl - - # Sanity checks - self._validate_platform() - - def _validate_platform(self) -> None: - unsupported = [] - if self._platform not in _DLPackRepository.supported_platforms(): - unsupported.append("DLPack") - if self.fetch_tf and (self._platform not in _TFArchive.supported_platforms()): - unsupported.append("Tensorflow") - if self.fetch_onnx and ( - self._platform not in _ORTArchive.supported_platforms() - ): - unsupported.append("ONNX") - if self.fetch_torch and ( - self._platform not in _PTArchive.supported_platforms() - ): - unsupported.append("PyTorch") - if unsupported: - raise BuildError( - f"The {', '.join(unsupported)} backend(s) are not supported " - f"on {self._platform.os} with {self._platform.architecture}" - ) - - @property - def rai_build_path(self) -> Path: - return Path(self.build_dir, "RedisAI") - - @property - def is_built(self) -> bool: - server = self.lib_path.joinpath("backends").is_dir() - cli = self.lib_path.joinpath("redisai.so").is_file() - return server and cli - - @property - def build_torch(self) -> bool: - return self._torch - - @property - def fetch_torch(self) -> bool: - return self.build_torch and not self.torch_dir - - @property - def build_tf(self) -> bool: - return self._tf - - @property - def fetch_tf(self) -> bool: - return self.build_tf and not self.libtf_dir - - @property - def build_onnx(self) -> bool: - return self._onnx - - @property - def fetch_onnx(self) -> bool: - return self.build_onnx - - def get_deps_dir_path_for(self, device: Device) -> Path: - def fail_to_format(reason: str) -> BuildError: # pragma: no cover - return BuildError(f"Failed to format RedisAI dependency path: {reason}") - - _os, architecture = self._platform - if _os == OperatingSystem.DARWIN: - os_ = "macos" - elif _os == OperatingSystem.LINUX: - os_ = "linux" - else: # pragma: no cover - raise fail_to_format(f"Unknown operating system: {_os}") - if architecture == Architecture.X64: - arch = "x64" - elif architecture == Architecture.ARM64: - arch = "arm64v8" - else: # pragma: no cover - raise fail_to_format(f"Unknown architecture: {architecture}") - return self.rai_build_path / f"deps/{os_}-{arch}-{device.value}" - - def _get_deps_to_fetch_for( - self, device: Device - ) -> t.Tuple[_RAIBuildDependency, ...]: - os_, arch = self._platform - # TODO: It would be nice if the backend version numbers were declared - # alongside the python package version numbers so that all of the - # dependency versions were declared in single location. - # Unfortunately importing into this module is non-trivial as it - # is used as script in the SmartSim `setup.py`. - - # DLPack is always required - fetchable_deps: t.List[_RAIBuildDependency] = [_DLPackRepository("v0.5_RAI")] - if self.fetch_torch: - pt_dep = _choose_pt_variant(os_)(arch, device, "2.0.1", self.torch_with_mkl) - fetchable_deps.append(pt_dep) - if self.fetch_tf: - fetchable_deps.append(_TFArchive(os_, arch, device, "2.13.1")) - if self.fetch_onnx: - fetchable_deps.append(_ORTArchive(os_, device, "1.16.3")) - - return tuple(fetchable_deps) - - def symlink_libtf(self, device: Device) -> None: - """Add symbolic link to available libtensorflow in RedisAI deps. - - :param device: cpu or gpu - """ - rai_deps_path = sorted( - self.rai_build_path.glob(os.path.join("deps", f"*{device.value}*")) - ) - if not rai_deps_path: - raise FileNotFoundError("Could not find RedisAI 'deps' directory") - - # There should only be one path for a given device, - # and this should hold even if in the future we use - # an external build of RedisAI - rai_libtf_path = rai_deps_path[0] / "libtensorflow" - rai_libtf_path.resolve() - if rai_libtf_path.is_dir(): - shutil.rmtree(rai_libtf_path) - - os.makedirs(rai_libtf_path) - libtf_path = Path(self.libtf_dir).resolve() - - # Copy include directory to deps/libtensorflow - include_src_path = libtf_path / "include" - if not include_src_path.exists(): - raise FileNotFoundError(f"Could not find include directory in {libtf_path}") - os.symlink(include_src_path, rai_libtf_path / "include") - - # RedisAI expects to find a lib directory, which is only - # available in some distributions. - rai_libtf_lib_dir = rai_libtf_path / "lib" - os.makedirs(rai_libtf_lib_dir) - src_libtf_lib_dir = libtf_path / "lib" - # If the lib directory existed in the libtensorflow distribution, - # copy its content, otherwise gather library files from - # libtensorflow base dir and copy them into destination lib dir - if src_libtf_lib_dir.is_dir(): - library_files = sorted(src_libtf_lib_dir.glob("*")) - if not library_files: - raise FileNotFoundError( - f"Could not find libtensorflow library files in {src_libtf_lib_dir}" - ) - else: - library_files = sorted(libtf_path.glob("lib*.so*")) - if not library_files: - raise FileNotFoundError( - f"Could not find libtensorflow library files in {libtf_path}" - ) - - for src_file in library_files: - dst_file = rai_libtf_lib_dir / src_file.name - if not dst_file.is_file(): - os.symlink(src_file, dst_file) - - def build_from_git( - self, git_url: str, branch: str, device: Device = Device.CPU - ) -> None: - """Build RedisAI from git - - :param git_url: url from which to retrieve RedisAI - :param branch: branch to checkout - :param device: cpu or gpu - """ - # delete previous build dir (should never be there) - if self.rai_build_path.is_dir(): - shutil.rmtree(self.rai_build_path) - - # Check RedisAI URL - if not self.is_valid_url(git_url): - raise BuildError(f"Malformed RedisAI URL: {git_url}") - - # clone RedisAI - clone_cmd = config_git_command( - self._platform, - [ - self.binary_path("env"), - "GIT_LFS_SKIP_SMUDGE=1", - "git", - "clone", - "--recursive", - git_url, - "--branch", - branch, - "--depth=1", - os.fspath(self.rai_build_path), - ], - ) - - self.run_command(clone_cmd, out=subprocess.DEVNULL, cwd=self.build_dir) - self._fetch_deps_for(device) - - if self.libtf_dir and device.value: - self.symlink_libtf(device) - - build_cmd = self._rai_build_env_prefix( - with_pt=self.build_torch, - with_tf=self.build_tf, - with_ort=self.build_onnx, - extra_env={"GPU": "1" if device == Device.GPU else "0"}, - ) - - if self.torch_dir: - self.env["Torch_DIR"] = str(self.torch_dir) - - build_cmd.extend( - [ - self.binary_path("make"), - "-C", - str(self.rai_build_path / "opt"), - "-j", - f"{self.jobs}", - "build", - ] - ) - self.run_command(build_cmd, cwd=self.rai_build_path) - - self._install_backends(device) - if self.user_supplied_backend("torch"): - self._move_torch_libs() - self.cleanup() - - def user_supplied_backend(self, backend: TRedisAIBackendStr) -> bool: - if backend == "torch": - return bool(self.build_torch and not self.fetch_torch) - if backend == "tensorflow": - return bool(self.build_tf and not self.fetch_tf) - if backend == "onnxruntime": - return bool(self.build_onnx and not self.fetch_onnx) - if backend == "tflite": - return False - raise BuildError(f"Unrecognized backend requested {backend}") - - def _rai_build_env_prefix( - self, - with_tf: bool, - with_pt: bool, - with_ort: bool, - extra_env: t.Optional[t.Dict[str, str]] = None, - ) -> t.List[str]: - extra_env = extra_env or {} - return [ - self.binary_path("env"), - f"WITH_PT={1 if with_pt else 0}", - f"WITH_TF={1 if with_tf else 0}", - "WITH_TFLITE=0", # never use TF Lite (for now) - f"WITH_ORT={1 if with_ort else 0}", - *(f"{key}={val}" for key, val in extra_env.items()), - ] - - def _fetch_deps_for(self, device: Device) -> None: - if not self.rai_build_path.is_dir(): - raise BuildError("RedisAI build directory not found") - - deps_dir = self.get_deps_dir_path_for(device) - deps_dir.mkdir(parents=True, exist_ok=True) - if any(deps_dir.iterdir()): - raise BuildError("RAI build dependency directory is not empty") - to_fetch = self._get_deps_to_fetch_for(device) - placed_paths = _threaded_map( - _place_rai_dep_at(deps_dir, self.verbose), to_fetch - ) - unique_placed_paths = {os.fspath(path.resolve()) for path in placed_paths} - if len(unique_placed_paths) != len(to_fetch): - raise BuildError( - f"Expected to place {len(to_fetch)} dependencies, but only " - f"found {len(unique_placed_paths)}" - ) - - def _install_backends(self, device: Device) -> None: - """Move backend libraries to smartsim/_core/lib/ - :param device: cpu or cpu - """ - self.rai_install_path = self.rai_build_path.joinpath( - f"install-{device.value}" - ).resolve() - rai_lib = self.rai_install_path / "redisai.so" - rai_backends = self.rai_install_path / "backends" - - if rai_backends.is_dir(): - self.copy_dir(rai_backends, self.lib_path / "backends", set_exe=True) - if rai_lib.is_file(): - self.copy_file(rai_lib, self.lib_path / "redisai.so", set_exe=True) - - def _move_torch_libs(self) -> None: - """Move pip install torch libraries - Since we use pip installed torch libraries for building - RedisAI, we need to move them into the LD_runpath of redisai.so - in the smartsim/_core/lib directory. - """ - ss_rai_torch_path = self.lib_path / "backends" / "redisai_torch" - ss_rai_torch_lib_path = ss_rai_torch_path / "lib" - - # retrieve torch shared libraries and copy to the - # smartsim/_core/lib/backends/redisai_torch/lib dir - # self.torch_dir should be /path/to/torch/share/cmake/Torch - # so we take the great grandparent here - pip_torch_path = Path(self.torch_dir).parent.parent.parent - pip_torch_lib_path = pip_torch_path / "lib" - - self.copy_dir(pip_torch_lib_path, ss_rai_torch_lib_path, set_exe=True) - - # also move the openmp files if on a mac - if sys.platform == "darwin": - dylibs = pip_torch_path / ".dylibs" - self.copy_dir(dylibs, ss_rai_torch_path / ".dylibs", set_exe=True) - - -def _threaded_map(fn: t.Callable[[_T], _U], items: t.Iterable[_T]) -> t.Sequence[_U]: - items = tuple(items) - if not items: # No items so no work to do - return () - num_workers = min(len(items), (os.cpu_count() or 4) * 5) - with concurrent.futures.ThreadPoolExecutor(num_workers) as pool: - return tuple(pool.map(fn, items)) - - class _WebLocation(ABC): @property @abstractmethod @@ -763,7 +262,7 @@ def clone( @t.final @dataclass(frozen=True) -class _DLPackRepository(_WebGitRepository, _RAIBuildDependency): +class _DLPackRepository(_WebGitRepository): version: str @staticmethod @@ -776,18 +275,7 @@ def supported_platforms() -> t.Sequence[t.Tuple[OperatingSystem, Architecture]]: @property def url(self) -> str: - return "https://github.com/RedisAI/dlpack.git" - - @property - def __rai_dependency_name__(self) -> str: - return f"dlpack@{self.url}" - - def __place_for_rai__(self, target: _PathLike) -> Path: - target = Path(target) / "dlpack" - self.clone(target, branch=self.version, depth=1) - if not target.is_dir(): - raise BuildError("Failed to place dlpack") - return target + return "" class _WebArchive(_WebLocation): @@ -836,7 +324,7 @@ def url(self) -> str: @dataclass(frozen=True) -class _PTArchive(_WebZip, _RAIBuildDependency): +class _PTArchive(_WebZip): architecture: Architecture device: Device version: str @@ -851,10 +339,6 @@ def supported_platforms() -> t.Sequence[t.Tuple[OperatingSystem, Architecture]]: ) ) - @property - def __rai_dependency_name__(self) -> str: - return f"libtorch@{self.url}" - @staticmethod def _patch_out_mkl(libtorch_root: Path) -> None: _modify_source_files( @@ -868,13 +352,6 @@ def extract(self, target: _PathLike) -> None: if not self.with_mkl: self._patch_out_mkl(Path(target)) - def __place_for_rai__(self, target: _PathLike) -> Path: - self.extract(target) - target = Path(target) / "libtorch" - if not target.is_dir(): - raise BuildError("Failed to place RAI dependency: `libtorch`") - return target - @t.final class _PTArchiveLinux(_PTArchive): @@ -906,8 +383,6 @@ def supported_platforms() -> t.Sequence[t.Tuple[OperatingSystem, Architecture]]: @property def url(self) -> str: - if self.device == Device.GPU: - raise BuildError("RedisAI does not currently support GPU on Mac OSX") if self.architecture == Architecture.X64: pt_build = Device.CPU.value libtorch_archive = f"libtorch-macos-{self.version}.zip" @@ -937,7 +412,7 @@ def _choose_pt_variant( @t.final @dataclass(frozen=True) -class _TFArchive(_WebTGZ, _RAIBuildDependency): +class _TFArchive(_WebTGZ): os_: OperatingSystem architecture: Architecture device: Device @@ -964,8 +439,6 @@ def url(self) -> str: tf_device = self.device elif self.os_ == OperatingSystem.DARWIN: tf_os = "darwin" - if self.device == Device.GPU: - raise BuildError("RedisAI does not currently support GPU on Macos") tf_device = Device.CPU else: raise BuildError(f"Unexpected OS for TF Archive: {self.os_}") @@ -974,20 +447,10 @@ def url(self) -> str: f"libtensorflow-{tf_device.value}-{tf_os}-{tf_arch}-{self.version}.tar.gz" ) - @property - def __rai_dependency_name__(self) -> str: - return f"libtensorflow@{self.url}" - - def __place_for_rai__(self, target: _PathLike) -> Path: - target = Path(target) / "libtensorflow" - target.mkdir() - self.extract(target) - return target - @t.final @dataclass(frozen=True) -class _ORTArchive(_WebTGZ, _RAIBuildDependency): +class _ORTArchive(_WebTGZ): os_: OperatingSystem device: Device version: str @@ -1013,31 +476,11 @@ def url(self) -> str: ort_os = "osx" ort_arch = "x86_64" ort_build = "" - if self.device == Device.GPU: - raise BuildError("RedisAI does not currently support GPU on Macos") else: raise BuildError(f"Unexpected OS for TF Archive: {self.os_}") ort_archive = f"onnxruntime-{ort_os}-{ort_arch}{ort_build}-{self.version}.tgz" return f"{ort_url_base}/{ort_archive}" - @property - def __rai_dependency_name__(self) -> str: - return f"onnxruntime@{self.url}" - - def __place_for_rai__(self, target: _PathLike) -> Path: - target = Path(target).resolve() / "onnxruntime" - self.extract(target) - try: - (extracted_dir,) = target.iterdir() - except ValueError: - raise BuildError( - "Unexpected number of files extracted from ORT archive" - ) from None - for file in extracted_dir.iterdir(): - file.rename(target / file.name) - extracted_dir.rmdir() - return target - def _git(*args: str) -> None: git = Builder.binary_path("git") diff --git a/smartsim/_core/config/config.py b/smartsim/_core/config/config.py index 1012129e9..70e6ef2de 100644 --- a/smartsim/_core/config/config.py +++ b/smartsim/_core/config/config.py @@ -40,22 +40,6 @@ # These values can be set through environment variables to # override the default behavior of SmartSim. # -# RAI_PATH -# - Path to the RAI shared library -# - Default: /smartsim/smartsim/_core/lib/redisai.so -# -# REDIS_CONF -# - Path to the redis.conf file -# - Default: /SmartSim/smartsim/_core/config/redis.conf -# -# REDIS_PATH -# - Path to the redis-server executable -# - Default: /SmartSim/smartsim/_core/bin/redis-server -# -# REDIS_CLI_PATH -# - Path to the redis-cli executable -# - Default: /SmartSim/smartsim/_core/bin/redis-cli -# # SMARTSIM_LOG_LEVEL # - Log level for SmartSim # - Default: info @@ -99,53 +83,9 @@ def __init__(self) -> None: self.lib_path = Path(dependency_path, "lib").resolve() self.bin_path = Path(dependency_path, "bin").resolve() - self.conf_path = Path(dependency_path, "config", "redis.conf") + self.conf_path = Path(dependency_path, "config") self.conf_dir = Path(self.core_path, "config") - @property - def redisai(self) -> str: - rai_path = self.lib_path / "redisai.so" - redisai = Path(os.environ.get("RAI_PATH", rai_path)).resolve() - if not redisai.is_file(): - raise SSConfigError( - "RedisAI dependency not found. Build with `smart` cli " - "or specify RAI_PATH" - ) - return str(redisai) - - @property - def database_conf(self) -> str: - conf = Path(os.environ.get("REDIS_CONF", self.conf_path)).resolve() - if not conf.is_file(): - raise SSConfigError( - "Feature store configuration file at REDIS_CONF could not be found" - ) - return str(conf) - - @property - def database_exe(self) -> str: - try: - database_exe = next(self.bin_path.glob("*-server")) - feature_store = Path(os.environ.get("REDIS_PATH", database_exe)).resolve() - exe = expand_exe_path(str(feature_store)) - return exe - except (TypeError, FileNotFoundError) as e: - raise SSConfigError( - "Specified feature store binary at REDIS_PATH could not be used" - ) from e - - @property - def database_cli(self) -> str: - try: - redis_cli_exe = next(self.bin_path.glob("*-cli")) - redis_cli = Path(os.environ.get("REDIS_CLI_PATH", redis_cli_exe)).resolve() - exe = expand_exe_path(str(redis_cli)) - return exe - except (TypeError, FileNotFoundError) as e: - raise SSConfigError( - "Specified Redis binary at REDIS_CLI_PATH could not be used" - ) from e - @property def database_file_parse_trials(self) -> int: return int(os.getenv("SMARTSIM_DB_FILE_PARSE_TRIALS", "10")) diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index 3cdad967e..5c1a4da3e 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -38,9 +38,8 @@ import time import typing as t -from smartredis import Client, ConfigOptions - from smartsim._core.utils.network import get_ip_from_host +from smartsim.entity._mock import Mock from ..._core.launcher.step import Step from ..._core.utils.helpers import ( @@ -48,12 +47,6 @@ unpack_colo_fs_identifier, unpack_fs_identifier, ) -from ..._core.utils.redis import ( - fs_is_active, - set_ml_model, - set_script, - shutdown_fs_node, -) from ...database import FeatureStore from ...entity import Application, Ensemble, EntitySequence, SmartSimEntity from ...error import ( @@ -76,7 +69,7 @@ SlurmLauncher, ) from ..launcher.launcher import Launcher -from ..utils import check_cluster_status, create_cluster, serialize +from ..utils import serialize from .controller_utils import _AnonymousBatchJob, _look_up_launched_data from .job import Job from .jobmanager import JobManager @@ -94,6 +87,38 @@ JM_LOCK = threading.RLock() +class Client(Mock): + """Mock Client""" + + +class ConfigOptions(Mock): + """Mock ConfigOptions""" + + +def fs_is_active(): + pass + + +def set_ml_model(): + pass + + +def set_script(): + pass + + +def shutdown_fs_node(): + pass + + +def create_cluster(): + pass + + +def check_cluster_status(): + pass + + class Controller: """The controller module provides an interface between the smartsim entities created in the experiment and the diff --git a/smartsim/_core/entrypoints/colocated.py b/smartsim/_core/entrypoints/colocated.py deleted file mode 100644 index 44429adaf..000000000 --- a/smartsim/_core/entrypoints/colocated.py +++ /dev/null @@ -1,352 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024 Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse -import os -import signal -import socket -import sys -import tempfile -import typing as t -from pathlib import Path -from subprocess import STDOUT -from types import FrameType - -import filelock -import psutil -from smartredis import Client, ConfigOptions -from smartredis.error import RedisConnectionError, RedisReplyError - -from smartsim._core.utils.network import current_ip -from smartsim.error import SSInternalError -from smartsim.log import get_logger - -logger = get_logger(__name__) - -DBPID = None - -# kill is not catchable -SIGNALS = [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT] - - -def handle_signal(signo: int, _frame: t.Optional[FrameType]) -> None: - if not signo: - logger.warning("Received signal with no signo") - cleanup() - - -def launch_fs_model(client: Client, fs_model: t.List[str]) -> str: - """Parse options to launch model on local cluster - - :param client: SmartRedis client connected to local FS - :param fs_model: List of arguments defining the model - :return: Name of model - """ - parser = argparse.ArgumentParser("Set ML model on FS") - parser.add_argument("--name", type=str) - parser.add_argument("--file", type=str) - parser.add_argument("--backend", type=str) - parser.add_argument("--device", type=str) - parser.add_argument("--devices_per_node", type=int, default=1) - parser.add_argument("--first_device", type=int, default=0) - parser.add_argument("--batch_size", type=int, default=0) - parser.add_argument("--min_batch_size", type=int, default=0) - parser.add_argument("--min_batch_timeout", type=int, default=0) - parser.add_argument("--tag", type=str, default="") - parser.add_argument("--inputs", nargs="+", default=None) - parser.add_argument("--outputs", nargs="+", default=None) - args = parser.parse_args(fs_model) - - inputs = None - outputs = None - - if args.inputs: - inputs = list(args.inputs) - if args.outputs: - outputs = list(args.outputs) - - name = str(args.name) - - # devices_per_node being greater than one only applies to GPU devices - if args.devices_per_node > 1 and args.device.lower() == "gpu": - client.set_model_from_file_multigpu( - name=name, - model_file=args.file, - backend=args.backend, - first_gpu=args.first_device, - num_gpus=args.devices_per_node, - batch_size=args.batch_size, - min_batch_size=args.min_batch_size, - min_batch_timeout=args.min_batch_timeout, - tag=args.tag, - inputs=inputs, - outputs=outputs, - ) - else: - client.set_model_from_file( - name=name, - model_file=args.file, - backend=args.backend, - device=args.device, - batch_size=args.batch_size, - min_batch_size=args.min_batch_size, - min_batch_timeout=args.min_batch_timeout, - tag=args.tag, - inputs=inputs, - outputs=outputs, - ) - - return name - - -def launch_fs_script(client: Client, fs_script: t.List[str]) -> str: - """Parse options to launch script on local cluster - - :param client: SmartRedis client connected to local FS - :param fs_model: List of arguments defining the script - :return: Name of model - """ - parser = argparse.ArgumentParser("Set script on FS") - parser.add_argument("--name", type=str) - parser.add_argument("--func", type=str) - parser.add_argument("--file", type=str) - parser.add_argument("--backend", type=str) - parser.add_argument("--device", type=str) - parser.add_argument("--devices_per_node", type=int, default=1) - parser.add_argument("--first_device", type=int, default=0) - args = parser.parse_args(fs_script) - - if args.file and args.func: - raise ValueError("Both file and func cannot be provided.") - - if args.func: - func = args.func.replace("\\n", "\n") - if args.devices_per_node > 1 and args.device.lower() == "gpu": - client.set_script_multigpu( - args.name, func, args.first_device, args.devices_per_node - ) - else: - client.set_script(args.name, func, args.device) - elif args.file: - if args.devices_per_node > 1 and args.device.lower() == "gpu": - client.set_script_from_file_multigpu( - args.name, args.file, args.first_device, args.devices_per_node - ) - else: - client.set_script_from_file(args.name, args.file, args.device) - else: - raise ValueError("No file or func provided.") - - return str(args.name) - - -def main( - network_interface: str, - fs_cpus: int, - command: t.List[str], - fs_models: t.List[t.List[str]], - fs_scripts: t.List[t.List[str]], - fs_identifier: str, -) -> None: - # pylint: disable=too-many-statements - global DBPID # pylint: disable=global-statement - - lo_address = current_ip("lo") - ip_addresses = [] - if network_interface: - try: - ip_addresses = [ - current_ip(interface) for interface in network_interface.split(",") - ] - except ValueError as e: - logger.warning(e) - - if all(lo_address == ip_address for ip_address in ip_addresses) or not ip_addresses: - cmd = command + [f"--bind {lo_address}"] - else: - # bind to both addresses if the user specified a network - # address that exists and is not the loopback address - cmd = command + [f"--bind {lo_address} {' '.join(ip_addresses)}"] - # pin source address to avoid random selection by Redis - cmd += [f"--bind-source-addr {lo_address}"] - - # we generally want to catch all exceptions here as - # if this process dies, the application will most likely fail - try: - hostname = socket.gethostname() - filename = ( - f"colo_feature_store_{hostname}.log" - if os.getenv("SMARTSIM_LOG_LEVEL") == "debug" - else os.devnull - ) - with open(filename, "w", encoding="utf-8") as file: - process = psutil.Popen(cmd, stdout=file.fileno(), stderr=STDOUT) - DBPID = process.pid - # printing to stdout shell file for extraction - print(f"__PID__{DBPID}__PID__", flush=True) - - except Exception as e: - cleanup() - logger.error(f"Failed to start feature store process: {str(e)}") - raise SSInternalError("Colocated process failed to start") from e - - try: - logger.debug( - "\n\nColocated feature store information\n" - f"\n\tIP Address(es): {' '.join(ip_addresses + [lo_address])}" - f"\n\tCommand: {' '.join(cmd)}\n\n" - f"\n\t# of Feature Store CPUs: {fs_cpus}" - f"\n\tFeature Store Identifier: {fs_identifier}" - ) - except Exception as e: - cleanup() - logger.error(f"Failed to start feature store process: {str(e)}") - raise SSInternalError("Colocated process failed to start") from e - - def launch_models(client: Client, fs_models: t.List[t.List[str]]) -> None: - for i, fs_model in enumerate(fs_models): - logger.debug("Uploading model") - model_name = launch_fs_model(client, fs_model) - logger.debug(f"Added model {model_name} ({i+1}/{len(fs_models)})") - - def launch_fs_scripts(client: Client, fs_scripts: t.List[t.List[str]]) -> None: - for i, fs_script in enumerate(fs_scripts): - logger.debug("Uploading script") - script_name = launch_fs_script(client, fs_script) - logger.debug(f"Added script {script_name} ({i+1}/{len(fs_scripts)})") - - try: - if fs_models or fs_scripts: - try: - options = ConfigOptions.create_from_environment(fs_identifier) - client = Client(options, logger_name="SmartSim") - launch_models(client, fs_models) - launch_fs_scripts(client, fs_scripts) - except (RedisConnectionError, RedisReplyError) as ex: - raise SSInternalError( - "Failed to set model or script, could not connect to feature store" - ) from ex - # Make sure we don't keep this around - del client - - except Exception as e: - cleanup() - logger.error(f"Colocated feature store process failed: {str(e)}") - raise SSInternalError("Colocated entrypoint raised an error") from e - - -def cleanup() -> None: - try: - logger.debug("Cleaning up colocated feature store") - # attempt to stop the feature store process - fs_proc = psutil.Process(DBPID) - fs_proc.terminate() - - except psutil.NoSuchProcess: - logger.warning("Couldn't find feature store process to kill.") - - except OSError as e: - logger.warning( - f"Failed to clean up colocated feature store gracefully: {str(e)}" - ) - finally: - if LOCK.is_locked: - LOCK.release() - - if os.path.exists(LOCK.lock_file): - os.remove(LOCK.lock_file) - - -def register_signal_handlers() -> None: - for sig in SIGNALS: - signal.signal(sig, handle_signal) - - -if __name__ == "__main__": - arg_parser = argparse.ArgumentParser( - prefix_chars="+", description="SmartSim Process Launcher" - ) - arg_parser.add_argument( - "+ifname", type=str, help="Network Interface name", default="" - ) - arg_parser.add_argument( - "+lockfile", type=str, help="Filename to create for single proc per host" - ) - arg_parser.add_argument( - "+fs_cpus", type=int, default=2, help="Number of CPUs to use for FS" - ) - - arg_parser.add_argument( - "+fs_identifier", type=str, default="", help="Feature Store Identifier" - ) - - arg_parser.add_argument("+command", nargs="+", help="Command to run") - arg_parser.add_argument( - "+fs_model", - nargs="+", - action="append", - default=[], - help="Model to set on FS", - ) - arg_parser.add_argument( - "+fs_script", - nargs="+", - action="append", - default=[], - help="Script to set on FS", - ) - - os.environ["PYTHONUNBUFFERED"] = "1" - - try: - parsed_args = arg_parser.parse_args() - tmp_lockfile = Path(tempfile.gettempdir()) / parsed_args.lockfile - - LOCK = filelock.FileLock(tmp_lockfile) - LOCK.acquire(timeout=0.1) - logger.debug( - f"Starting colocated feature store on host: {socket.gethostname()}" - ) - - # make sure to register the cleanup before we start - # the proecss so our signaller will be able to stop - # the feature store process. - register_signal_handlers() - - main( - parsed_args.ifname, - parsed_args.fs_cpus, - parsed_args.command, - parsed_args.fs_model, - parsed_args.fs_script, - parsed_args.fs_identifier, - ) - - # gracefully exit the processes in the distributed application that - # we do not want to have start a colocated process. Only one process - # per node should be running. - except filelock.Timeout: - sys.exit(0) diff --git a/smartsim/_core/entrypoints/redis.py b/smartsim/_core/entrypoints/redis.py deleted file mode 100644 index 995c6faa0..000000000 --- a/smartsim/_core/entrypoints/redis.py +++ /dev/null @@ -1,192 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024 Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse -import json -import os -import signal -import textwrap -import typing as t -from subprocess import PIPE, STDOUT -from types import FrameType - -import psutil - -from smartsim._core.utils.network import current_ip -from smartsim.entity.dbnode import LaunchedShardData -from smartsim.log import get_logger - -logger = get_logger(__name__) - -""" -Redis/KeyDB entrypoint script -""" - -DBPID: t.Optional[int] = None - -# kill is not catchable -SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM, signal.SIGABRT] - - -def handle_signal(signo: int, _frame: t.Optional[FrameType]) -> None: - if not signo: - logger.warning("Received signal with no signo") - cleanup() - - -def build_bind_args(source_addr: str, *addrs: str) -> t.Tuple[str, ...]: - return ( - "--bind", - source_addr, - *addrs, - # pin source address to avoid random selection by Redis - "--bind-source-addr", - source_addr, - ) - - -def build_cluster_args(shard_data: LaunchedShardData) -> t.Tuple[str, ...]: - if cluster_conf_file := shard_data.cluster_conf_file: - return ("--cluster-enabled", "yes", "--cluster-config-file", cluster_conf_file) - return () - - -def print_summary( - cmd: t.List[str], network_interface: str, shard_data: LaunchedShardData -) -> None: - print( - textwrap.dedent(f"""\ - ----------- Running Command ---------- - COMMAND: {' '.join(cmd)} - IPADDRESS: {shard_data.hostname} - NETWORK: {network_interface} - SMARTSIM_ORC_SHARD_INFO: {json.dumps(shard_data.to_dict())} - -------------------------------------- - - --------------- Output --------------- - - """), - flush=True, - ) - - -def main(args: argparse.Namespace) -> int: - global DBPID # pylint: disable=global-statement - - src_addr, *bind_addrs = (current_ip(net_if) for net_if in args.ifname.split(",")) - shard_data = LaunchedShardData( - name=args.name, hostname=src_addr, port=args.port, cluster=args.cluster - ) - - cmd = [ - args.orc_exe, - args.conf_file, - *args.rai_module, - "--port", - str(args.port), - *build_cluster_args(shard_data), - *build_bind_args(src_addr, *bind_addrs), - ] - - print_summary(cmd, args.ifname, shard_data) - - try: - process = psutil.Popen(cmd, stdout=PIPE, stderr=STDOUT) - DBPID = process.pid - - for line in iter(process.stdout.readline, b""): - print(line.decode("utf-8").rstrip(), flush=True) - except Exception: - cleanup() - logger.error("Feature store process starter raised an exception", exc_info=True) - return 1 - return 0 - - -def cleanup() -> None: - logger.debug("Cleaning up feature store instance") - try: - # attempt to stop the feature store process - if DBPID is not None: - psutil.Process(DBPID).terminate() - except psutil.NoSuchProcess: - logger.warning("Couldn't find feature store process to kill.") - except OSError as e: - logger.warning(f"Failed to clean up feature store gracefully: {str(e)}") - - -if __name__ == "__main__": - os.environ["PYTHONUNBUFFERED"] = "1" - - parser = argparse.ArgumentParser( - prefix_chars="+", description="SmartSim Process Launcher" - ) - parser.add_argument( - "+orc-exe", type=str, help="Path to the feature store executable", required=True - ) - parser.add_argument( - "+conf-file", - type=str, - help="Path to the feature store configuration file", - required=True, - ) - parser.add_argument( - "+rai-module", - nargs="+", - type=str, - help=( - "Command for the orcestrator to load the Redis AI module with " - "symbols seperated by whitespace" - ), - required=True, - ) - parser.add_argument( - "+name", type=str, help="Name to identify the shard", required=True - ) - parser.add_argument( - "+port", - type=int, - help="The port on which to launch the shard of the feature store", - required=True, - ) - parser.add_argument( - "+ifname", type=str, help="Network Interface name", required=True - ) - parser.add_argument( - "+cluster", - action="store_true", - help="Specify if this feature store shard is part of a cluster", - ) - - args_ = parser.parse_args() - - # make sure to register the cleanup before the start - # the process so our signaller will be able to stop - # the feature store process. - for sig in SIGNALS: - signal.signal(sig, handle_signal) - - raise SystemExit(main(args_)) diff --git a/smartsim/_core/launcher/colocated.py b/smartsim/_core/launcher/colocated.py deleted file mode 100644 index 9f307968b..000000000 --- a/smartsim/_core/launcher/colocated.py +++ /dev/null @@ -1,244 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024 Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import sys -import typing as t - -from ...entity.dbobject import FSModel, FSScript -from ...error import SSInternalError -from ..config import CONFIG -from ..utils.helpers import create_lockfile_name - - -def write_colocated_launch_script( - file_name: str, fs_log: str, colocated_settings: t.Dict[str, t.Any] -) -> None: - """Write the colocated launch script - - This file will be written into the cwd of the step that - is created for this entity. - - :param file_name: name of the script to write - :param fs_log: log file for the fs - :param colocated_settings: fs settings from entity run_settings - """ - - colocated_cmd = _build_colocated_wrapper_cmd(fs_log, **colocated_settings) - - with open(file_name, "w", encoding="utf-8") as script_file: - script_file.write("#!/bin/bash\n") - script_file.write("set -e\n\n") - - script_file.write("Cleanup () {\n") - script_file.write("if ps -p $DBPID > /dev/null; then\n") - script_file.write("\tkill -15 $DBPID\n") - script_file.write("fi\n}\n\n") - - # run cleanup after all exitcodes - script_file.write("trap Cleanup exit\n\n") - - # force entrypoint to write some debug information to the - # STDOUT of the job - if colocated_settings["debug"]: - script_file.write("export SMARTSIM_LOG_LEVEL=debug\n") - script_file.write(f"db_stdout=$({colocated_cmd})\n") - # extract and set DBPID within the shell script that is - # enclosed between __PID__ and sent to stdout by the colocated - # entrypoints file - script_file.write( - "DBPID=$(echo $db_stdout | sed -n " - "'s/.*__PID__\\([0-9]*\\)__PID__.*/\\1/p')\n" - ) - - # Write the actual launch command for the app - script_file.write("$@\n\n") - - -def _build_colocated_wrapper_cmd( - fs_log: str, - cpus: int = 1, - rai_args: t.Optional[t.Dict[str, str]] = None, - extra_fs_args: t.Optional[t.Dict[str, str]] = None, - port: int = 6780, - ifname: t.Optional[t.Union[str, t.List[str]]] = None, - custom_pinning: t.Optional[str] = None, - **kwargs: t.Any, -) -> str: - """Build the command use to run a colocated fs application - - :param fs_log: log file for the fs - :param cpus: fs cpus - :param rai_args: redisai args - :param extra_fs_args: extra redis args - :param port: port to bind fs to - :param ifname: network interface(s) to bind fs to - :param fs_cpu_list: The list of CPUs that the feature store should be limited to - :return: the command to run - """ - # pylint: disable=too-many-locals - - # create unique lockfile name to avoid symlink vulnerability - # this is the lockfile all the processes in the distributed - # application will try to acquire. since we use a local tmp - # directory on the compute node, only one process can acquire - # the lock on the file. - lockfile = create_lockfile_name() - - # create the command that will be used to launch the - # feature store with the python entrypoint for starting - # up the backgrounded fs process - - cmd = [ - sys.executable, - "-m", - "smartsim._core.entrypoints.colocated", - "+lockfile", - lockfile, - "+fs_cpus", - str(cpus), - ] - # Add in the interface if using TCP/IP - if ifname: - if isinstance(ifname, str): - ifname = [ifname] - cmd.extend(["+ifname", ",".join(ifname)]) - cmd.append("+command") - # collect fs binaries and libraries from the config - - fs_cmd = [] - if custom_pinning: - fs_cmd.extend(["taskset", "-c", custom_pinning]) - fs_cmd.extend( - [CONFIG.database_exe, CONFIG.database_conf, "--loadmodule", CONFIG.redisai] - ) - - # add extra redisAI configurations - for arg, value in (rai_args or {}).items(): - if value: - # RAI wants arguments for inference in all caps - # ex. THREADS_PER_QUEUE=1 - fs_cmd.append(f"{arg.upper()} {str(value)}") - - fs_cmd.extend(["--port", str(port)]) - - # Add socket and permissions for UDS - unix_socket = kwargs.get("unix_socket", None) - socket_permissions = kwargs.get("socket_permissions", None) - - if unix_socket and socket_permissions: - fs_cmd.extend( - [ - "--unixsocket", - str(unix_socket), - "--unixsocketperm", - str(socket_permissions), - ] - ) - elif bool(unix_socket) ^ bool(socket_permissions): - raise SSInternalError( - "`unix_socket` and `socket_permissions` must both be defined or undefined." - ) - - fs_cmd.extend( - ["--logfile", fs_log] - ) # usually /dev/null, unless debug was specified - if extra_fs_args: - for fs_arg, value in extra_fs_args.items(): - # replace "_" with "-" in the fs_arg because we use kwargs - # for the extra configurations and Python doesn't allow a hyphen - # in a variable name. All redis and KeyDB configuration options - # use hyphens in their names. - fs_arg = fs_arg.replace("_", "-") - fs_cmd.extend([f"--{fs_arg}", value]) - - fs_models = kwargs.get("fs_models", None) - if fs_models: - fs_model_cmd = _build_fs_model_cmd(fs_models) - fs_cmd.extend(fs_model_cmd) - - fs_scripts = kwargs.get("fs_scripts", None) - if fs_scripts: - fs_script_cmd = _build_fs_script_cmd(fs_scripts) - fs_cmd.extend(fs_script_cmd) - - cmd.extend(fs_cmd) - - return " ".join(cmd) - - -def _build_fs_model_cmd(fs_models: t.List[FSModel]) -> t.List[str]: - cmd = [] - for fs_model in fs_models: - cmd.append("+fs_model") - cmd.append(f"--name={fs_model.name}") - - # Here fs_model.file is guaranteed to exist - # because we don't allow the user to pass a serialized FSModel - cmd.append(f"--file={fs_model.file}") - - cmd.append(f"--backend={fs_model.backend}") - cmd.append(f"--device={fs_model.device}") - cmd.append(f"--devices_per_node={fs_model.devices_per_node}") - cmd.append(f"--first_device={fs_model.first_device}") - if fs_model.batch_size: - cmd.append(f"--batch_size={fs_model.batch_size}") - if fs_model.min_batch_size: - cmd.append(f"--min_batch_size={fs_model.min_batch_size}") - if fs_model.min_batch_timeout: - cmd.append(f"--min_batch_timeout={fs_model.min_batch_timeout}") - if fs_model.tag: - cmd.append(f"--tag={fs_model.tag}") - if fs_model.inputs: - cmd.append("--inputs=" + ",".join(fs_model.inputs)) - if fs_model.outputs: - cmd.append("--outputs=" + ",".join(fs_model.outputs)) - - return cmd - - -def _build_fs_script_cmd(fs_scripts: t.List[FSScript]) -> t.List[str]: - cmd = [] - for fs_script in fs_scripts: - cmd.append("+fs_script") - cmd.append(f"--name={fs_script.name}") - if fs_script.func: - # Notice that here fs_script.func is guaranteed to be a str - # because we don't allow the user to pass a serialized function - func = fs_script.func - sanitized_func = func.replace("\n", "\\n") - if not ( - sanitized_func.startswith("'") - and sanitized_func.endswith("'") - or (sanitized_func.startswith('"') and sanitized_func.endswith('"')) - ): - sanitized_func = '"' + sanitized_func + '"' - cmd.append(f"--func={sanitized_func}") - elif fs_script.file: - cmd.append(f"--file={fs_script.file}") - cmd.append(f"--device={fs_script.device}") - cmd.append(f"--devices_per_node={fs_script.devices_per_node}") - cmd.append(f"--first_device={fs_script.first_device}") - return cmd diff --git a/smartsim/_core/launcher/step/step.py b/smartsim/_core/launcher/step/step.py index cd933ec3f..46bcebf7f 100644 --- a/smartsim/_core/launcher/step/step.py +++ b/smartsim/_core/launcher/step/step.py @@ -42,11 +42,14 @@ from ....log import get_logger from ....settings import RunSettings, SettingsBase from ...utils.helpers import encode_cmd, get_base_36_repr -from ..colocated import write_colocated_launch_script logger = get_logger(__name__) +def write_colocated_launch_script(): + pass + + class Step: def __init__( self, entity: t.Union[Application, FSNode], step_settings: SettingsBase diff --git a/smartsim/_core/utils/__init__.py b/smartsim/_core/utils/__init__.py index 584a417a2..30256034c 100644 --- a/smartsim/_core/utils/__init__.py +++ b/smartsim/_core/utils/__init__.py @@ -29,7 +29,5 @@ colorize, delete_elements, execute_platform_cmd, - installed_redisai_backends, is_crayex_platform, ) -from .redis import check_cluster_status, create_cluster, fs_is_active diff --git a/smartsim/_core/utils/helpers.py b/smartsim/_core/utils/helpers.py index 62d176259..eb01329fd 100644 --- a/smartsim/_core/utils/helpers.py +++ b/smartsim/_core/utils/helpers.py @@ -38,11 +38,8 @@ import uuid from datetime import datetime from functools import lru_cache -from pathlib import Path from shutil import which -from smartsim._core._install.builder import TRedisAIBackendStr as _TRedisAIBackendStr - if t.TYPE_CHECKING: from types import FrameType @@ -231,52 +228,6 @@ def cat_arg_and_value(arg_name: str, value: str) -> str: return f"--{arg_name}={value}" -def _installed(base_path: Path, backend: str) -> bool: - """ - Check if a backend is available for the RedisAI module. - """ - backend_key = f"redisai_{backend}" - backend_path = base_path / backend_key / f"{backend_key}.so" - backend_so = Path(os.environ.get("RAI_PATH", backend_path)).resolve() - - return backend_so.is_file() - - -def redis_install_base(backends_path: t.Optional[str] = None) -> Path: - # pylint: disable-next=import-outside-toplevel - from ..._core.config import CONFIG - - base_path = Path(backends_path) if backends_path else CONFIG.lib_path / "backends" - return base_path - - -def installed_redisai_backends( - backends_path: t.Optional[str] = None, -) -> t.Set[_TRedisAIBackendStr]: - """Check which ML backends are available for the RedisAI module. - - The optional argument ``backends_path`` is needed if the backends - have not been built as part of the SmartSim building process (i.e. - they have not been built by invoking `smart build`). In that case - ``backends_path`` should point to the directory containing e.g. - the backend directories (`redisai_tensorflow`, `redisai_torch`, - `redisai_onnxruntime`, or `redisai_tflite`). - - :param backends_path: path containing backends - :return: list of installed RedisAI backends - """ - # import here to avoid circular import - base_path = redis_install_base(backends_path) - backends: t.Set[_TRedisAIBackendStr] = { - "tensorflow", - "torch", - "onnxruntime", - "tflite", - } - - return {backend for backend in backends if _installed(base_path, backend)} - - def get_ts_ms() -> int: """Return the current timestamp (accurate to milliseconds) cast to an integer""" return int(datetime.now().timestamp() * 1000) diff --git a/smartsim/_core/utils/redis.py b/smartsim/_core/utils/redis.py deleted file mode 100644 index 524fadbdc..000000000 --- a/smartsim/_core/utils/redis.py +++ /dev/null @@ -1,238 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import logging -import time -import typing as t -from itertools import product - -import redis -from redis.cluster import ClusterNode, RedisCluster -from redis.exceptions import ClusterDownError, RedisClusterException -from smartredis import Client -from smartredis.error import RedisReplyError - -from ...entity import FSModel, FSScript -from ...error import SSInternalError -from ...log import get_logger -from ..config import CONFIG -from .network import get_ip_from_host -from .shell import execute_cmd - -logging.getLogger("rediscluster").setLevel(logging.WARNING) -logger = get_logger(__name__) - - -def create_cluster(hosts: t.List[str], ports: t.List[int]) -> None: # cov-wlm - """Connect launched cluster instances. - - Should only be used in the case where cluster initialization - needs to occur manually which is not often. - - :param hosts: List of hostnames to connect to - :param ports: List of ports for each hostname - :raises SmartSimError: if cluster creation fails - """ - ip_list = [] - for host in hosts: - ip_address = get_ip_from_host(host) - for port in ports: - address = ":".join((ip_address, str(port) + " ")) - ip_list.append(address) - - # call cluster command - redis_cli = CONFIG.database_cli - cmd = [redis_cli, "--cluster", "create"] - cmd += ip_list - cmd += ["--cluster-replicas", "0", "--cluster-yes"] - returncode, out, err = execute_cmd(cmd, proc_input="yes", shell=False) - - if returncode != 0: - logger.error(out) - logger.error(err) - raise SSInternalError("Feature store '--cluster create' command failed") - logger.debug(out) - - -def check_cluster_status( - hosts: t.List[str], ports: t.List[int], trials: int = 10 -) -> None: # cov-wlm - """Check that a Redis/KeyDB cluster is up and running - - :param hosts: List of hostnames to connect to - :param ports: List of ports for each hostname - :param trials: number of attempts to verify cluster status - - :raises SmartSimError: If cluster status cannot be verified - """ - cluster_nodes = [ - ClusterNode(get_ip_from_host(host), port) - for host, port in product(hosts, ports) - ] - - if not cluster_nodes: - raise SSInternalError( - "No cluster nodes have been set for feature store status check." - ) - - logger.debug("Beginning feature store cluster status check...") - while trials > 0: - # wait for cluster to spin up - time.sleep(5) - try: - redis_tester: "RedisCluster[t.Any]" = RedisCluster( - startup_nodes=cluster_nodes - ) - redis_tester.set("__test__", "__test__") - redis_tester.delete("__test__") # type: ignore - logger.debug("Cluster status verified") - return - except (ClusterDownError, RedisClusterException, redis.RedisError): - logger.debug("Cluster still spinning up...") - trials -= 1 - if trials == 0: - raise SSInternalError("Cluster setup could not be verified") - - -def fs_is_active(hosts: t.List[str], ports: t.List[int], num_shards: int) -> bool: - """Check if a FS is running - - if the FS is clustered, check cluster status, otherwise - just ping FS. - - :param hosts: list of hosts - :param ports: list of ports - :param num_shards: Number of FS shards - :return: Whether FS is running - """ - # if single shard - if num_shards < 2: - host = hosts[0] - port = ports[0] - try: - client = redis.Redis(host=host, port=port, db=0) - if client.ping(): - return True - return False - except redis.RedisError: - return False - # if a cluster - else: - try: - check_cluster_status(hosts, ports, trials=1) - return True - # we expect this to fail if the cluster is not active - except SSInternalError: - return False - - -def set_ml_model(fs_model: FSModel, client: Client) -> None: - logger.debug(f"Adding FSModel named {fs_model.name}") - - for device in fs_model.devices: - try: - if fs_model.is_file: - client.set_model_from_file( - name=fs_model.name, - model_file=str(fs_model.file), - backend=fs_model.backend, - device=device, - batch_size=fs_model.batch_size, - min_batch_size=fs_model.min_batch_size, - min_batch_timeout=fs_model.min_batch_timeout, - tag=fs_model.tag, - inputs=fs_model.inputs, - outputs=fs_model.outputs, - ) - else: - if fs_model.model is None: - raise ValueError(f"No model attacted to {fs_model.name}") - client.set_model( - name=fs_model.name, - model=fs_model.model, - backend=fs_model.backend, - device=device, - batch_size=fs_model.batch_size, - min_batch_size=fs_model.min_batch_size, - min_batch_timeout=fs_model.min_batch_timeout, - tag=fs_model.tag, - inputs=fs_model.inputs, - outputs=fs_model.outputs, - ) - except RedisReplyError as error: # pragma: no cover - logger.error("Error while setting model on feature store.") - raise error - - -def set_script(fs_script: FSScript, client: Client) -> None: - logger.debug(f"Adding FSScript named {fs_script.name}") - - for device in fs_script.devices: - try: - if fs_script.is_file: - client.set_script_from_file( - name=fs_script.name, file=str(fs_script.file), device=device - ) - elif fs_script.script: - if isinstance(fs_script.script, str): - client.set_script( - name=fs_script.name, script=fs_script.script, device=device - ) - elif callable(fs_script.script): - client.set_function( - name=fs_script.name, function=fs_script.script, device=device - ) - else: - raise ValueError(f"No script or file attached to {fs_script.name}") - except RedisReplyError as error: # pragma: no cover - logger.error("Error while setting model on feature store.") - raise error - - -def shutdown_fs_node(host_ip: str, port: int) -> t.Tuple[int, str, str]: # cov-wlm - """Send shutdown signal to FS node. - - Should only be used in the case where cluster deallocation - needs to occur manually. Usually, the SmartSim job manager - will take care of this automatically. - - :param host_ip: IP of host to connect to - :param ports: Port to which node is listening - :return: returncode, output, and error of the process - """ - redis_cli = CONFIG.database_cli - cmd = [redis_cli, "-h", host_ip, "-p", str(port), "shutdown"] - returncode, out, err = execute_cmd(cmd, proc_input="yes", shell=False, timeout=10) - - if returncode != 0: - logger.error(out) - err_msg = "Error while shutting down DB node. " - err_msg += f"Return code: {returncode}, err: {err}" - logger.error(err_msg) - elif out: - logger.debug(out) - - return returncode, out, err diff --git a/smartsim/_core/utils/telemetry/collector.py b/smartsim/_core/utils/telemetry/collector.py index 4d0a79af3..02f5ed9f1 100644 --- a/smartsim/_core/utils/telemetry/collector.py +++ b/smartsim/_core/utils/telemetry/collector.py @@ -30,16 +30,18 @@ import logging import typing as t -import redis.asyncio as redisa -import redis.exceptions as redisex - from smartsim._core.control.job import JobEntity from smartsim._core.utils.helpers import get_ts_ms from smartsim._core.utils.telemetry.sink import FileSink, Sink +from smartsim.entity._mock import Mock logger = logging.getLogger("TelemetryMonitor") +class Client(Mock): + """Mock Client""" + + class Collector(abc.ABC): """Base class for telemetry collectors. @@ -114,6 +116,7 @@ def __str__(self) -> str: return f"{self.host}:{self.port}" +# TODO add a new Client class DBCollector(Collector): """A base class for collectors that retrieve statistics from a feature store""" @@ -124,7 +127,7 @@ def __init__(self, entity: JobEntity, sink: Sink) -> None: :param sink: destination to write collected information """ super().__init__(entity, sink) - self._client: t.Optional[redisa.Redis[bytes]] = None + self._client: Client self._address = _DBAddress( self._entity.config.get("host", ""), int(self._entity.config.get("port", 0)), @@ -134,9 +137,7 @@ async def _configure_client(self) -> None: """Configure the client connection to the target feature store""" try: if not self._client: - self._client = redisa.Redis( - host=self._address.host, port=self._address.port - ) + self._client = None except Exception as e: logger.exception(e) finally: @@ -218,7 +219,7 @@ async def _check_fs(self) -> bool: try: if self._client: return await self._client.ping() - except redisex.ConnectionError: + except Exception: logger.warning(f"Cannot ping fs {self._address}") return False diff --git a/smartsim/database/orchestrator.py b/smartsim/database/orchestrator.py index 7ca101bf5..a6bd01c07 100644 --- a/smartsim/database/orchestrator.py +++ b/smartsim/database/orchestrator.py @@ -35,21 +35,14 @@ from shlex import split as sh_split import psutil -from smartredis import Client, ConfigOptions -from smartredis.error import RedisReplyError -from .._core.config import CONFIG -from .._core.utils import fs_is_active +from smartsim.entity._mock import Mock + from .._core.utils.helpers import is_valid_cmd, unpack_fs_identifier from .._core.utils.network import get_ip_from_host from .._core.utils.shell import execute_cmd from ..entity import EntityList, FSNode, TelemetryConfiguration -from ..error import ( - SmartSimError, - SSConfigError, - SSDBFilesNotParseable, - SSUnsupportedError, -) +from ..error import SmartSimError, SSDBFilesNotParseable, SSUnsupportedError from ..log import get_logger from ..servertype import CLUSTERED, STANDALONE from ..settings import ( @@ -72,6 +65,19 @@ logger = get_logger(__name__) + +class Client(Mock): + """Mock Client""" + + +class ConfigOptions(Mock): + """Mock ConfigOptions""" + + +def fs_is_active(): + return False + + by_launcher: t.Dict[str, t.List[str]] = { "dragon": [""], "slurm": ["srun", "mpirun", "mpiexec"], @@ -189,7 +195,7 @@ def __init__( ) -> None: """Initialize an ``FeatureStore`` reference for local launch - Extra configurations for RedisAI + Extra configurations :param path: path to location of ``FeatureStore`` directory :param port: TCP/IP port @@ -254,23 +260,6 @@ def __init__( **kwargs, ) - # detect if we can find at least the redis binaries. We - # don't want to force the user to launch with RedisAI so - # it's ok if that isn't present. - try: - # try to obtain redis binaries needed to launch Redis - # will raise SSConfigError if not found - self._redis_exe # pylint: disable=W0104 - self._redis_conf # pylint: disable=W0104 - CONFIG.database_cli # pylint: disable=W0104 - except SSConfigError as e: - raise SSConfigError( - "SmartSim not installed with pre-built extensions (Redis)\n" - "Use the `smart` cli tool to install needed extensions\n" - "or set REDIS_PATH and REDIS_CLI_PATH in your environment\n" - "See documentation for more information" - ) from e - if self.launcher != "local": self.batch_settings = self._build_batch_settings( fs_nodes, @@ -405,30 +394,6 @@ def is_active(self) -> bool: return False return fs_is_active(hosts, self.ports, self.num_shards) - @property - def _rai_module(self) -> t.Tuple[str, ...]: - """Get the RedisAI module from third-party installations - - :return: Tuple of args to pass to the FeatureStore exe - to load and configure the RedisAI - """ - module = ["--loadmodule", CONFIG.redisai] - if self.queue_threads: - module.extend(("THREADS_PER_QUEUE", str(self.queue_threads))) - if self.inter_threads: - module.extend(("INTER_OP_PARALLELISM", str(self.inter_threads))) - if self.intra_threads: - module.extend(("INTRA_OP_PARALLELISM", str(self.intra_threads))) - return tuple(module) - - @property - def _redis_exe(self) -> str: - return CONFIG.database_exe - - @property - def _redis_conf(self) -> str: - return CONFIG.database_conf - @property def checkpoint_file(self) -> str: """Get the path to the checkpoint file for this Feature Store @@ -649,10 +614,6 @@ def set_fs_conf(self, key: str, value: str) -> None: for address in addresses: client.config_set(key, value, address) - except RedisReplyError: - raise SmartSimError( - f"Invalid CONFIG key-value pair ({key}: {value})" - ) from None except TypeError: raise TypeError( "Incompatible function arguments. The key and value used for " @@ -883,13 +844,7 @@ def _get_start_script_args( ) -> t.List[str]: cmd = [ "-m", - "smartsim._core.entrypoints.redis", # entrypoint - f"+orc-exe={self._redis_exe}", # redis-server - f"+conf-file={self._redis_conf}", # redis.conf file - "+rai-module", # load redisai.so - *self._rai_module, f"+name={name}", # name of node - f"+port={port}", # redis port f"+ifname={','.join(self._interfaces)}", # pass interface to start script ] if cluster: diff --git a/smartsim/ml/data.py b/smartsim/ml/data.py index 36c0ae415..21e4e33a5 100644 --- a/smartsim/ml/data.py +++ b/smartsim/ml/data.py @@ -29,8 +29,6 @@ from os import environ import numpy as np -from smartredis import Client, Dataset -from smartredis.error import RedisReplyError from ..error import SSInternalError from ..log import get_logger @@ -75,7 +73,7 @@ def __init__( self.num_classes = num_classes self._ds_name = form_name(self.list_name, "info") - def publish(self, client: Client) -> None: + def publish(self) -> None: """Upload DataInfo information to FeatureStore The information is put on the DB as a DataSet, with strings @@ -83,15 +81,9 @@ def publish(self, client: Client) -> None: :param client: Client to connect to Feature Store """ - info_ds = Dataset(self._ds_name) - info_ds.add_meta_string("sample_name", self.sample_name) - if self.target_name: - info_ds.add_meta_string("target_name", self.target_name) - if self.num_classes: - info_ds.add_meta_scalar("num_classes", self.num_classes) - client.put_dataset(info_ds) + ... - def download(self, client: Client) -> None: + def download(self) -> None: """Download DataInfo information from FeatureStore The information retrieved from the DB is used to populate @@ -100,23 +92,6 @@ def download(self, client: Client) -> None: :param client: Client to connect to Feature Store """ - try: - info_ds = client.get_dataset(self._ds_name) - except RedisReplyError as e: - # If the info was not published, proceed with default parameters - logger.warning( - "Could not retrieve data for DataInfo object, the following " - "values will be kept." - ) - logger.error(f"Original error from Redis was {e}") - logger.warning(str(self)) - return - self.sample_name = info_ds.get_meta_strings("sample_name")[0] - field_names = info_ds.get_metadata_field_names() - if "target_name" in field_names: - self.target_name = info_ds.get_meta_strings("target_name")[0] - if "num_classes" in field_names: - self.num_classes = int(info_ds.get_meta_scalars("num_classes")[0]) def __repr__(self) -> str: strings = ["DataInfo object"] @@ -147,7 +122,7 @@ class TrainingDataUploader: :param target_name: Name of targets tensor (if needed) in uploaded Datasets :param num_classes: Number of classes of targets, if categorical :param cluster: Whether the SmartSim FeatureStore is being run as a cluster - :param address: Address of Redis DB as : + :param address: :param rank: Rank of DataUploader in multi-process application (e.g. MPI rank). :param verbose: If output should be logged to screen. @@ -169,7 +144,6 @@ def __init__( if not sample_name: raise ValueError("Sample name can not be empty") - self.client = Client(cluster, address=address) self.verbose = verbose self.batch_idx = 0 self.rank = rank @@ -192,7 +166,7 @@ def num_classes(self) -> t.Optional[int]: return self._info.num_classes def publish_info(self) -> None: - self._info.publish(self.client) + self._info.publish() def put_batch( self, @@ -200,25 +174,20 @@ def put_batch( targets: t.Optional[np.ndarray] = None, # type: ignore[type-arg] ) -> None: batch_ds_name = form_name("training_samples", self.rank, self.batch_idx) - batch_ds = Dataset(batch_ds_name) - batch_ds.add_tensor(self.sample_name, samples) if ( targets is not None and self.target_name and (self.target_name != self.sample_name) ): - batch_ds.add_tensor(self.target_name, targets) if self.verbose: logger.info(f"Putting dataset {batch_ds_name} with samples and targets") else: if self.verbose: logger.info(f"Putting dataset {batch_ds_name} with samples") - self.client.put_dataset(batch_ds) - self.client.append_to_list(self.list_name, batch_ds) if self.verbose: logger.info(f"Added dataset to list {self.list_name}") - logger.info(f"List length {self.client.get_list_length(self.list_name)}") + logger.info(f"List length") self.batch_idx += 1 @@ -262,7 +231,7 @@ class DataDownloader: from DB, assuming it was stored with ``list_name=data_info_or_list_name`` :param list_name: Name of aggregation list used to upload data :param cluster: Whether the FeatureStore will be run as a cluster - :param address: Address of Redis client as : + :param address: :param replica_rank: When StaticDataDownloader is used distributedly, indicates the rank of this object :param num_replicas: When BatchDownlaoder is used distributedly, indicates @@ -301,11 +270,9 @@ def __init__( self._info = data_info_or_list_name elif isinstance(data_info_or_list_name, str): self._info = DataInfo(list_name=data_info_or_list_name) - client = Client(self.cluster, self.address) - self._info.download(client) + self._info.download() else: raise TypeError("data_info_or_list_name must be either DataInfo or str") - self._client: t.Optional[Client] = None sskeyin = environ.get("SSKEYIN", "") self.uploader_keys = sskeyin.split(",") @@ -314,12 +281,6 @@ def __init__( if init_samples: self.init_samples(max_fetch_trials, wait_interval) - @property - def client(self) -> Client: - if self._client is None: - raise ValueError("Client not initialized") - return self._client - def log(self, message: str) -> None: if self.verbose: logger.info(message) @@ -387,7 +348,6 @@ def init_samples(self, init_trials: int = -1, wait_interval: float = 10.0) -> No :param init_trials: maximum number of attempts to fetch data """ - self._client = Client(self.cluster, self.address) num_trials = 0 max_trials = init_trials or -1 @@ -406,73 +366,15 @@ def init_samples(self, init_trials: int = -1, wait_interval: float = 10.0) -> No if self.shuffle: np.random.shuffle(self.indices) - def _data_exists(self, batch_name: str, target_name: str) -> bool: - if self.need_targets: - return all( - self.client.tensor_exists(datum) for datum in [batch_name, target_name] - ) - - return bool(self.client.tensor_exists(batch_name)) + def _data_exists(self, batch_name: str, target_name: str) -> None: + pass def _add_samples(self, indices: t.List[int]) -> None: - datasets: t.List[Dataset] = [] - - if self.num_replicas == 1: - datasets = self.client.get_dataset_list_range( - self.list_name, start_index=indices[0], end_index=indices[-1] - ) - else: - for idx in indices: - datasets += self.client.get_dataset_list_range( - self.list_name, start_index=idx, end_index=idx - ) - - if self.samples is None: - self.samples = datasets[0].get_tensor(self.sample_name) - if self.need_targets: - self.targets = datasets[0].get_tensor(self.target_name) - - if len(datasets) > 1: - datasets = datasets[1:] - - if self.samples is not None: - for dataset in datasets: - self.samples = np.concatenate( - ( - t.cast("npt.NDArray[t.Any]", self.samples), - dataset.get_tensor(self.sample_name), - ) - ) - if self.need_targets: - self.targets = np.concatenate( - ( - t.cast("npt.NDArray[t.Any]", self.targets), - dataset.get_tensor(self.target_name), - ) - ) - - self.num_samples = t.cast("npt.NDArray[t.Any]", self.samples).shape[0] - self.indices = np.arange(self.num_samples) - - self.log(f"New dataset size: {self.num_samples}, batches: {len(self)}") + pass def _update_samples_and_targets(self) -> None: self.log(f"Rank {self.replica_rank} out of {self.num_replicas} replicas") - for uploader_idx, uploader_key in enumerate(self.uploader_keys): - if uploader_key: - self.client.use_list_ensemble_prefix(True) - self.client.set_data_source(uploader_key) - - list_length = self.client.get_list_length(self.list_name) - - # Strictly greater, because next_index is 0-based - if list_length > self.next_indices[uploader_idx]: - start = self.next_indices[uploader_idx] - indices = list(range(start, list_length, self.num_replicas)) - self._add_samples(indices) - self.next_indices[uploader_idx] = indices[-1] + self.num_replicas - def update_data(self) -> None: if self.dynamic: self._update_samples_and_targets() diff --git a/smartsim/ml/tf/utils.py b/smartsim/ml/tf/utils.py index 9e16a21dc..dc66c3b55 100644 --- a/smartsim/ml/tf/utils.py +++ b/smartsim/ml/tf/utils.py @@ -39,10 +39,6 @@ def freeze_model( ) -> t.Tuple[str, t.List[str], t.List[str]]: """Freeze a Keras or TensorFlow Graph - to use a Keras or TensorFlow model in SmartSim, the model - must be frozen and the inputs and outputs provided to the - smartredis.client.set_model_from_file() method. - This utiliy function provides everything users need to take a trained model and put it inside an ``featurestore`` instance @@ -81,10 +77,6 @@ def freeze_model( def serialize_model(model: keras.Model) -> t.Tuple[str, t.List[str], t.List[str]]: """Serialize a Keras or TensorFlow Graph - to use a Keras or TensorFlow model in SmartSim, the model - must be frozen and the inputs and outputs provided to the - smartredis.client.set_model() method. - This utiliy function provides everything users need to take a trained model and put it inside an ``featurestore`` instance. diff --git a/smartsim/ml/torch/data.py b/smartsim/ml/torch/data.py index c6a8e6eac..71addd04e 100644 --- a/smartsim/ml/torch/data.py +++ b/smartsim/ml/torch/data.py @@ -28,11 +28,23 @@ import numpy as np import torch -from smartredis import Client, Dataset +from smartsim.entity._mock import Mock from smartsim.ml.data import DataDownloader +class Client(Mock): + """Mock Client""" + + pass + + +class Dataset(Mock): + """Mock Dataset""" + + pass + + class _TorchDataGenerationCommon(DataDownloader, torch.utils.data.IterableDataset): def __init__(self, **kwargs: t.Any) -> None: init_samples = kwargs.pop("init_samples", False)