From 3bfdff99f3da1236d55b31470f069f7f2d8bf14d Mon Sep 17 00:00:00 2001 From: Julia Putko <81587103+juliaputko@users.noreply.github.com> Date: Thu, 19 Sep 2024 15:15:52 -0700 Subject: [PATCH] Removals of EntityList, EntitySequence, JobManager, and Controller (#693) Remove EntityList, EntitySequence, JobManager, and Controller as well as any references. [ committed by @juliaputko ] [ reviewed by @amandarichardsonn, @mellis13 ] --- pyproject.toml | 2 - smartsim/_core/__init__.py | 4 +- smartsim/_core/control/__init__.py | 1 - smartsim/_core/control/controller.py | 1018 ------------------- smartsim/_core/control/controller_utils.py | 77 -- smartsim/_core/control/job.py | 26 +- smartsim/_core/control/job_manager.py | 364 ------- smartsim/_core/control/manifest.py | 46 +- smartsim/_core/utils/telemetry/telemetry.py | 33 +- smartsim/database/orchestrator.py | 4 +- smartsim/entity/__init__.py | 1 - smartsim/entity/entityList.py | 138 --- 12 files changed, 26 insertions(+), 1688 deletions(-) delete mode 100644 smartsim/_core/control/controller.py delete mode 100644 smartsim/_core/control/controller_utils.py delete mode 100644 smartsim/_core/control/job_manager.py delete mode 100644 smartsim/entity/entityList.py diff --git a/pyproject.toml b/pyproject.toml index 9d4e23c85..5b81676a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,10 +147,8 @@ module = [ # FIXME: DO NOT MERGE THIS INTO DEVELOP BRANCH UNLESS THESE ARE PASSING OR # REMOVED!! "smartsim._core._cli.*", - "smartsim._core.control.controller", "smartsim._core.control.manifest", "smartsim._core.entrypoints.dragon_client", - "smartsim._core.launcher.colocated", "smartsim._core.launcher.launcher", "smartsim._core.launcher.local.*", "smartsim._core.launcher.lsf.*", diff --git a/smartsim/_core/__init__.py b/smartsim/_core/__init__.py index 958f8e297..ee8d3cc96 100644 --- a/smartsim/_core/__init__.py +++ b/smartsim/_core/__init__.py @@ -24,7 +24,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from .control import Controller, Manifest, preview_renderer +from .control import Manifest, preview_renderer from .generation import Generator -__all__ = ["Controller", "Manifest", "Generator", "preview_renderer"] +__all__ = ["Manifest", "Generator", "preview_renderer"] diff --git a/smartsim/_core/control/__init__.py b/smartsim/_core/control/__init__.py index 0acd80650..ba3af1440 100644 --- a/smartsim/_core/control/__init__.py +++ b/smartsim/_core/control/__init__.py @@ -24,5 +24,4 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from .controller import Controller from .manifest import Manifest diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py deleted file mode 100644 index dd7e32b31..000000000 --- a/smartsim/_core/control/controller.py +++ /dev/null @@ -1,1018 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from __future__ import annotations - -import itertools -import os -import os.path as osp -import pathlib -import pickle -import signal -import subprocess -import sys -import threading -import time -import typing as t - -from smartsim._core.utils.network import get_ip_from_host -from smartsim.entity._mock import Mock - -from ..._core.launcher.step import Step -from ..._core.utils.helpers import ( - SignalInterceptionStack, - unpack_colo_fs_identifier, - unpack_fs_identifier, -) -from ...database import FeatureStore -from ...entity import Application, Ensemble, EntitySequence, SmartSimEntity -from ...error import ( - LauncherError, - SmartSimError, - SSDBIDConflictError, - SSInternalError, - SSUnsupportedError, -) -from ...log import get_logger -from ...servertype import CLUSTERED, STANDALONE -from ...status import TERMINAL_STATUSES, JobStatus -from ..config import CONFIG -from ..launcher import ( - DragonLauncher, - LocalLauncher, - LSFLauncher, - PBSLauncher, - SGELauncher, - SlurmLauncher, -) -from ..launcher.launcher import Launcher -from ..utils import serialize -from .controller_utils import _AnonymousBatchJob, _look_up_launched_data -from .job import Job -from .job_manager import JobManager -from .manifest import LaunchedManifest, LaunchedManifestBuilder, Manifest - -if t.TYPE_CHECKING: - from types import FrameType - - from ..utils.serialize import TStepLaunchMetaData - - -logger = get_logger(__name__) - -# job manager lock -JM_LOCK = threading.RLock() - - -class Client(Mock): - """Mock Client""" - - -class ConfigOptions(Mock): - """Mock ConfigOptions""" - - -def fs_is_active(): - pass - - -def set_ml_model(): - pass - - -def set_script(): - pass - - -def shutdown_fs_node(): - pass - - -def create_cluster(): - pass - - -def check_cluster_status(): - pass - - -class Controller: - """The controller module provides an interface between the - smartsim entities created in the experiment and the - underlying workload manager or run framework. - """ - - def __init__(self, launcher: str = "local") -> None: - """Initialize a Controller - - :param launcher: the type of launcher being used - """ - self._jobs = JobManager(JM_LOCK) - self.init_launcher(launcher) - self._telemetry_monitor: t.Optional[subprocess.Popen[bytes]] = None - - def start( - self, - exp_name: str, - exp_path: str, - manifest: Manifest, - block: bool = True, - kill_on_interrupt: bool = True, - ) -> None: - """Start the passed SmartSim entities - - This function should not be called directly, but rather - through the experiment interface. - - The controller will start the job-manager thread upon - execution of all jobs. - """ - # launch a telemetry monitor to track job progress - if CONFIG.telemetry_enabled: - self._start_telemetry_monitor(exp_path) - - self._jobs.kill_on_interrupt = kill_on_interrupt - - # register custom signal handler for ^C (SIGINT) - SignalInterceptionStack.get(signal.SIGINT).push_unique( - self._jobs.signal_interrupt - ) - launched = self._launch(exp_name, exp_path, manifest) - - # start the job manager thread if not already started - if not self._jobs.actively_monitoring: - self._jobs.start() - - serialize.save_launch_manifest( - launched.map(_look_up_launched_data(self._launcher)) - ) - - # block until all non-feature store jobs are complete - if block: - # poll handles its own keyboard interrupt as - # it may be called separately - self.poll(5, True, kill_on_interrupt=kill_on_interrupt) - - @property - def active_feature_store_jobs(self) -> t.Dict[str, Job]: - """Return active feature store jobs.""" - return {**self._jobs.fs_jobs} - - @property - def feature_store_active(self) -> bool: - with JM_LOCK: - if len(self._jobs.fs_jobs) > 0: - return True - return False - - def poll( - self, interval: int, verbose: bool, kill_on_interrupt: bool = True - ) -> None: - """Poll running jobs and receive logging output of job status - - :param interval: number of seconds to wait before polling again - :param verbose: set verbosity - :param kill_on_interrupt: flag for killing jobs when SIGINT is received - """ - self._jobs.kill_on_interrupt = kill_on_interrupt - to_monitor = self._jobs.jobs - while len(to_monitor) > 0: - time.sleep(interval) - - # acquire lock to avoid "dictionary changed during iteration" error - # without having to copy dictionary each time. - if verbose: - with JM_LOCK: - for job in to_monitor.values(): - logger.info(job) - - def finished( - self, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]] - ) -> bool: - """Return a boolean indicating wether a job has finished or not - - :param entity: object launched by SmartSim. - :returns: bool - :raises ValueError: if entity has not been launched yet - """ - try: - if isinstance(entity, FeatureStore): - raise TypeError("Finished() does not support FeatureStore instances") - if isinstance(entity, EntitySequence): - return all(self.finished(ent) for ent in entity.entities) - if not isinstance(entity, SmartSimEntity): - raise TypeError( - f"Argument was of type {type(entity)} not derived " - "from SmartSimEntity or EntitySequence" - ) - - return self._jobs.is_finished(entity) - except KeyError: - raise ValueError( - f"Entity {entity.name} has not been launched in this experiment" - ) from None - - def stop_entity( - self, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]] - ) -> None: - """Stop an instance of an entity - - This function will also update the status of the job in - the jobmanager so that the job appears as "cancelled". - - :param entity: entity to be stopped - """ - with JM_LOCK: - job = self._jobs[entity.name] - if job.status not in TERMINAL_STATUSES: - logger.info( - " ".join( - ( - "Stopping application", - entity.name, - "with job name", - str(job.name), - ) - ) - ) - status = self._launcher.stop(job.name) - - job.set_status( - status.status, - status.launcher_status, - status.returncode, - error=status.error, - output=status.output, - ) - self._jobs.move_to_completed(job) - - def stop_fs(self, fs: FeatureStore) -> None: - """Stop an FeatureStore - - :param fs: FeatureStore to be stopped - """ - if fs.batch: - self.stop_entity(fs) - else: - with JM_LOCK: - for node in fs.entities: - for host_ip, port in itertools.product( - (get_ip_from_host(host) for host in node.hosts), fs.ports - ): - retcode, _, _ = shutdown_fs_node(host_ip, port) - # Sometimes the fs will not shutdown (unless we force NOSAVE) - if retcode != 0: - self.stop_entity(node) - continue - - job = self._jobs[node.name] - job.set_status( - JobStatus.CANCELLED, - "", - 0, - output=None, - error=None, - ) - self._jobs.move_to_completed(job) - - fs.reset_hosts() - - def stop_entity_list(self, entity_list: EntitySequence[SmartSimEntity]) -> None: - """Stop an instance of an entity list - - :param entity_list: entity list to be stopped - """ - - if entity_list.batch: - self.stop_entity(entity_list) - else: - for entity in entity_list.entities: - self.stop_entity(entity) - - def get_jobs(self) -> t.Dict[str, Job]: - """Return a dictionary of completed job data - - :returns: dict[str, Job] - """ - with JM_LOCK: - return self._jobs.completed - - def get_entity_status( - self, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]] - ) -> JobStatus: - """Get the status of an entity - - :param entity: entity to get status of - :raises TypeError: if not SmartSimEntity | EntitySequence - :return: status of entity - """ - if not isinstance(entity, (SmartSimEntity, EntitySequence)): - raise TypeError( - "Argument must be of type SmartSimEntity or EntitySequence, " - f"not {type(entity)}" - ) - return self._jobs.get_status(entity) - - def get_entity_list_status( - self, entity_list: EntitySequence[SmartSimEntity] - ) -> t.List[JobStatus]: - """Get the statuses of an entity list - - :param entity_list: entity list containing entities to - get statuses of - :raises TypeError: if not EntitySequence - :return: list of SmartSimStatus statuses - """ - if not isinstance(entity_list, EntitySequence): - raise TypeError( - f"Argument was of type {type(entity_list)} not EntitySequence" - ) - if entity_list.batch: - return [self.get_entity_status(entity_list)] - statuses = [] - for entity in entity_list.entities: - statuses.append(self.get_entity_status(entity)) - return statuses - - def init_launcher(self, launcher: str) -> None: - """Initialize the controller with a specific type of launcher. - SmartSim currently supports slurm, pbs(pro), lsf, - and local launching - - :param launcher: which launcher to initialize - :raises SSUnsupportedError: if a string is passed that is not - a supported launcher - :raises TypeError: if no launcher argument is provided. - """ - launcher_map: t.Dict[str, t.Type[Launcher]] = { - "slurm": SlurmLauncher, - "pbs": PBSLauncher, - "pals": PBSLauncher, - "lsf": LSFLauncher, - "local": LocalLauncher, - "dragon": DragonLauncher, - "sge": SGELauncher, - } - - if launcher is not None: - launcher = launcher.lower() - if launcher in launcher_map: - # create new instance of the launcher - self._launcher = launcher_map[launcher]() - self._jobs.set_launcher(self._launcher) - else: - raise SSUnsupportedError("Launcher type not supported: " + launcher) - else: - raise TypeError("Must provide a 'launcher' argument") - - @staticmethod - def symlink_output_files( - job_step: Step, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]] - ) -> None: - """Create symlinks for entity output files that point to the output files - under the .smartsim directory - - :param job_step: Job step instance - :param entity: Entity instance - """ - historical_out, historical_err = map(pathlib.Path, job_step.get_output_files()) - entity_out = pathlib.Path(entity.path) / f"{entity.name}.out" - entity_err = pathlib.Path(entity.path) / f"{entity.name}.err" - - # check if there is already a link to a previous run - if entity_out.is_symlink() or entity_err.is_symlink(): - entity_out.unlink() - entity_err.unlink() - - historical_err.touch() - historical_out.touch() - - if historical_err.exists() and historical_out.exists(): - entity_out.symlink_to(historical_out) - entity_err.symlink_to(historical_err) - else: - raise FileNotFoundError( - f"Output files for {entity.name} could not be found. " - "Symlinking files failed." - ) - - def _launch( - self, exp_name: str, exp_path: str, manifest: Manifest - ) -> LaunchedManifest[t.Tuple[str, Step]]: - """Main launching function of the controller - - FeatureStores are always launched first so that the - address of the feature store can be given to following entities - - :param exp_name: The name of the launching experiment - :param exp_path: path to location of ``Experiment`` directory if generated - :param manifest: Manifest of deployables to launch - """ - - manifest_builder = LaunchedManifestBuilder[t.Tuple[str, Step]]( - exp_name=exp_name, - exp_path=exp_path, - launcher_name=str(self._launcher), - ) - # Loop over deployables to launch and launch multiple FeatureStores - for featurestore in manifest.fss: - for key in self._jobs.get_fs_host_addresses(): - _, fs_id = unpack_fs_identifier(key, "_") - if featurestore.fs_identifier == fs_id: - raise SSDBIDConflictError( - f"Feature store identifier {featurestore.fs_identifier}" - " has already been used. Pass in a unique" - " name for fs_identifier" - ) - - if featurestore.num_shards > 1 and isinstance( - self._launcher, LocalLauncher - ): - raise SmartSimError( - "Local launcher does not support multi-host feature stores" - ) - self._launch_feature_store(featurestore, manifest_builder) - - if self.feature_store_active: - self._set_fsobjects(manifest) - - # create all steps prior to launch - steps: t.List[ - t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]] - ] = [] - - symlink_substeps: t.List[ - t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]] - ] = [] - - for elist in manifest.ensembles: - ens_telem_dir = manifest_builder.run_telemetry_subdirectory / "ensemble" - if elist.batch: - batch_step, substeps = self._create_batch_job_step(elist, ens_telem_dir) - manifest_builder.add_ensemble( - elist, [(batch_step.name, step) for step in substeps] - ) - - # symlink substeps to maintain directory structure - for substep, substep_entity in zip(substeps, elist.applications): - symlink_substeps.append((substep, substep_entity)) - - steps.append((batch_step, elist)) - else: - # if ensemble is to be run as separate job steps, aka not in a batch - job_steps = [ - (self._create_job_step(e, ens_telem_dir / elist.name), e) - for e in elist.entities - ] - manifest_builder.add_ensemble( - elist, [(step.name, step) for step, _ in job_steps] - ) - steps.extend(job_steps) - # applications themselves cannot be batch steps. If batch settings are - # attached, wrap them in an anonymous batch job step - for application in manifest.applications: - application_telem_dir = ( - manifest_builder.run_telemetry_subdirectory / "application" - ) - if application.batch_settings: - anon_entity_list = _AnonymousBatchJob(application) - batch_step, substeps = self._create_batch_job_step( - anon_entity_list, application_telem_dir - ) - manifest_builder.add_application( - application, (batch_step.name, batch_step) - ) - - symlink_substeps.append((substeps[0], application)) - steps.append((batch_step, application)) - else: - # create job step for aapplication with run settings - job_step = self._create_job_step(application, application_telem_dir) - manifest_builder.add_application(application, (job_step.name, job_step)) - steps.append((job_step, application)) - - # launch and symlink steps - for step, entity in steps: - self._launch_step(step, entity) - self.symlink_output_files(step, entity) - - # symlink substeps to maintain directory structure - for substep, entity in symlink_substeps: - self.symlink_output_files(substep, entity) - - return manifest_builder.finalize() - - def _launch_feature_store( - self, - featurestore: FeatureStore, - manifest_builder: LaunchedManifestBuilder[t.Tuple[str, Step]], - ) -> None: - """Launch an FeatureStore instance - - This function will launch the FeatureStore instance and - if on WLM, find the nodes where it was launched and - set them in the JobManager - - :param featurestore: FeatureStore to launch - :param manifest_builder: An `LaunchedManifestBuilder` to record the - names and `Step`s of the launched featurestore - """ - featurestore.remove_stale_files() - feature_store_telem_dir = ( - manifest_builder.run_telemetry_subdirectory / "database" - ) - - # if the featurestore was launched as a batch workload - if featurestore.batch: - feature_store_batch_step, substeps = self._create_batch_job_step( - featurestore, feature_store_telem_dir - ) - manifest_builder.add_feature_store( - featurestore, - [(feature_store_batch_step.name, step) for step in substeps], - ) - - self._launch_step(feature_store_batch_step, featurestore) - self.symlink_output_files(feature_store_batch_step, featurestore) - - # symlink substeps to maintain directory structure - for substep, substep_entity in zip(substeps, featurestore.entities): - self.symlink_output_files(substep, substep_entity) - - # if featurestore was run on existing allocation, locally, or in allocation - else: - fs_steps = [ - ( - self._create_job_step( - fs, feature_store_telem_dir / featurestore.name - ), - fs, - ) - for fs in featurestore.entities - ] - manifest_builder.add_feature_store( - featurestore, [(step.name, step) for step, _ in fs_steps] - ) - for fs_step in fs_steps: - self._launch_step(*fs_step) - self.symlink_output_files(*fs_step) - - # wait for featurestore to spin up - self._feature_store_launch_wait(featurestore) - - # set the jobs in the job manager to provide SSDB variable to entities - # if _host isnt set within each - self._jobs.set_fs_hosts(featurestore) - - # create the feature store cluster - if featurestore.num_shards > 2: - num_trials = 5 - cluster_created = False - while not cluster_created: - try: - create_cluster(featurestore.hosts, featurestore.ports) - check_cluster_status(featurestore.hosts, featurestore.ports) - num_shards = featurestore.num_shards - logger.info( - f"Feature store cluster created with {num_shards} shards" - ) - cluster_created = True - except SSInternalError: - if num_trials > 0: - logger.debug( - "Cluster creation failed, attempting again in five seconds." - ) - num_trials -= 1 - time.sleep(5) - else: - # surface SSInternalError as we have no way to recover - raise - self._save_feature_store(featurestore) - logger.debug(f"FeatureStore launched on nodes: {featurestore.hosts}") - - def _launch_step( - self, - job_step: Step, - entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]], - ) -> None: - """Use the launcher to launch a job step - - :param job_step: a job step instance - :param entity: entity instance - :raises SmartSimError: if launch fails - """ - # attempt to retrieve entity name in JobManager.completed - completed_job = self._jobs.completed.get(entity.name, None) - - # if completed job DNE and is the entity name is not - # running in JobManager.jobs or JobManager.fs_jobs, - # launch the job - if completed_job is None and ( - entity.name not in self._jobs.jobs and entity.name not in self._jobs.fs_jobs - ): - try: - job_id = self._launcher.run(job_step) - except LauncherError as e: - msg = f"An error occurred when launching {entity.name} \n" - msg += "Check error and output files for details.\n" - msg += f"{entity}" - logger.error(msg) - raise SmartSimError(f"Job step {entity.name} failed to launch") from e - - # if the completed job does exist and the entity passed in is the same - # that has ran and completed, relaunch the entity. - elif completed_job is not None and completed_job.entity is entity: - try: - job_id = self._launcher.run(job_step) - except LauncherError as e: - msg = f"An error occurred when launching {entity.name} \n" - msg += "Check error and output files for details.\n" - msg += f"{entity}" - logger.error(msg) - raise SmartSimError(f"Job step {entity.name} failed to launch") from e - - # the entity is using a duplicate name of an existing entity in - # the experiment, throw an error - else: - raise SSUnsupportedError("SmartSim entities cannot have duplicate names.") - - # a job step is a task if it is not managed by a workload manager (i.e. Slurm) - # but is rather started, monitored, and exited through the Popen interface - # in the taskmanager - is_task = not job_step.managed - - if self._jobs.query_restart(entity.name): - logger.debug(f"Restarting {entity.name}") - self._jobs.restart_job(job_step.name, job_id, entity.name, is_task) - else: - logger.debug(f"Launching {entity.name}") - self._jobs.add_job(job_step, job_id, is_task) - - def _create_batch_job_step( - self, - entity_list: t.Union[FeatureStore, Ensemble, _AnonymousBatchJob], - telemetry_dir: pathlib.Path, - ) -> t.Tuple[Step, t.List[Step]]: - """Use launcher to create batch job step - - :param entity_list: EntityList to launch as batch - :param telemetry_dir: Path to a directory in which the batch job step - may write telemetry events - :return: batch job step instance and a list of run steps to be - executed within the batch job - """ - if not entity_list.batch_settings: - raise ValueError( - "EntityList must have batch settings to be launched as batch" - ) - - telemetry_dir = telemetry_dir / entity_list.name - batch_step = self._launcher.create_step(entity, entity_list.batch_settings) - batch_step.meta["entity_type"] = str(type(entity_list).__name__).lower() - batch_step.meta["status_dir"] = str(telemetry_dir) - - substeps = [] - for entity in entity_list.entities: - # tells step creation not to look for an allocation - entity.run_settings.in_batch = True - step = self._create_job_step(entity, telemetry_dir) - substeps.append(step) - batch_step.add_to_batch(step) - return batch_step, substeps - - def _create_job_step( - self, entity: SmartSimEntity, telemetry_dir: pathlib.Path - ) -> Step: - """Create job steps for all entities with the launcher - - :param entity: an entity to create a step for - :param telemetry_dir: Path to a directory in which the job step - may write telemetry events - :return: the job step - """ - # get SSDB, SSIN, SSOUT and add to entity run settings - if isinstance(entity, Application): - self._prep_entity_client_env(entity) - - # creating job step through the created launcher - step = self._launcher.create_step(entity, entity.run_settings) - - step.meta["entity_type"] = str(type(entity).__name__).lower() - step.meta["status_dir"] = str(telemetry_dir / entity.name) - - # return the job step that was created using the launcher since the launcher is defined in the exp - return step - - def _prep_entity_client_env(self, entity: Application) -> None: - """Retrieve all connections registered to this entity - - :param entity: The entity to retrieve connections from - """ - client_env: t.Dict[str, t.Union[str, int, float, bool]] = {} - address_dict = self._jobs.get_fs_host_addresses() - - for fs_id, addresses in address_dict.items(): - fs_name, _ = unpack_fs_identifier(fs_id, "_") - if addresses: - # Cap max length of SSDB - client_env[f"SSDB{fs_name}"] = ",".join(addresses[:128]) - - # Retrieve num_shards to append to client env - client_env[f"SR_fs_TYPE{fs_name}"] = ( - CLUSTERED if len(addresses) > 1 else STANDALONE - ) - - if entity.incoming_entities: - client_env["SSKEYIN"] = ",".join( - [in_entity.name for in_entity in entity.incoming_entities] - ) - if entity.query_key_prefixing(): - client_env["SSKEYOUT"] = entity.name - - # Set address to local if it's a colocated application - if entity.colocated and entity.run_settings.colocated_fs_settings is not None: - fs_name_colo = entity.run_settings.colocated_fs_settings["fs_identifier"] - assert isinstance(fs_name_colo, str) - for key in address_dict: - _, fs_id = unpack_fs_identifier(key, "_") - if fs_name_colo == fs_id: - raise SSDBIDConflictError( - f"Feature store identifier {fs_name_colo}" - " has already been used. Pass in a unique" - " name for fs_identifier" - ) - - fs_name_colo = unpack_colo_fs_identifier(fs_name_colo) - if colo_cfg := entity.run_settings.colocated_fs_settings: - port = colo_cfg.get("port", None) - socket = colo_cfg.get("unix_socket", None) - if socket and port: - raise SSInternalError( - "Co-located was configured for both TCP/IP and UDS" - ) - if port: - client_env[f"SSDB{fs_name_colo}"] = f"127.0.0.1:{str(port)}" - elif socket: - client_env[f"SSDB{fs_name_colo}"] = f"unix://{socket}" - else: - raise SSInternalError( - "Colocated feature store was not configured for either TCP or UDS" - ) - client_env[f"SR_fs_TYPE{fs_name_colo}"] = STANDALONE - entity.run_settings.update_env(client_env) - - def _save_feature_store(self, feature_store: FeatureStore) -> None: - """Save the FeatureStore object via pickle - - This function saves the feature store information to a pickle - file that can be imported by subsequent experiments to reconnect - to the featurestore. - - :param featurestore: FeatureStore configuration to be saved - """ - - if not feature_store.is_active(): - raise Exception("Feature store is not running") - - # Extract only the fs_jobs associated with this particular feature store - if feature_store.batch: - job_names = [feature_store.name] - else: - job_names = [fsnode.name for fsnode in feature_store.entities] - fs_jobs = { - name: job for name, job in self._jobs.fs_jobs.items() if name in job_names - } - - # Extract the associated steps - steps = [ - self._launcher.step_mapping[fs_job.name] for fs_job in fs_jobs.values() - ] - - feature_store_data = {"fs": feature_store, "fs_jobs": fs_jobs, "steps": steps} - - with open(feature_store.checkpoint_file, "wb") as pickle_file: - pickle.dump(feature_store_data, pickle_file) - - # Extract only the fs_jobs associated with this particular featurestore - if feature_store.batch: - job_names = [feature_store.name] - else: - job_names = [fsnode.name for fsnode in feature_store.entities] - fs_jobs = { - name: job for name, job in self._jobs.fs_jobs.items() if name in job_names - } - - # Extract the associated steps - steps = [ - self._launcher.step_mapping[fs_job.name] for fs_job in fs_jobs.values() - ] - - feature_store_data = {"fs": feature_store, "fs_jobs": fs_jobs, "steps": steps} - - with open(feature_store.checkpoint_file, "wb") as pickle_file: - pickle.dump(feature_store_data, pickle_file) - - def _feature_store_launch_wait(self, featurestore: FeatureStore) -> None: - """Wait for the featurestore instances to run - - In the case where the featurestore is launched as a batch - through a WLM, we wait for the featurestore to exit the - queue before proceeding so new launched entities can - be launched with SSDB address - - :param featurestore: FeatureStore instance - :raises SmartSimError: if launch fails or manually stopped by user - """ - if featurestore.batch: - logger.info("FeatureStore launched as a batch") - logger.info("While queued, SmartSim will wait for FeatureStore to run") - logger.info("CTRL+C interrupt to abort and cancel launch") - - ready = False - while not ready: - try: - time.sleep(CONFIG.jm_interval) - # manually trigger job update if JM not running - if not self._jobs.actively_monitoring: - self._jobs.check_jobs() - - # _jobs.get_status acquires JM lock for main thread, no need for locking - statuses = self.get_entity_list_status(featurestore) - if all(stat == JobStatus.RUNNING for stat in statuses): - ready = True - # TODO: Add a node status check - elif any(stat in TERMINAL_STATUSES for stat in statuses): - self.stop_fs(featurestore) - msg = "FeatureStore failed during startup" - msg += f" See {featurestore.path} for details" - raise SmartSimError(msg) - else: - logger.debug("Waiting for featurestore instances to spin up...") - except KeyboardInterrupt: - logger.info("FeatureStore launch cancelled - requesting to stop") - self.stop_fs(featurestore) - - # re-raise keyboard interrupt so the job manager will display - # any running and un-killed jobs as this method is only called - # during launch and we handle all keyboard interrupts during - # launch explicitly - raise - - def reload_saved_fs( - self, checkpoint_file: t.Union[str, os.PathLike[str]] - ) -> FeatureStore: - with JM_LOCK: - - if not osp.exists(checkpoint_file): - raise FileNotFoundError( - f"The SmartSim feature store config file {os.fspath(checkpoint_file)} " - "cannot be found." - ) - - try: - with open(checkpoint_file, "rb") as pickle_file: - fs_config = pickle.load(pickle_file) - except (OSError, IOError) as e: - msg = "Feature store checkpoint corrupted" - raise SmartSimError(msg) from e - - err_message = ( - "The SmartSim feature store checkpoint is incomplete or corrupted. " - ) - if not "fs" in fs_config: - raise SmartSimError( - err_message + "Could not find the featurestore object." - ) - - if not "fs_jobs" in fs_config: - raise SmartSimError( - err_message + "Could not find feature store job objects." - ) - - if not "steps" in fs_config: - raise SmartSimError( - err_message + "Could not find feature store job objects." - ) - feature_store: FeatureStore = fs_config["fs"] - - # TODO check that each fs_object is running - - job_steps = zip(fs_config["fs_jobs"].values(), fs_config["steps"]) - try: - for fs_job, step in job_steps: - self._jobs.fs_jobs[fs_job.ename] = fs_job - self._launcher.add_step_to_mapping_table(fs_job.name, step) - if step.task_id: - self._launcher.task_manager.add_existing(int(step.task_id)) - except LauncherError as e: - raise SmartSimError("Failed to reconnect feature store") from e - - # start job manager if not already started - if not self._jobs.actively_monitoring: - self._jobs.start() - - return feature_store - - def _set_fsobjects(self, manifest: Manifest) -> None: - if not manifest.has_fs_objects: - return - - address_dict = self._jobs.get_fs_host_addresses() - for ( - fs_id, - fs_addresses, - ) in address_dict.items(): - fs_name, name = unpack_fs_identifier(fs_id, "_") - - hosts = list({address.split(":")[0] for address in fs_addresses}) - ports = list({int(address.split(":")[-1]) for address in fs_addresses}) - - if not fs_is_active(hosts=hosts, ports=ports, num_shards=len(fs_addresses)): - raise SSInternalError("Cannot set FS Objects, FS is not running") - - os.environ[f"SSDB{fs_name}"] = fs_addresses[0] - - os.environ[f"SR_fs_TYPE{fs_name}"] = ( - CLUSTERED if len(fs_addresses) > 1 else STANDALONE - ) - - options = ConfigOptions.create_from_environment(name) - client = Client(options, logger_name="SmartSim") - - for application in manifest.applications: - if not application.colocated: - for fs_model in application.fs_models: - set_ml_model(fs_model, client) - for fs_script in application.fs_scripts: - set_script(fs_script, client) - - for ensemble in manifest.ensembles: - for fs_model in ensemble.fs_models: - set_ml_model(fs_model, client) - for fs_script in ensemble.fs_scripts: - set_script(fs_script, client) - for entity in ensemble.applications: - if not entity.colocated: - # Set models which could belong only - # to the entities and not to the ensemble - # but avoid duplicates - for fs_model in entity.fs_models: - if fs_model not in ensemble.fs_models: - set_ml_model(fs_model, client) - for fs_script in entity.fs_scripts: - if fs_script not in ensemble.fs_scripts: - set_script(fs_script, client) - - def _start_telemetry_monitor(self, exp_dir: str) -> None: - """Spawns a telemetry monitor process to keep track of the life times - of the processes launched through this controller. - - :param exp_dir: An experiment directory - """ - if ( - self._telemetry_monitor is None - or self._telemetry_monitor.returncode is not None - ): - logger.debug("Starting telemetry monitor process") - cmd = [ - sys.executable, - "-m", - "smartsim._core.entrypoints.telemetrymonitor", - "-exp_dir", - exp_dir, - "-frequency", - str(CONFIG.telemetry_frequency), - "-cooldown", - str(CONFIG.telemetry_cooldown), - ] - # pylint: disable-next=consider-using-with - self._telemetry_monitor = subprocess.Popen( - cmd, - stderr=sys.stderr, - stdout=sys.stdout, - cwd=str(pathlib.Path(__file__).parent.parent.parent), - shell=False, - ) diff --git a/smartsim/_core/control/controller_utils.py b/smartsim/_core/control/controller_utils.py deleted file mode 100644 index 57694ce7c..000000000 --- a/smartsim/_core/control/controller_utils.py +++ /dev/null @@ -1,77 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from __future__ import annotations - -import pathlib -import typing as t - -from ..._core.launcher.step import Step -from ...entity import Application, EntityList -from ...error import SmartSimError -from ..launcher.launcher import Launcher - -if t.TYPE_CHECKING: - from ..utils.serialize import TStepLaunchMetaData - - -class _AnonymousBatchJob(EntityList[Application]): - @staticmethod - def _validate(application: Application) -> None: - if application.batch_settings is None: - msg = "Unable to create _AnonymousBatchJob without batch_settings" - raise SmartSimError(msg) - - def __init__(self, application: Application) -> None: - self._validate(application) - super().__init__(application.name, application.path) - self.entities = [application] - self.batch_settings = application.batch_settings - - def _initialize_entities(self, **kwargs: t.Any) -> None: ... - - -def _look_up_launched_data( - launcher: Launcher, -) -> t.Callable[[t.Tuple[str, Step]], "TStepLaunchMetaData"]: - def _unpack_launched_data(data: t.Tuple[str, Step]) -> "TStepLaunchMetaData": - # NOTE: we cannot assume that the name of the launched step - # ``launched_step_name`` is equal to the name of the step referring to - # the entity ``step.name`` as is the case when an entity list is - # launched as a batch job - launched_step_name, step = data - launched_step_map = launcher.step_mapping[launched_step_name] - out_file, err_file = step.get_output_files() - return ( - launched_step_map.step_id, - launched_step_map.task_id, - launched_step_map.managed, - out_file, - err_file, - pathlib.Path(step.meta.get("status_dir", step.cwd)), - ) - - return _unpack_launched_data diff --git a/smartsim/_core/control/job.py b/smartsim/_core/control/job.py index 7e752cecd..fd79c0656 100644 --- a/smartsim/_core/control/job.py +++ b/smartsim/_core/control/job.py @@ -29,7 +29,9 @@ import typing as t from dataclasses import dataclass -from ...entity import EntitySequence, SmartSimEntity +from smartsim.entity._mock import Mock + +from ...entity import SmartSimEntity from ...status import JobStatus @@ -47,8 +49,7 @@ class _JobKey: class JobEntity: """An entity containing run-time SmartSimEntity metadata. The run-time metadata - is required to perform telemetry collection. The `JobEntity` satisfies the core - API necessary to use a `JobManager` to manage retrieval of managed step updates. + is required to perform telemetry collection. """ def __init__(self) -> None: @@ -190,27 +191,23 @@ def from_manifest( class Job: - """Keep track of various information for the controller. - In doing so, continuously add various fields of information - that is queryable by the user through interface methods in - the controller class. + """Keep track of various information. + In doing so, continuously add various fields of information. """ def __init__( self, job_name: str, job_id: t.Optional[str], - entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity], JobEntity], + entity: t.Union[SmartSimEntity, JobEntity], launcher: str, - is_task: bool, ) -> None: """Initialize a Job. :param job_name: Name of the job step :param job_id: The id associated with the job - :param entity: The SmartSim entity(list) associated with the job + :param entity: The SmartSim entity associated with the job :param launcher: Launcher job was started with - :param is_task: process monitored by TaskManager (True) or the WLM (True) """ self.name = job_name self.jid = job_id @@ -224,7 +221,6 @@ def __init__( self.error: t.Optional[str] = None # same as output self.hosts: t.List[str] = [] # currently only used for FS jobs self.launched_with = launcher - self.is_task = is_task self.start_time = time.time() self.history = History() @@ -263,14 +259,11 @@ def record_history(self) -> None: """Record the launching history of a job.""" self.history.record(self.jid, self.status, self.returncode, self.elapsed) - def reset( - self, new_job_name: str, new_job_id: t.Optional[str], is_task: bool - ) -> None: + def reset(self, new_job_name: str, new_job_id: t.Optional[str]) -> None: """Reset the job in order to be able to restart it. :param new_job_name: name of the new job step :param new_job_id: new job id to launch under - :param is_task: process monitored by TaskManager (True) or the WLM (True) """ self.name = new_job_name self.jid = new_job_id @@ -279,7 +272,6 @@ def reset( self.output = None self.error = None self.hosts = [] - self.is_task = is_task self.start_time = time.time() self.history.new_run() diff --git a/smartsim/_core/control/job_manager.py b/smartsim/_core/control/job_manager.py deleted file mode 100644 index 5d59ad50e..000000000 --- a/smartsim/_core/control/job_manager.py +++ /dev/null @@ -1,364 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -import itertools -import time -import typing as t -from collections import ChainMap -from threading import RLock, Thread -from types import FrameType - -from ...database import FeatureStore -from ...entity import EntitySequence, FSNode, SmartSimEntity -from ...log import ContextThread, get_logger -from ...status import TERMINAL_STATUSES, InvalidJobStatus, JobStatus -from ..config import CONFIG -from ..launcher import Launcher, LocalLauncher -from ..launcher.step import Step -from ..utils.network import get_ip_from_host -from .job import Job, JobEntity - -logger = get_logger(__name__) - - -class JobManager: - """The JobManager maintains a mapping between user defined entities - and the steps launched through the launcher. The JobManager - holds jobs according to entity type. - - The JobManager is threaded and runs during the course of an experiment - to update the statuses of Jobs. - - The JobManager and Controller share a single instance of a launcher - object that allows both the Controller and launcher access to the - wlm to query information about jobs that the user requests. - """ - - def __init__(self, lock: RLock, launcher: t.Optional[Launcher] = None) -> None: - """Initialize a Jobmanager - - :param launcher: a Launcher object to manage jobs - """ - self.monitor: t.Optional[Thread] = None - - # active jobs - self.jobs: t.Dict[str, Job] = {} - self.fs_jobs: t.Dict[str, Job] = {} - - # completed jobs - self.completed: t.Dict[str, Job] = {} - - self.actively_monitoring = False # on/off flag - self._launcher = launcher # reference to launcher - self._lock = lock # thread lock - - self.kill_on_interrupt = True # flag for killing jobs on SIGINT - - def start(self) -> None: - """Start a thread for the job manager""" - self.monitor = ContextThread(name="JobManager", daemon=True, target=self.run) - self.monitor.start() - - def run(self) -> None: - """Start the JobManager thread to continually check - the status of all jobs. Whichever launcher is selected - by the user will be responsible for returning statuses - that progress the state of the job. - - The interval of the checks is controlled by - smartsim.constats.TM_INTERVAL and should be set to values - above 20 for congested, multi-user systems - - The job manager thread will exit when no jobs are left - or when the main thread dies - """ - logger.debug("Starting Job Manager") - self.actively_monitoring = True - while self.actively_monitoring: - self._thread_sleep() - self.check_jobs() # update all job statuses at once - for _, job in self().items(): - # if the job has errors then output the report - # this should only output once - if job.returncode is not None and job.status in TERMINAL_STATUSES: - if int(job.returncode) != 0: - logger.warning(job) - logger.warning(job.error_report()) - self.move_to_completed(job) - else: - # job completed without error - logger.info(job) - self.move_to_completed(job) - - # if no more jobs left to actively monitor - if not self(): - self.actively_monitoring = False - logger.debug("Sleeping, no jobs to monitor") - - def move_to_completed(self, job: Job) -> None: - """Move job to completed queue so that its no longer - actively monitored by the job manager - - :param job: job instance we are transitioning - """ - with self._lock: - self.completed[job.ename] = job - job.record_history() - - # remove from actively monitored jobs - if job.ename in self.fs_jobs: - del self.fs_jobs[job.ename] - elif job.ename in self.jobs: - del self.jobs[job.ename] - - def __getitem__(self, entity_name: str) -> Job: - """Return the job associated with the name of the entity - from which it was created. - - :param entity_name: The name of the entity of a job - :returns: the Job associated with the entity_name - """ - with self._lock: - entities = ChainMap(self.fs_jobs, self.jobs, self.completed) - return entities[entity_name] - - def __call__(self) -> t.Dict[str, Job]: - """Returns dictionary all jobs for () operator - - :returns: Dictionary of all jobs - """ - all_jobs = {**self.jobs, **self.fs_jobs} - return all_jobs - - def __contains__(self, key: str) -> bool: - try: - self[key] # pylint: disable=pointless-statement - return True - except KeyError: - return False - - def add_job( - self, - step: Step, - job_id: t.Optional[str], - is_task: bool = True, - ) -> None: - """Add a job to the job manager which holds specific jobs by type. - - :param job_name: name of the job step - :param job_id: job step id created by launcher - :param entity: entity that was launched on job step - :param is_task: process monitored by TaskManager (True) or the WLM (True) - """ - launcher = str(self._launcher) - # all operations here should be atomic - job = Job(step.name, job_id, step.entity, launcher, is_task) - if isinstance(step.entity, (FSNode, FeatureStore)): - self.fs_jobs[step.entity.name] = job - elif isinstance(step.entity, JobEntity) and step.entity.is_fs: - self.fs_jobs[step.entity.name] = job - else: - self.jobs[step.entity.name] = job - - def is_finished(self, entity: SmartSimEntity) -> bool: - """Detect if a job has completed - - :param entity: entity to check - :return: True if finished - """ - with self._lock: - job = self[entity.name] # locked operation - if entity.name in self.completed: - if job.status in TERMINAL_STATUSES: - return True - return False - - def check_jobs(self) -> None: - """Update all jobs in jobmanager - - Update all jobs returncode, status, error and output - through one call to the launcher. - - """ - with self._lock: - jobs = self().values() - job_name_map = {job.name: job.ename for job in jobs} - - # returns (job step name, StepInfo) tuples - if self._launcher: - step_names = list(job_name_map.keys()) - statuses = self._launcher.get_step_update(step_names) - for job_name, status in statuses: - job = self[job_name_map[job_name]] - - if status: - # uses abstract step interface - job.set_status( - status.status, - status.launcher_status, - status.returncode, - error=status.error, - output=status.output, - ) - - def get_status( - self, - entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]], - ) -> t.Union[JobStatus, InvalidJobStatus]: - """Return the status of a job. - - :param entity: SmartSimEntity or EntitySequence instance - :returns: a SmartSimStatus status - """ - with self._lock: - if entity.name in self.completed: - return self.completed[entity.name].status - - if entity.name in self: - job: Job = self[entity.name] # locked - return job.status - - return InvalidJobStatus.NEVER_STARTED - - def set_launcher(self, launcher: Launcher) -> None: - """Set the launcher of the job manager to a specific launcher instance - - :param launcher: child of Launcher - """ - self._launcher = launcher - - def query_restart(self, entity_name: str) -> bool: - """See if the job just started should be restarted or not. - - :param entity_name: name of entity to check for a job for - :return: if job should be restarted instead of started - """ - if entity_name in self.completed: - return True - return False - - def restart_job( - self, - job_name: str, - job_id: t.Optional[str], - entity_name: str, - is_task: bool = True, - ) -> None: - """Function to reset a job to record history and be - ready to launch again. - - :param job_name: new job step name - :param job_id: new job id - :param entity_name: name of the entity of the job - :param is_task: process monitored by TaskManager (True) or the WLM (True) - - """ - with self._lock: - job = self.completed[entity_name] - del self.completed[entity_name] - job.reset(job_name, job_id, is_task) - - if isinstance(job.entity, (FSNode, FeatureStore)): - self.fs_jobs[entity_name] = job - else: - self.jobs[entity_name] = job - - def get_fs_host_addresses(self) -> t.Dict[str, t.List[str]]: - """Retrieve the list of hosts for the feature store - for corresponding feature store identifiers - - :return: dictionary of host ip addresses - """ - - address_dict: t.Dict[str, t.List[str]] = {} - for fs_job in self.fs_jobs.values(): - addresses = [] - if isinstance(fs_job.entity, (FSNode, FeatureStore)): - fs_entity = fs_job.entity - for combine in itertools.product(fs_job.hosts, fs_entity.ports): - ip_addr = get_ip_from_host(combine[0]) - addresses.append(":".join((ip_addr, str(combine[1])))) - - dict_entry: t.List[str] = address_dict.get(fs_entity.fs_identifier, []) - dict_entry.extend(addresses) - address_dict[fs_entity.fs_identifier] = dict_entry - - return address_dict - - def set_fs_hosts(self, FeatureStore: FeatureStore) -> None: - """Set the fs hosts in fs_jobs so future entities can query this - - :param FeatureStore: FeatureStore instance - """ - # should only be called during launch in the controller - - with self._lock: - if FeatureStore.batch: - self.fs_jobs[FeatureStore.name].hosts = FeatureStore.hosts - - else: - for fsnode in FeatureStore.entities: - if not fsnode.is_mpmd: - self.fs_jobs[fsnode.name].hosts = [fsnode.host] - else: - self.fs_jobs[fsnode.name].hosts = fsnode.hosts - - def signal_interrupt(self, signo: int, _frame: t.Optional[FrameType]) -> None: - """Custom handler for whenever SIGINT is received""" - if not signo: - logger.warning("Received SIGINT with no signal number") - if self.actively_monitoring and len(self) > 0: - if self.kill_on_interrupt: - for _, job in self().items(): - if job.status not in TERMINAL_STATUSES and self._launcher: - self._launcher.stop(job.name) - else: - logger.warning("SmartSim process interrupted before resource cleanup") - logger.warning("You may need to manually stop the following:") - - for job_name, job in self().items(): - if job.is_task: - # this will be the process id - logger.warning(f"Task {job_name} with id: {job.jid}") - else: - logger.warning( - f"Job {job_name} with {job.launched_with} id: {job.jid}" - ) - - def _thread_sleep(self) -> None: - """Sleep the job manager for a specific constant - set for the launcher type. - """ - local_jm_interval = 2 - if isinstance(self._launcher, (LocalLauncher)): - time.sleep(local_jm_interval) - else: - time.sleep(CONFIG.jm_interval) - - def __len__(self) -> int: - # number of active jobs - return len(self.fs_jobs) + len(self.jobs) diff --git a/smartsim/_core/control/manifest.py b/smartsim/_core/control/manifest.py index 36b030504..cb47af14e 100644 --- a/smartsim/_core/control/manifest.py +++ b/smartsim/_core/control/manifest.py @@ -29,8 +29,10 @@ import typing as t from dataclasses import dataclass, field +from smartsim.entity._mock import Mock + from ...database import FeatureStore -from ...entity import Application, Ensemble, EntitySequence, FSNode, SmartSimEntity +from ...entity import Application, Ensemble, FSNode, SmartSimEntity from ...error import SmartSimError from ..config import CONFIG from ..utils import helpers as _helpers @@ -47,20 +49,17 @@ class Manifest: """This class is used to keep track of all deployables generated by an experiment. Different types of deployables (i.e. different - `SmartSimEntity`-derived objects or `EntitySequence`-derived objects) can + `SmartSimEntity`-derived objects) can be accessed by using the corresponding accessor. Instances of ``Application``, ``Ensemble`` and ``FeatureStore`` can all be passed as arguments """ - def __init__( - self, *args: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]] - ) -> None: + def __init__(self, *args: t.Union[SmartSimEntity]) -> None: self._deployables = list(args) self._check_types(self._deployables) self._check_names(self._deployables) - self._check_entity_lists_nonempty() @property def fss(self) -> t.List[FeatureStore]: @@ -91,20 +90,6 @@ def ensembles(self) -> t.List[Ensemble]: """ return [e for e in self._deployables if isinstance(e, Ensemble)] - @property - def all_entity_lists(self) -> t.List[EntitySequence[SmartSimEntity]]: - """All entity lists, including ensembles and - exceptional ones like FeatureStore - - :return: list of entity lists - """ - _all_entity_lists: t.List[EntitySequence[SmartSimEntity]] = list(self.ensembles) - - for fs in self.fss: - _all_entity_lists.append(fs) - - return _all_entity_lists - @property def has_deployable(self) -> bool: """ @@ -127,24 +112,16 @@ def _check_names(deployables: t.List[t.Any]) -> None: @staticmethod def _check_types(deployables: t.List[t.Any]) -> None: for deployable in deployables: - if not isinstance(deployable, (SmartSimEntity, EntitySequence)): + if not isinstance(deployable, SmartSimEntity): raise TypeError( - f"Entity has type {type(deployable)}, not " - + "SmartSimEntity or EntitySequence" + f"Entity has type {type(deployable)}, not " + "SmartSimEntity" ) - def _check_entity_lists_nonempty(self) -> None: - """Check deployables for sanity before launching""" - - for entity_list in self.all_entity_lists: - if len(entity_list) < 1: - raise ValueError(f"{entity_list.name} is empty. Nothing to launch.") - def __str__(self) -> str: output = "" e_header = "=== Ensembles ===\n" - m_header = "=== Applications ===\n" - db_header = "=== Feature Stores ===\n" + a_header = "=== Applications ===\n" + fs_header = "=== Feature Stores ===\n" if self.ensembles: output += e_header @@ -158,7 +135,7 @@ def __str__(self) -> str: output += "\n" if self.applications: - output += m_header + output += a_header for application in self.applications: output += f"{application.name}\n" if application.batch_settings: @@ -215,8 +192,7 @@ class LaunchedManifest(t.Generic[_T]): """Immutable manifest mapping launched entities or collections of launched entities to other pieces of external data. This is commonly used to map a launch-able entity to its constructed ``Step`` instance without assuming - that ``step.name == job.name`` or querying the ``JobManager`` which itself - can be ephemeral. + that ``step.name == job.name``. """ metadata: _LaunchedManifestMetadata diff --git a/smartsim/_core/utils/telemetry/telemetry.py b/smartsim/_core/utils/telemetry/telemetry.py index 98aa8ab15..c8ff3bf25 100644 --- a/smartsim/_core/utils/telemetry/telemetry.py +++ b/smartsim/_core/utils/telemetry/telemetry.py @@ -41,7 +41,6 @@ from smartsim._core.config import CONFIG from smartsim._core.control.job import JobEntity, _JobKey -from smartsim._core.control.job_manager import JobManager from smartsim._core.launcher.dragon.dragon_launcher import DragonLauncher from smartsim._core.launcher.launcher import Launcher from smartsim._core.launcher.local.local import LocalLauncher @@ -95,7 +94,6 @@ def __init__( self._tracked_jobs: t.Dict[_JobKey, JobEntity] = {} self._completed_jobs: t.Dict[_JobKey, JobEntity] = {} self._launcher: t.Optional[Launcher] = None - self.job_manager: JobManager = JobManager(threading.RLock()) self._launcher_map: t.Dict[str, t.Type[Launcher]] = { "slurm": SlurmLauncher, "pbs": PBSLauncher, @@ -132,14 +130,6 @@ def init_launcher(self, launcher: str) -> None: raise ValueError("Launcher type not supported: " + launcher) - def init_job_manager(self) -> None: - """Initialize the job manager instance""" - if not self._launcher: - raise TypeError("self._launcher must be initialized") - - self.job_manager.set_launcher(self._launcher) - self.job_manager.start() - def set_launcher(self, launcher_type: str) -> None: """Set the launcher for the experiment :param launcher_type: the name of the workload manager used by the experiment @@ -149,9 +139,6 @@ def set_launcher(self, launcher_type: str) -> None: if self._launcher is None: raise SmartSimError("Launcher init failed") - self.job_manager.set_launcher(self._launcher) - self.job_manager.start() - def process_manifest(self, manifest_path: str) -> None: """Read the manifest for the experiment. Process the `RuntimeManifest` by updating the set of tracked jobs @@ -210,14 +197,6 @@ def process_manifest(self, manifest_path: str) -> None: ) if entity.is_managed: - # Tell JobManager the task is unmanaged. This collects - # status updates but does not try to start a new copy - self.job_manager.add_job( - entity.name, - entity.step_id, - entity, - False, - ) # Tell the launcher it's managed so it doesn't attempt # to look for a PID that may no longer exist self._launcher.step_mapping.add( @@ -264,9 +243,6 @@ async def _to_completed( # remove all the registered collectors for the completed entity await self._collector_mgr.remove(entity) - job = self.job_manager[entity.name] - self.job_manager.move_to_completed(job) - status_clause = f"status: {step_info.status}" error_clause = f", error: {step_info.error}" if step_info.error else "" @@ -432,8 +408,7 @@ class TelemetryMonitor: """The telemetry monitor is a standalone process managed by SmartSim to perform long-term retrieval of experiment status updates and resource usage metrics. Note that a non-blocking driver script is likely to complete before - the SmartSim entities complete. Also, the JobManager performs status updates - only as long as the driver is running. This telemetry monitor entrypoint is + the SmartSim entities complete. This telemetry monitor entrypoint is started automatically when a SmartSim experiment calls the `start` method on resources. The entrypoint runs until it has no resources to monitor.""" @@ -463,11 +438,7 @@ def _can_shutdown(self) -> bool: :return: return True if capable of automatically shutting down """ - managed_jobs = ( - list(self._action_handler.job_manager.jobs.values()) - if self._action_handler - else [] - ) + managed_jobs = [] unmanaged_jobs = ( list(self._action_handler.tracked_jobs) if self._action_handler else [] ) diff --git a/smartsim/database/orchestrator.py b/smartsim/database/orchestrator.py index a6bd01c07..c29c781a1 100644 --- a/smartsim/database/orchestrator.py +++ b/smartsim/database/orchestrator.py @@ -41,7 +41,7 @@ from .._core.utils.helpers import is_valid_cmd, unpack_fs_identifier from .._core.utils.network import get_ip_from_host from .._core.utils.shell import execute_cmd -from ..entity import EntityList, FSNode, TelemetryConfiguration +from ..entity import FSNode, TelemetryConfiguration from ..error import SmartSimError, SSDBFilesNotParseable, SSUnsupportedError from ..log import get_logger from ..servertype import CLUSTERED, STANDALONE @@ -165,7 +165,7 @@ def _check_local_constraints(launcher: str, batch: bool) -> None: # pylint: disable-next=too-many-public-methods -class FeatureStore(EntityList[FSNode]): +class FeatureStore: """The FeatureStore is an in-memory database that can be launched alongside entities in SmartSim. Data can be transferred between entities by using one of the Python, C, C++ or Fortran clients diff --git a/smartsim/entity/__init__.py b/smartsim/entity/__init__.py index 7ffa290b2..2f75e8ecd 100644 --- a/smartsim/entity/__init__.py +++ b/smartsim/entity/__init__.py @@ -29,5 +29,4 @@ from .dbobject import * from .ensemble import Ensemble from .entity import SmartSimEntity, TelemetryConfiguration -from .entityList import EntityList, EntitySequence from .files import TaggedFilesHierarchy diff --git a/smartsim/entity/entityList.py b/smartsim/entity/entityList.py deleted file mode 100644 index e6bb64f8f..000000000 --- a/smartsim/entity/entityList.py +++ /dev/null @@ -1,138 +0,0 @@ -# BSD 2-Clause License -# -# Copyright (c) 2021-2024, Hewlett Packard Enterprise -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import typing as t - -from .entity import SmartSimEntity - -if t.TYPE_CHECKING: - # pylint: disable-next=unused-import - import smartsim - -_T = t.TypeVar("_T", bound=SmartSimEntity) -# Old style pyint from TF 2.6.x does not know about pep484 style ``TypeVar`` names -# pylint: disable-next=invalid-name -_T_co = t.TypeVar("_T_co", bound=SmartSimEntity, covariant=True) - - -class EntitySequence(t.Generic[_T_co]): - """Abstract class for containers for SmartSimEntities""" - - def __init__(self, name: str, **kwargs: t.Any) -> None: - self.name: str = name - - # >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - # WARNING: This class cannot be made truly covariant until the - # following properties are made read-only. It is currently - # designed for in-house type checking only!! - # - # Despite the fact that these properties are type hinted as - # ``Sequence``s, the underlying types must remain ``list``s as that is - # what subclasses are expecting when implementing their - # ``_initialize_entities`` methods. - # - # I'm leaving it "as is" for now as to not introduce a potential API - # break in case any users subclassed the invariant version of this - # class (``EntityList``), but a "proper" solution would be to turn - # ``EntitySequence``/``EntityList`` into proper ``abc.ABC``s and have - # the properties we expect to be initialized represented as abstract - # properties. An additional benefit of this solution is would be that - # users could actually initialize their entities in the ``__init__`` - # method, and it would remove the need for the cumbersome and - # un-type-hint-able ``_initialize_entities`` method by returning all - # object construction into the class' constructor. - # --------------------------------------------------------------------- - # - self.entities: t.Sequence[_T_co] = [] - self._fs_models: t.Sequence["smartsim.entity.FSModel"] = [] - self._fs_scripts: t.Sequence["smartsim.entity.FSScript"] = [] - # - # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - - self._initialize_entities(**kwargs) - - def _initialize_entities(self, **kwargs: t.Any) -> None: - """Initialize the SmartSimEntity objects in the container""" - raise NotImplementedError - - @property - def fs_models(self) -> t.Iterable["smartsim.entity.FSModel"]: - """Return an immutable collection of attached models""" - return (model for model in self._fs_models) - - @property - def fs_scripts(self) -> t.Iterable["smartsim.entity.FSScript"]: - """Return an immutable collection of attached scripts""" - return (script for script in self._fs_scripts) - - @property - def batch(self) -> bool: - """Property indicating whether or not the entity sequence should be - launched as a batch job - - :return: ``True`` if entity sequence should be launched as a batch job, - ``False`` if the members will be launched individually. - """ - # pylint: disable-next=no-member - return hasattr(self, "batch_settings") and self.batch_settings - - @property - def type(self) -> str: - """Return the name of the class""" - return type(self).__name__ - - def __getitem__(self, name: str) -> t.Optional[_T_co]: - for entity in self.entities: - if entity.name == name: - return entity - return None - - def __iter__(self) -> t.Iterator[_T_co]: - for entity in self.entities: - yield entity - - def __len__(self) -> int: - return len(self.entities) - - -class EntityList(EntitySequence[_T]): - """An invariant subclass of an ``EntitySequence`` with mutable containers""" - - def __init__(self, name: str, **kwargs: t.Any) -> None: - super().__init__(name=name, **kwargs) - # Change container types to be invariant ``list``s - self.entities: t.List[_T] = list(self.entities) - self._fs_models: t.List["smartsim.entity.FSModel"] = list(self._fs_models) - self._fs_scripts: t.List["smartsim.entity.FSScript"] = list(self._fs_scripts) - - def _initialize_entities(self, **kwargs: t.Any) -> None: - """Initialize the SmartSimEntity objects in the container""" - # Need to identically re-define this "abstract method" or pylint - # complains that we are trying to define a concrete implementation of - # an abstract class despite the fact that we want this class to also be - # abstract. All the more reason to turn both of these classes into - # ``abc.ABC``s in my opinion. - raise NotImplementedError