From bc7b2323a8a3e9ee9b7b9653e4f843a6a34b5c7a Mon Sep 17 00:00:00 2001 From: Al Rigazzi Date: Mon, 18 Sep 2023 22:55:30 +0200 Subject: [PATCH] Send shutdown command to DB (#355) Adds explicit shutdown of DB shards. Previously, DBs were terminated by simply terminating their processes, but that does not work in certain settings. [ committed by @al-rigazzi ] [ reviewed by @MattToast ] --- conftest.py | 27 +++++++++++++++++---- smartsim/_core/control/controller.py | 36 +++++++++++++++++++--------- smartsim/_core/utils/redis.py | 31 ++++++++++++++++++++++++ smartsim/database/orchestrator.py | 6 +++++ smartsim/experiment.py | 7 ++++-- smartsim/settings/palsSettings.py | 17 +++++++++++++ tests/backends/test_dbmodel.py | 24 ++++++++++++------- tests/test_configs/cov/local_cov.cfg | 2 ++ 8 files changed, 124 insertions(+), 26 deletions(-) diff --git a/conftest.py b/conftest.py index f20f48766..9c59aaaa2 100644 --- a/conftest.py +++ b/conftest.py @@ -41,6 +41,7 @@ AprunSettings, JsrunSettings, MpirunSettings, + PalsMpiexecSettings, RunSettings, ) from smartsim._core.config import CONFIG @@ -83,8 +84,7 @@ def print_test_configuration() -> None: print("TEST_ALLOC_SPEC_SHEET_PATH:", test_alloc_specs_path) print("TEST_DIR:", test_dir) print("Test output will be located in TEST_DIR if there is a failure") - print("TEST_PORT", test_port) - print("TEST_PORT + 1", test_port + 1) + print("TEST_PORTS", ", ".join(str(port) for port in range(test_port, test_port+3))) def pytest_configure() -> None: @@ -92,6 +92,7 @@ def pytest_configure() -> None: pytest.wlm_options = ["slurm", "pbs", "cobalt", "lsf", "pals"] account = get_account() pytest.test_account = account + pytest.test_device = test_device def pytest_sessionstart( @@ -144,6 +145,12 @@ def get_hostlist() -> t.Optional[t.List[str]]: return _parse_hostlist_file(os.environ["COBALT_NODEFILE"]) except FileNotFoundError: return None + elif "PBS_NODEFILE" in os.environ and test_launcher=="pals": + # with PALS, we need a hostfile even if `aprun` is available + try: + return _parse_hostlist_file(os.environ["PBS_NODEFILE"]) + except FileNotFoundError: + return None elif "PBS_NODEFILE" in os.environ and not shutil.which("aprun"): try: return _parse_hostlist_file(os.environ["PBS_NODEFILE"]) @@ -320,7 +327,7 @@ def get_run_settings( @staticmethod def get_orchestrator(nodes: int = 1, batch: bool = False) -> Orchestrator: - if test_launcher in ["pbs", "cobalt", "pals"]: + if test_launcher in ["pbs", "cobalt"]: if not shutil.which("aprun"): hostlist = get_hostlist() else: @@ -333,6 +340,16 @@ def get_orchestrator(nodes: int = 1, batch: bool = False) -> Orchestrator: launcher=test_launcher, hosts=hostlist, ) + if test_launcher == "pals": + hostlist = get_hostlist() + return Orchestrator( + db_nodes=nodes, + port=test_port, + batch=batch, + interface=test_nic, + launcher=test_launcher, + hosts=hostlist, + ) if test_launcher == "slurm": return Orchestrator( db_nodes=nodes, @@ -579,8 +596,8 @@ def make_test_dir( :type caller_function: str, optional :param caller_fspath: absolute path to file containing caller, defaults to None :type caller_fspath: str or Path, optional - :param level: indicate depth in the call stack relative to test method. - :type level: int, optional + :param level: indicate depth in the call stack relative to test method. + :type level: int, optional :param sub_dir: a relative path to create in the test directory :type sub_dir: str or Path, optional diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index b3aae3fbc..d55150427 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -36,27 +36,26 @@ from smartredis import Client from ..._core.launcher.step import Step -from ..._core.utils.redis import db_is_active, set_ml_model, set_script +from ..._core.utils.redis import db_is_active, set_ml_model, set_script, shutdown_db from ...database import Orchestrator -from ...entity import EntityList, SmartSimEntity, Model, Ensemble +from ...entity import Ensemble, EntityList, Model, SmartSimEntity from ...error import LauncherError, SmartSimError, SSInternalError, SSUnsupportedError from ...log import get_logger -from ...status import STATUS_RUNNING, TERMINAL_STATUSES +from ...settings.base import BatchSettings +from ...status import STATUS_CANCELLED, STATUS_RUNNING, TERMINAL_STATUSES from ..config import CONFIG from ..launcher import ( - SlurmLauncher, - PBSLauncher, - LocalLauncher, CobaltLauncher, + LocalLauncher, LSFLauncher, + PBSLauncher, + SlurmLauncher, ) from ..launcher.launcher import Launcher from ..utils import check_cluster_status, create_cluster +from .job import Job from .jobmanager import JobManager from .manifest import Manifest -from .job import Job -from ...settings.base import BatchSettings - logger = get_logger(__name__) @@ -189,6 +188,21 @@ def stop_entity(self, entity: t.Union[SmartSimEntity, EntityList]) -> None: ) self._jobs.move_to_completed(job) + def stop_db(self, db: Orchestrator) -> None: + """Stop an orchestrator + :param db: orchestrator to be stopped + :type db: Orchestrator + """ + if db.batch: + self.stop_entity(db) + else: + shutdown_db(db.hosts, db.ports) + with JM_LOCK: + for entity in db: + job = self._jobs[entity.name] + job.set_status(STATUS_CANCELLED, "", 0, output=None, error=None) + self._jobs.move_to_completed(job) + def stop_entity_list(self, entity_list: EntityList) -> None: """Stop an instance of an entity list @@ -550,7 +564,7 @@ def _orchestrator_launch_wait(self, orchestrator: Orchestrator) -> None: # TODO remove in favor of by node status check time.sleep(CONFIG.jm_interval) elif any(stat in TERMINAL_STATUSES for stat in statuses): - self.stop_entity_list(orchestrator) + self.stop_db(orchestrator) msg = "Orchestrator failed during startup" msg += f" See {orchestrator.path} for details" raise SmartSimError(msg) @@ -558,7 +572,7 @@ def _orchestrator_launch_wait(self, orchestrator: Orchestrator) -> None: logger.debug("Waiting for orchestrator instances to spin up...") except KeyboardInterrupt: logger.info("Orchestrator launch cancelled - requesting to stop") - self.stop_entity_list(orchestrator) + self.stop_db(orchestrator) # re-raise keyboard interrupt so the job manager will display # any running and un-killed jobs as this method is only called diff --git a/smartsim/_core/utils/redis.py b/smartsim/_core/utils/redis.py index 9645a367e..51f531e21 100644 --- a/smartsim/_core/utils/redis.py +++ b/smartsim/_core/utils/redis.py @@ -24,6 +24,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import itertools import logging import redis import time @@ -214,3 +215,33 @@ def set_script(db_script: DBScript, client: Client) -> None: except RedisReplyError as error: # pragma: no cover logger.error("Error while setting model on orchestrator.") raise error + + +def shutdown_db(hosts: t.List[str], ports: t.List[int]) -> None: # cov-wlm + """Send shutdown signal to cluster instances. + + Should only be used in the case where cluster deallocation + needs to occur manually. Usually, the SmartSim task manager + will take care of this automatically. + + :param hosts: List of hostnames to connect to + :type hosts: List[str] + :param ports: List of ports for each hostname + :type ports: List[int] + :raises SmartSimError: if cluster creation fails + """ + for host_ip, port in itertools.product( + (get_ip_from_host(host) for host in hosts), ports + ): + # call cluster command + redis_cli = CONFIG.database_cli + cmd = [redis_cli, "-h", host_ip, "-p", str(port), "shutdown"] + returncode, out, err = execute_cmd( + cmd, proc_input="yes", shell=False, timeout=10 + ) + + if returncode != 0: + logger.error(out) + logger.error(err) + else: + logger.debug(out) diff --git a/smartsim/database/orchestrator.py b/smartsim/database/orchestrator.py index 6be4f09a9..1bd0b8230 100644 --- a/smartsim/database/orchestrator.py +++ b/smartsim/database/orchestrator.py @@ -407,6 +407,12 @@ def set_hosts(self, host_list: t.List[str]) -> None: if self.launcher == "lsf": for db in self.dbnodes: db.set_hosts(host_list) + elif (self.launcher == "pals" + and isinstance(self.dbnodes[0].run_settings, PalsMpiexecSettings) + and self.dbnodes[0].is_mpmd): + # In this case, --hosts is a global option, we only set it to the + # first run command + self.dbnodes[0].run_settings.set_hostlist(host_list) else: for host, db in zip(host_list, self.dbnodes): if isinstance(db.run_settings, AprunSettings): diff --git a/smartsim/experiment.py b/smartsim/experiment.py index 639d0e7c9..b74618d70 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -220,12 +220,15 @@ def stop(self, *args: t.Any) -> None: :raises TypeError: if wrong type :raises SmartSimError: if stop request fails """ + stop_manifest = Manifest(*args) try: - stop_manifest = Manifest(*args) for entity in stop_manifest.models: self._control.stop_entity(entity) - for entity_list in stop_manifest.all_entity_lists: + for entity_list in stop_manifest.ensembles: self._control.stop_entity_list(entity_list) + db = stop_manifest.db + if db: + self._control.stop_db(db) except SmartSimError as e: logger.error(e) raise diff --git a/smartsim/settings/palsSettings.py b/smartsim/settings/palsSettings.py index bee381a84..3a8868793 100644 --- a/smartsim/settings/palsSettings.py +++ b/smartsim/settings/palsSettings.py @@ -221,3 +221,20 @@ def format_env_vars(self) -> t.List[str]: formatted += ["--envlist", ",".join(export_vars)] return formatted + + def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: + """Set the hostlist for the PALS ``mpiexec`` command + + This sets ``--hosts`` + + :param host_list: list of host names + :type host_list: str | list[str] + :raises TypeError: if not str or list of str + """ + if isinstance(host_list, str): + host_list = [host_list.strip()] + if not isinstance(host_list, list): + raise TypeError("host_list argument must be a list of strings") + if not all(isinstance(host, str) for host in host_list): + raise TypeError("host_list argument must be list of strings") + self.run_args["hosts"] = ",".join(host_list) diff --git a/tests/backends/test_dbmodel.py b/tests/backends/test_dbmodel.py index 83a2e119b..ff5854864 100644 --- a/tests/backends/test_dbmodel.py +++ b/tests/backends/test_dbmodel.py @@ -26,13 +26,12 @@ import sys -import time import pytest -import smartsim from smartsim import Experiment, status from smartsim._core.utils import installed_redisai_backends +from smartsim.entity import Ensemble from smartsim.error.errors import SSUnsupportedError from smartsim.log import get_logger @@ -45,8 +44,10 @@ # Check TensorFlow is available for tests try: - import tensorflow.keras as keras + from tensorflow import keras + import tensorflow as tf from tensorflow.keras.layers import Conv2D, Input + except ImportError: should_run_tf = False else: @@ -59,7 +60,14 @@ def __init__(self): def call(self, x): y = self.conv(x) return y - + if pytest.test_device == "GPU": + try: + physical_devices = tf.config.list_physical_devices('GPU') + tf.config.set_logical_device_configuration( + physical_devices[0], + [tf.config.LogicalDeviceConfiguration(memory_limit=5_000)]) + except: + logger.warning("Could not set TF max memory limit for GPU") should_run_tf &= "tensorflow" in installed_redisai_backends() @@ -342,7 +350,7 @@ def test_db_model_ensemble(fileutils, wlmutils, mlutils): smartsim_ensemble.add_model(smartsim_model) # Add the second ML model to the newly added entity. This is - # because the test script run both ML models for all entities. + # because the test script runs both ML models for all entities. smartsim_model.add_ml_model( "cnn2", "TF", @@ -520,7 +528,7 @@ def test_colocated_db_model_ensemble(fileutils, wlmutils, mlutils): colo_settings.set_tasks_per_node(1) # Create ensemble of two identical models - colo_ensemble = exp.create_ensemble( + colo_ensemble: Ensemble = exp.create_ensemble( "colocated_ens", run_settings=colo_settings, replicas=2 ) colo_ensemble.set_path(test_dir) @@ -795,7 +803,7 @@ def test_colocated_db_model_errors(fileutils, wlmutils, mlutils): @pytest.mark.skipif(not should_run_tf, reason="Test needs TensorFlow to run") def test_inconsistent_params_db_model(): """Test error when devices_per_node parameter>1 when devices is set to CPU in DBModel""" - + # Create and save ML model to filesystem model, inputs, outputs = create_tf_cnn() with pytest.raises(SSUnsupportedError) as ex: @@ -810,6 +818,6 @@ def test_inconsistent_params_db_model(): outputs=outputs, ) assert ( - ex.value.args[0] + ex.value.args[0] == "Cannot set devices_per_node>1 if CPU is specified under devices" ) diff --git a/tests/test_configs/cov/local_cov.cfg b/tests/test_configs/cov/local_cov.cfg index cb922f98e..22bb56b2f 100644 --- a/tests/test_configs/cov/local_cov.cfg +++ b/tests/test_configs/cov/local_cov.cfg @@ -6,6 +6,7 @@ omit = *mpirun* *alps* *lsf* + *pals* *redis_starter.py* */_cli/* */_install/* @@ -47,3 +48,4 @@ exclude_lines= launcher == "pbs" launcher == "cobalt" launcher == "lsf" + launcher == "pals"