diff --git a/docs/source/schedulers/ray.rst b/docs/source/schedulers/ray.rst index 170e2224d..b961821f9 100644 --- a/docs/source/schedulers/ray.rst +++ b/docs/source/schedulers/ray.rst @@ -13,7 +13,6 @@ Ray :show-inheritance: .. autofunction:: create_scheduler - .. autofunction:: has_ray .. autofunction:: serialize .. autoclass:: RayJob diff --git a/torchx/schedulers/ray_scheduler.py b/torchx/schedulers/ray_scheduler.py index af7e1be76..53f881749 100644 --- a/torchx/schedulers/ray_scheduler.py +++ b/torchx/schedulers/ray_scheduler.py @@ -18,6 +18,10 @@ import urllib3 +from ray.autoscaler import sdk as ray_autoscaler_sdk +from ray.dashboard.modules.job.common import JobStatus +from ray.dashboard.modules.job.sdk import JobSubmissionClient + from torchx.schedulers.api import ( AppDryRunInfo, AppState, @@ -35,22 +39,6 @@ from typing_extensions import TypedDict -try: - from ray.autoscaler import sdk as ray_autoscaler_sdk - from ray.dashboard.modules.job.common import JobStatus - from ray.dashboard.modules.job.sdk import JobSubmissionClient - - _has_ray = True - -except ImportError: - _has_ray = False - - -def has_ray() -> bool: - """Indicates whether Ray is installed in the current Python environment.""" - return _has_ray - - class RayOpts(TypedDict, total=False): cluster_config_file: Optional[str] cluster_name: Optional[str] @@ -59,397 +47,391 @@ class RayOpts(TypedDict, total=False): requirements: Optional[str] -if _has_ray: - _logger: logging.Logger = logging.getLogger(__name__) - - _ray_status_to_torchx_appstate: Dict[JobStatus, AppState] = { - JobStatus.PENDING: AppState.PENDING, - JobStatus.RUNNING: AppState.RUNNING, - JobStatus.SUCCEEDED: AppState.SUCCEEDED, - JobStatus.FAILED: AppState.FAILED, - JobStatus.STOPPED: AppState.CANCELLED, - } - - class _EnhancedJSONEncoder(json.JSONEncoder): - def default(self, o: RayActor): # pyre-ignore[3] - if dataclasses.is_dataclass(o): - return dataclasses.asdict(o) - return super().default(o) - - def serialize( - actors: List[RayActor], dirpath: str, output_filename: str = "actors.json" +_logger: logging.Logger = logging.getLogger(__name__) + +_ray_status_to_torchx_appstate: Dict[JobStatus, AppState] = { + JobStatus.PENDING: AppState.PENDING, + JobStatus.RUNNING: AppState.RUNNING, + JobStatus.SUCCEEDED: AppState.SUCCEEDED, + JobStatus.FAILED: AppState.FAILED, + JobStatus.STOPPED: AppState.CANCELLED, +} + + +class _EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o: RayActor): # pyre-ignore[3] + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + return super().default(o) + + +def serialize( + actors: List[RayActor], dirpath: str, output_filename: str = "actors.json" +) -> None: + actors_json = json.dumps(actors, cls=_EnhancedJSONEncoder) + with open(os.path.join(dirpath, output_filename), "w") as tmp: + json.dump(actors_json, tmp) + + +@dataclass +class RayJob: + """Represents a job that should be run on a Ray cluster. + + Attributes: + app_id: + The unique ID of the application (a.k.a. job). + cluster_config_file: + The Ray cluster configuration file. + cluster_name: + The cluster name to use. + dashboard_address: + The existing dashboard IP address to connect to + working_dir: + The working directory to copy to the cluster + requirements: + The libraries to install on the cluster per requirements.txt + actors: + The Ray actors which represent the job to be run. This attribute is + dumped to a JSON file and copied to the cluster where `ray_main.py` + uses it to initiate the job. 
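+
+    A minimal, illustrative construction (values are hypothetical; in normal
+    use the actor list is built by ``RayScheduler._submit_dryrun``)::
+
+        RayJob(
+            app_id="my_app-abc123",
+            working_dir="/tmp/workspace",
+            dashboard_address="127.0.0.1:8265",
+            actors=[RayActor(name="trainer", command=["python", "train.py"])],
+        )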
+ """ + + app_id: str + working_dir: str + cluster_config_file: Optional[str] = None + cluster_name: Optional[str] = None + dashboard_address: Optional[str] = None + requirements: Optional[str] = None + actors: List[RayActor] = field(default_factory=list) + + +class RayScheduler( + TmpDirWorkspaceMixin, Scheduler[RayOpts, AppDef, AppDryRunInfo[RayJob]] +): + """ + RayScheduler is a TorchX scheduling interface to Ray. The job def + workers will be launched as Ray actors + + The job environment is specified by the TorchX workspace. Any files in + the workspace will be present in the Ray job unless specified in + ``.torchxignore``. Python dependencies will be read from the + ``requirements.txt`` file located at the root of the workspace unless + it's overridden via ``-c ...,requirements=foo/requirements.txt``. + + **Config Options** + + .. runopts:: + class: torchx.schedulers.ray_scheduler.create_scheduler + + **Compatibility** + + .. compatibility:: + type: scheduler + features: + cancel: true + logs: | + Partial support. Ray only supports a single log stream so + only a dummy "ray/0" combined log role is supported. + Tailing and time seeking are not supported. + distributed: true + describe: | + Partial support. RayScheduler will return job status but + does not provide the complete original AppSpec. + workspaces: true + mounts: false + elasticity: Partial support. Multi role jobs are not supported. + + """ + + def __init__( + self, session_name: str, ray_client: Optional[JobSubmissionClient] = None ) -> None: - actors_json = json.dumps(actors, cls=_EnhancedJSONEncoder) - with open(os.path.join(dirpath, output_filename), "w") as tmp: - json.dump(actors_json, tmp) - - @dataclass - class RayJob: - """Represents a job that should be run on a Ray cluster. - - Attributes: - app_id: - The unique ID of the application (a.k.a. job). - cluster_config_file: - The Ray cluster configuration file. - cluster_name: - The cluster name to use. - dashboard_address: - The existing dashboard IP address to connect to - working_dir: - The working directory to copy to the cluster - requirements: - The libraries to install on the cluster per requirements.txt - actors: - The Ray actors which represent the job to be run. This attribute is - dumped to a JSON file and copied to the cluster where `ray_main.py` - uses it to initiate the job. - """ - - app_id: str - working_dir: str - cluster_config_file: Optional[str] = None - cluster_name: Optional[str] = None - dashboard_address: Optional[str] = None - requirements: Optional[str] = None - actors: List[RayActor] = field(default_factory=list) - - class RayScheduler( - TmpDirWorkspaceMixin, Scheduler[RayOpts, AppDef, AppDryRunInfo[RayJob]] - ): - """ - RayScheduler is a TorchX scheduling interface to Ray. The job def - workers will be launched as Ray actors - - The job environment is specified by the TorchX workspace. Any files in - the workspace will be present in the Ray job unless specified in - ``.torchxignore``. Python dependencies will be read from the - ``requirements.txt`` file located at the root of the workspace unless - it's overridden via ``-c ...,requirements=foo/requirements.txt``. - - **Config Options** - - .. runopts:: - class: torchx.schedulers.ray_scheduler.create_scheduler - - **Compatibility** - - .. compatibility:: - type: scheduler - features: - cancel: true - logs: | - Partial support. Ray only supports a single log stream so - only a dummy "ray/0" combined log role is supported. - Tailing and time seeking are not supported. 
- distributed: true - describe: | - Partial support. RayScheduler will return job status but - does not provide the complete original AppSpec. - workspaces: true - mounts: false - elasticity: Partial support. Multi role jobs are not supported. - - """ - - def __init__( - self, session_name: str, ray_client: Optional[JobSubmissionClient] = None - ) -> None: - # NOTE: make sure any new init options are supported in create_scheduler(...) - super().__init__("ray", session_name) - - # w/o Final None check in _get_ray_client does not work as it pyre assumes mutability - self._ray_client: Final[Optional[JobSubmissionClient]] = ray_client - - def _get_ray_client( - self, job_submission_netloc: Optional[str] = None - ) -> JobSubmissionClient: - if self._ray_client is not None: - client_netloc = urllib3.util.parse_url( - self._ray_client.get_address() - ).netloc - if job_submission_netloc and job_submission_netloc != client_netloc: - raise ValueError( - f"client netloc ({client_netloc}) does not match job netloc ({job_submission_netloc})" - ) - return self._ray_client - elif os.getenv("RAY_ADDRESS"): - return JobSubmissionClient(os.getenv("RAY_ADDRESS")) - elif not job_submission_netloc: - raise Exception( - "RAY_ADDRESS env variable or a scheduler with an attached Ray JobSubmissionClient is expected." - " See https://docs.ray.io/en/latest/cluster/jobs-package-ref.html#job-submission-sdk for more info" + # NOTE: make sure any new init options are supported in create_scheduler(...) + super().__init__("ray", session_name) + + # w/o Final None check in _get_ray_client does not work as it pyre assumes mutability + self._ray_client: Final[Optional[JobSubmissionClient]] = ray_client + + def _get_ray_client( + self, job_submission_netloc: Optional[str] = None + ) -> JobSubmissionClient: + if self._ray_client is not None: + client_netloc = urllib3.util.parse_url( + self._ray_client.get_address() + ).netloc + if job_submission_netloc and job_submission_netloc != client_netloc: + raise ValueError( + f"client netloc ({client_netloc}) does not match job netloc ({job_submission_netloc})" ) - return JobSubmissionClient(f"http://{job_submission_netloc}") - - # TODO: Add address as a potential CLI argument after writing ray.status() or passing in config file - def _run_opts(self) -> runopts: - opts = runopts() - opts.add( - "cluster_config_file", - type_=str, - required=False, - help="Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.", - ) - opts.add( - "cluster_name", - type_=str, - help="Override the configured cluster name.", + return self._ray_client + elif os.getenv("RAY_ADDRESS"): + return JobSubmissionClient(os.getenv("RAY_ADDRESS")) + elif not job_submission_netloc: + raise Exception( + "RAY_ADDRESS env variable or a scheduler with an attached Ray JobSubmissionClient is expected." 
+ " See https://docs.ray.io/en/latest/cluster/jobs-package-ref.html#job-submission-sdk for more info" ) - opts.add( - "dashboard_address", - type_=str, - required=False, - default="127.0.0.1:8265", - help="Use ray status to get the dashboard address you will submit jobs against", + return JobSubmissionClient(f"http://{job_submission_netloc}") + + # TODO: Add address as a potential CLI argument after writing ray.status() or passing in config file + def _run_opts(self) -> runopts: + opts = runopts() + opts.add( + "cluster_config_file", + type_=str, + required=False, + help="Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.", + ) + opts.add( + "cluster_name", + type_=str, + help="Override the configured cluster name.", + ) + opts.add( + "dashboard_address", + type_=str, + required=False, + default="127.0.0.1:8265", + help="Use ray status to get the dashboard address you will submit jobs against", + ) + opts.add("requirements", type_=str, help="Path to requirements.txt") + return opts + + def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str: + cfg: RayJob = dryrun_info.request + + # Create serialized actors for ray_driver.py + actors = cfg.actors + dirpath = cfg.working_dir + serialize(actors, dirpath) + + job_submission_addr: str = "" + if cfg.cluster_config_file: + job_submission_addr = ray_autoscaler_sdk.get_head_node_ip( + cfg.cluster_config_file + ) # pragma: no cover + elif cfg.dashboard_address: + job_submission_addr = cfg.dashboard_address + else: + raise RuntimeError( + "Either `dashboard_address` or `cluster_config_file` must be specified" ) - opts.add("requirements", type_=str, help="Path to requirements.txt") - return opts - - def schedule(self, dryrun_info: AppDryRunInfo[RayJob]) -> str: - cfg: RayJob = dryrun_info.request - - # Create serialized actors for ray_driver.py - actors = cfg.actors - dirpath = cfg.working_dir - serialize(actors, dirpath) - - job_submission_addr: str = "" - if cfg.cluster_config_file: - job_submission_addr = ray_autoscaler_sdk.get_head_node_ip( - cfg.cluster_config_file - ) # pragma: no cover - elif cfg.dashboard_address: - job_submission_addr = cfg.dashboard_address - else: - raise RuntimeError( - "Either `dashboard_address` or `cluster_config_file` must be specified" - ) - - # 0. Create Job Client - client = self._get_ray_client(job_submission_netloc=job_submission_addr) - - # 1. Copy Ray driver utilities - current_directory = os.path.dirname(os.path.abspath(__file__)) - copy2(os.path.join(current_directory, "ray", "ray_driver.py"), dirpath) - copy2(os.path.join(current_directory, "ray", "ray_common.py"), dirpath) - runtime_env = {"working_dir": dirpath} - if cfg.requirements: - runtime_env["pip"] = cfg.requirements - - # 1. Submit Job via the Ray Job Submission API - try: - job_id: str = client.submit_job( - submission_id=cfg.app_id, - # we will pack, hash, zip, upload, register working_dir in GCS of ray cluster - # and use it to configure your job execution. - entrypoint="python3 ray_driver.py", - runtime_env=runtime_env, - ) - finally: - if dirpath.startswith(tempfile.gettempdir()): - rmtree(dirpath) + # 0. Create Job Client + client = self._get_ray_client(job_submission_netloc=job_submission_addr) + + # 1. 
Copy Ray driver utilities + current_directory = os.path.dirname(os.path.abspath(__file__)) + copy2(os.path.join(current_directory, "ray", "ray_driver.py"), dirpath) + copy2(os.path.join(current_directory, "ray", "ray_common.py"), dirpath) + runtime_env = {"working_dir": dirpath} + if cfg.requirements: + runtime_env["pip"] = cfg.requirements + + # 1. Submit Job via the Ray Job Submission API + try: + job_id: str = client.submit_job( + submission_id=cfg.app_id, + # we will pack, hash, zip, upload, register working_dir in GCS of ray cluster + # and use it to configure your job execution. + entrypoint="python3 ray_driver.py", + runtime_env=runtime_env, + ) - # Encode job submission client in job_id - return f"{job_submission_addr}-{job_id}" + finally: + if dirpath.startswith(tempfile.gettempdir()): + rmtree(dirpath) - def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]: - app_id = make_unique(app.name) + # Encode job submission client in job_id + return f"{job_submission_addr}-{job_id}" - working_dir = app.roles[0].image - if not os.path.exists(working_dir): - raise RuntimeError( - f"Role image must be a valid directory, got: {working_dir} " - ) + def _submit_dryrun(self, app: AppDef, cfg: RayOpts) -> AppDryRunInfo[RayJob]: + app_id = make_unique(app.name) - requirements: Optional[str] = cfg.get("requirements") - if requirements is None: - workspace_reqs = os.path.join(working_dir, "requirements.txt") - if os.path.exists(workspace_reqs): - requirements = workspace_reqs - - cluster_cfg = cfg.get("cluster_config_file") - if cluster_cfg: - if not isinstance(cluster_cfg, str) or not os.path.isfile(cluster_cfg): - raise ValueError( - "The cluster configuration file must be a YAML file." - ) + working_dir = app.roles[0].image + if not os.path.exists(working_dir): + raise RuntimeError( + f"Role image must be a valid directory, got: {working_dir} " + ) - job: RayJob = RayJob( - app_id, - cluster_config_file=cluster_cfg, - requirements=requirements, - working_dir=working_dir, - ) + requirements: Optional[str] = cfg.get("requirements") + if requirements is None: + workspace_reqs = os.path.join(working_dir, "requirements.txt") + if os.path.exists(workspace_reqs): + requirements = workspace_reqs + + cluster_cfg = cfg.get("cluster_config_file") + if cluster_cfg: + if not isinstance(cluster_cfg, str) or not os.path.isfile(cluster_cfg): + raise ValueError("The cluster configuration file must be a YAML file.") + + job: RayJob = RayJob( + app_id, + cluster_config_file=cluster_cfg, + requirements=requirements, + working_dir=working_dir, + ) - else: # pragma: no cover - dashboard_address = cfg.get("dashboard_address") - job: RayJob = RayJob( + else: # pragma: no cover + dashboard_address = cfg.get("dashboard_address") + job: RayJob = RayJob( + app_id=app_id, + dashboard_address=dashboard_address, + requirements=requirements, + working_dir=working_dir, + ) + job.cluster_name = cfg.get("cluster_name") + + for role in app.roles: + for replica_id in range(role.num_replicas): + # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders + # in arguments and environment variables. 
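+                # Illustration (hypothetical arg): a role arg "--app=${app_id}"
+                # becomes "--app=<generated app id>", and "${replica_id}" becomes
+                # "0", "1", ... for each replica of the role.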
+ replica_role = macros.Values( + img_root=role.image, app_id=app_id, - dashboard_address=dashboard_address, - requirements=requirements, - working_dir=working_dir, + replica_id=str(replica_id), + rank0_env=TORCHX_RANK0_HOST, + ).apply(role) + + actor = RayActor( + name=role.name, + min_replicas=role.min_replicas, + command=[replica_role.entrypoint] + replica_role.args, + env=replica_role.env, + num_cpus=max(1, replica_role.resource.cpu), + num_gpus=max(0, replica_role.resource.gpu), ) - job.cluster_name = cfg.get("cluster_name") - - for role in app.roles: - for replica_id in range(role.num_replicas): - # Replace the ${img_root}, ${app_id}, and ${replica_id} placeholders - # in arguments and environment variables. - replica_role = macros.Values( - img_root=role.image, - app_id=app_id, - replica_id=str(replica_id), - rank0_env=TORCHX_RANK0_HOST, - ).apply(role) - - actor = RayActor( - name=role.name, - min_replicas=role.min_replicas, - command=[replica_role.entrypoint] + replica_role.args, - env=replica_role.env, - num_cpus=max(1, replica_role.resource.cpu), - num_gpus=max(0, replica_role.resource.gpu), - ) - job.actors.append(actor) + job.actors.append(actor) - if len(app.roles) > 1 and app.roles[0].min_replicas is not None: - raise ValueError("min_replicas is only supported with single role jobs") + if len(app.roles) > 1 and app.roles[0].min_replicas is not None: + raise ValueError("min_replicas is only supported with single role jobs") - return AppDryRunInfo(job, repr) + return AppDryRunInfo(job, repr) - def _validate(self, app: AppDef, scheduler: str, cfg: RayOpts) -> None: - if scheduler != "ray": - raise ValueError( - f"An unknown scheduler backend '{scheduler}' has been passed to the Ray scheduler." + def _validate(self, app: AppDef, scheduler: str, cfg: RayOpts) -> None: + if scheduler != "ray": + raise ValueError( + f"An unknown scheduler backend '{scheduler}' has been passed to the Ray scheduler." + ) + + if app.metadata: + _logger.warning("The Ray scheduler does not use metadata information.") + + for role in app.roles: + if role.resource.capabilities: + _logger.warning( + "The Ray scheduler does not support custom resource capabilities." ) + break - if app.metadata: - _logger.warning("The Ray scheduler does not use metadata information.") + for role in app.roles: + if role.port_map: + _logger.warning("The Ray scheduler does not support port mapping.") + break - for role in app.roles: - if role.resource.capabilities: - _logger.warning( - "The Ray scheduler does not support custom resource capabilities." - ) - break - - for role in app.roles: - if role.port_map: - _logger.warning("The Ray scheduler does not support port mapping.") - break - - def wait_until_finish(self, app_id: str, timeout: int = 30) -> None: - """ - ``wait_until_finish`` waits until the specified job has finished - with a given timeout. This is intended for testing. Programmatic - usage should use the runner wait method instead. 
- """ - - start = time.time() - while time.time() - start <= timeout: - status_info = self._get_job_status(app_id) - status = status_info - if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: - break - time.sleep(1) - - def _parse_app_id(self, app_id: str) -> Tuple[str, str]: - # find index of '-' in the first :\d+- - m = re.search(r":\d+-", app_id) - if m: - sep = m.span()[1] - addr = app_id[: sep - 1] - app_id = app_id[sep:] - return addr, app_id - - addr, _, app_id = app_id.partition("-") + def wait_until_finish(self, app_id: str, timeout: int = 30) -> None: + """ + ``wait_until_finish`` waits until the specified job has finished + with a given timeout. This is intended for testing. Programmatic + usage should use the runner wait method instead. + """ + + start = time.time() + while time.time() - start <= timeout: + status_info = self._get_job_status(app_id) + status = status_info + if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: + break + time.sleep(1) + + def _parse_app_id(self, app_id: str) -> Tuple[str, str]: + # find index of '-' in the first :\d+- + m = re.search(r":\d+-", app_id) + if m: + sep = m.span()[1] + addr = app_id[: sep - 1] + app_id = app_id[sep:] return addr, app_id - def _cancel_existing(self, app_id: str) -> None: # pragma: no cover - addr, app_id = self._parse_app_id(app_id) - client = self._get_ray_client(job_submission_netloc=addr) - client.stop_job(app_id) - - def _get_job_status(self, app_id: str) -> JobStatus: - addr, app_id = self._parse_app_id(app_id) - client = self._get_ray_client(job_submission_netloc=addr) - status = client.get_job_status(app_id) - if isinstance(status, str): - return cast(JobStatus, status) - return status.status - - def describe(self, app_id: str) -> Optional[DescribeAppResponse]: - job_status_info = self._get_job_status(app_id) - state = _ray_status_to_torchx_appstate[job_status_info] - roles = [Role(name="ray", num_replicas=1, image="")] - - # get ip_address and put it in hostname - - roles_statuses = [ - RoleStatus( - role="ray", - replicas=[ - ReplicaStatus( - id=0, - role="ray", - hostname=NONE, - state=state, - ) - ], - ) - ] - return DescribeAppResponse( - app_id=app_id, - state=state, - msg=job_status_info, - roles_statuses=roles_statuses, - roles=roles, + addr, _, app_id = app_id.partition("-") + return addr, app_id + + def _cancel_existing(self, app_id: str) -> None: # pragma: no cover + addr, app_id = self._parse_app_id(app_id) + client = self._get_ray_client(job_submission_netloc=addr) + client.stop_job(app_id) + + def _get_job_status(self, app_id: str) -> JobStatus: + addr, app_id = self._parse_app_id(app_id) + client = self._get_ray_client(job_submission_netloc=addr) + status = client.get_job_status(app_id) + if isinstance(status, str): + return cast(JobStatus, status) + return status.status + + def describe(self, app_id: str) -> Optional[DescribeAppResponse]: + job_status_info = self._get_job_status(app_id) + state = _ray_status_to_torchx_appstate[job_status_info] + roles = [Role(name="ray", num_replicas=1, image="")] + + # get ip_address and put it in hostname + + roles_statuses = [ + RoleStatus( + role="ray", + replicas=[ + ReplicaStatus( + id=0, + role="ray", + hostname=NONE, + state=state, + ) + ], ) + ] + return DescribeAppResponse( + app_id=app_id, + state=state, + msg=job_status_info, + roles_statuses=roles_statuses, + roles=roles, + ) - def log_iter( - self, - app_id: str, - role_name: Optional[str] = None, - k: int = 0, - regex: Optional[str] = None, - since: 
Optional[datetime] = None, - until: Optional[datetime] = None, - should_tail: bool = False, - streams: Optional[Stream] = None, - ) -> Iterable[str]: - # TODO: support tailing, streams etc.. - addr, app_id = self._parse_app_id(app_id) - client: JobSubmissionClient = self._get_ray_client( - job_submission_netloc=addr + def log_iter( + self, + app_id: str, + role_name: Optional[str] = None, + k: int = 0, + regex: Optional[str] = None, + since: Optional[datetime] = None, + until: Optional[datetime] = None, + should_tail: bool = False, + streams: Optional[Stream] = None, + ) -> Iterable[str]: + # TODO: support tailing, streams etc.. + addr, app_id = self._parse_app_id(app_id) + client: JobSubmissionClient = self._get_ray_client(job_submission_netloc=addr) + logs: str = client.get_job_logs(app_id) + iterator = split_lines(logs) + if regex: + return filter_regex(regex, iterator) + return iterator + + def list(self) -> List[ListAppResponse]: + client = self._get_ray_client() + jobs = client.list_jobs() + netloc = urllib3.util.parse_url(client.get_address()).netloc + return [ + ListAppResponse( + app_id=f"{netloc}-{details.submission_id}", + state=_ray_status_to_torchx_appstate[details.status], ) - logs: str = client.get_job_logs(app_id) - iterator = split_lines(logs) - if regex: - return filter_regex(regex, iterator) - return iterator - - def list(self) -> List[ListAppResponse]: - client = self._get_ray_client() - jobs = client.list_jobs() - netloc = urllib3.util.parse_url(client.get_address()).netloc - return [ - ListAppResponse( - app_id=f"{netloc}-{details.submission_id}", - state=_ray_status_to_torchx_appstate[details.status], - ) - for details in jobs - ] + for details in jobs + ] def create_scheduler( session_name: str, ray_client: Optional[JobSubmissionClient] = None, **kwargs: Any ) -> "RayScheduler": - if not has_ray(): # pragma: no cover - raise ModuleNotFoundError( - "Ray is not installed in the current Python environment." 
- ) - return RayScheduler(session_name=session_name, ray_client=ray_client) diff --git a/torchx/schedulers/test/ray_scheduler_test.py b/torchx/schedulers/test/ray_scheduler_test.py index 8fedd4773..6205c6e75 100644 --- a/torchx/schedulers/test/ray_scheduler_test.py +++ b/torchx/schedulers/test/ray_scheduler_test.py @@ -13,694 +13,685 @@ from unittest import TestCase from unittest.mock import MagicMock, patch +import ray +from ray.cluster_utils import Cluster +from ray.dashboard.modules.job.sdk import JobSubmissionClient +from ray.util.placement_group import remove_placement_group + from torchx.schedulers import get_scheduler_factories from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, ListAppResponse +from torchx.schedulers.ray import ray_driver from torchx.schedulers.ray.ray_common import RayActor -from torchx.schedulers.ray_scheduler import has_ray +from torchx.schedulers.ray_scheduler import ( + _logger, + RayJob, + RayOpts, + RayScheduler, + serialize, +) from torchx.specs import AppDef, Resource, Role, runopts -if has_ray(): - import ray - from ray.cluster_utils import Cluster - from ray.dashboard.modules.job.sdk import JobSubmissionClient - from ray.util.placement_group import remove_placement_group - from torchx.schedulers.ray import ray_driver - from torchx.schedulers.ray_scheduler import ( - _logger, - RayJob, - RayOpts, - RayScheduler, - serialize, - ) - class RaySchedulerRegistryTest(TestCase): - def test_get_schedulers_returns_ray_scheduler(self) -> None: - schedulers = get_scheduler_factories() +class RaySchedulerRegistryTest(TestCase): + def test_get_schedulers_returns_ray_scheduler(self) -> None: + schedulers = get_scheduler_factories() - self.assertIn("ray", schedulers) + self.assertIn("ray", schedulers) - scheduler = schedulers["ray"]("test_session") + scheduler = schedulers["ray"]("test_session") - self.assertIsInstance(scheduler, RayScheduler) + self.assertIsInstance(scheduler, RayScheduler) - ray_scheduler = cast(RayScheduler, scheduler) + ray_scheduler = cast(RayScheduler, scheduler) - self.assertEqual(ray_scheduler.backend, "ray") - self.assertEqual(ray_scheduler.session_name, "test_session") + self.assertEqual(ray_scheduler.backend, "ray") + self.assertEqual(ray_scheduler.session_name, "test_session") - class RaySchedulerTest(TestCase): - def setUp(self) -> None: - self._scripts = ["dummy1.py", "dummy2.py"] - self.tempdir = tempfile.TemporaryDirectory() +class RaySchedulerTest(TestCase): + def setUp(self) -> None: + self._scripts = ["dummy1.py", "dummy2.py"] - self._app_def = AppDef( - name="dummy_app", - roles=[ - Role( - name="dummy_role1", - image=self.tempdir.name, - entrypoint="dummy_entrypoint1", - args=["arg1", self._scripts[0], "arg2"], - num_replicas=3, - env={"dummy_env": "dummy_value"}, - resource=Resource(cpu=2, gpu=3, memMB=0), - ), - Role( - name="dummy_role2", - image=self.tempdir.name, - entrypoint="dummy_entrypoint2", - args=["arg3", "arg4", self._scripts[1]], - ), - ], - ) + self.tempdir = tempfile.TemporaryDirectory() - self._run_cfg = RayOpts( - { - "cluster_config_file": "dummy_file", - "cluster_name": "dummy_name", - "working_dir": None, - "requirements": None, - } - ) + self._app_def = AppDef( + name="dummy_app", + roles=[ + Role( + name="dummy_role1", + image=self.tempdir.name, + entrypoint="dummy_entrypoint1", + args=["arg1", self._scripts[0], "arg2"], + num_replicas=3, + env={"dummy_env": "dummy_value"}, + resource=Resource(cpu=2, gpu=3, memMB=0), + ), + Role( + name="dummy_role2", + image=self.tempdir.name, + 
entrypoint="dummy_entrypoint2", + args=["arg3", "arg4", self._scripts[1]], + ), + ], + ) - # mock validation step so that instantiation doesn't fail due to inability to reach dashboard - JobSubmissionClient._check_connection_and_version = MagicMock() + self._run_cfg = RayOpts( + { + "cluster_config_file": "dummy_file", + "cluster_name": "dummy_name", + "working_dir": None, + "requirements": None, + } + ) - self._scheduler = RayScheduler("test_session") + # mock validation step so that instantiation doesn't fail due to inability to reach dashboard + JobSubmissionClient._check_connection_and_version = MagicMock() - self._isfile_patch = patch("torchx.schedulers.ray_scheduler.os.path.isfile") + self._scheduler = RayScheduler("test_session") - self._mock_isfile = self._isfile_patch.start() - self._mock_isfile.return_value = True + self._isfile_patch = patch("torchx.schedulers.ray_scheduler.os.path.isfile") - def tearDown(self) -> None: - self.tempdir.cleanup() - self._isfile_patch.stop() + self._mock_isfile = self._isfile_patch.start() + self._mock_isfile.return_value = True - def test_init_sets_session_and_backend_name(self) -> None: - self.assertEqual(self._scheduler.backend, "ray") - self.assertEqual(self._scheduler.session_name, "test_session") + def tearDown(self) -> None: + self.tempdir.cleanup() + self._isfile_patch.stop() - def test_run_opts_returns_expected_options(self) -> None: - opts: runopts = self._scheduler.run_opts() + def test_init_sets_session_and_backend_name(self) -> None: + self.assertEqual(self._scheduler.backend, "ray") + self.assertEqual(self._scheduler.session_name, "test_session") - @dataclass - class Option: - name: str - opt_type: Type - is_required: bool = False - default: Any = None - - def assert_option(expected_opt: Option) -> None: - opt = opts.get(expected_opt.name) - - self.assertIsNotNone(opt) - - self.assertEqual(opt.opt_type, expected_opt.opt_type) - self.assertEqual(opt.is_required, expected_opt.is_required) - - if expected_opt.default is None: - self.assertIsNone(opt.default) - else: - self.assertEqual(opt.default, expected_opt.default) - - expected_opts = [ - Option("cluster_config_file", str, is_required=False), - Option("cluster_name", str), - Option("dashboard_address", str, default="127.0.0.1:8265"), - Option("requirements", str, is_required=False), - ] - - self.assertEqual(len(opts), len(expected_opts)) - - for expected_opt in expected_opts: - assert_option(expected_opt) - - def test_validate_does_not_raise_error_and_does_not_log_warning(self) -> None: - with self.assertLogs(_logger, "WARNING") as cm: - self._scheduler._validate( - self._app_def, scheduler="ray", cfg=self._run_cfg - ) - - _logger.warning("dummy log") - - self.assertEqual(len(cm.records), 1) - - def test_validate_raises_error_if_backend_name_is_not_ray(self) -> None: - with self.assertRaisesRegex( - ValueError, - r"^An unknown scheduler backend 'dummy' has been passed to the Ray scheduler.$", - ): - self._scheduler._validate( - self._app_def, scheduler="dummy", cfg=self._run_cfg - ) - - @contextmanager - def _assert_log_message(self, level: str, msg: str) -> Iterator[None]: - with self.assertLogs(_logger) as cm: - yield - - self.assertEqual(len(cm.records), 1) - - log_record = cm.records[0] - - self.assertEqual(log_record.levelname, level) - self.assertEqual(log_record.message, msg) - - def test_validate_warns_when_app_def_contains_metadata(self) -> None: - self._app_def.metadata["dummy_key"] = "dummy_value" - - with self._assert_log_message( - "WARNING", "The Ray scheduler does not 
use metadata information." - ): - self._scheduler._validate( - self._app_def, scheduler="ray", cfg=self._run_cfg - ) - - def test_validate_warns_when_role_contains_resource_capability(self) -> None: - self._app_def.roles[1].resource.capabilities["dummy_cap1"] = 1 - self._app_def.roles[1].resource.capabilities["dummy_cap2"] = 2 - - with self._assert_log_message( - "WARNING", - "The Ray scheduler does not support custom resource capabilities.", - ): - self._scheduler._validate( - self._app_def, scheduler="ray", cfg=self._run_cfg - ) - - def test_validate_warns_when_role_contains_port_map(self) -> None: - self._app_def.roles[1].port_map["dummy_map1"] = 1 - self._app_def.roles[1].port_map["dummy_map2"] = 2 - - with self._assert_log_message( - "WARNING", "The Ray scheduler does not support port mapping." - ): - self._scheduler._validate( - self._app_def, scheduler="ray", cfg=self._run_cfg - ) - - def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_str( - self, - ) -> None: - # pyre-fixme: Expects string type - self._run_cfg["cluster_config_file"] = 1 - - with self.assertRaisesRegex( - ValueError, - r"^The cluster configuration file must be a YAML file.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_found( - self, - ) -> None: - self._mock_isfile.return_value = False - - with self.assertRaisesRegex( - ValueError, - r"^The cluster configuration file must be a YAML file.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - # pyre-ignore[2]: Parameter `value` must have a type other than `Any` - def _assert_config_value(self, name: str, value: Any, type_name: str) -> None: - # pyre-fixme: TypedDict indexes by string literal - self._run_cfg[name] = value - - with self.assertRaisesRegex( - TypeError, - rf"^The configuration value '{name}' must be of type {type_name}.$", - ): - self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - def _assert_submit_dryrun_constructs_job_definition(self) -> None: - run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) - - job = run_info.request - - self.assertTrue(job.app_id.startswith(self._app_def.name)) - self.assertGreater(len(job.app_id), len(self._app_def.name)) - - self.assertEqual( - job.cluster_config_file, self._run_cfg.get("cluster_config_file") - ) - self.assertEqual(job.cluster_name, self._run_cfg.get("cluster_name")) + def test_run_opts_returns_expected_options(self) -> None: + opts: runopts = self._scheduler.run_opts() - actor_roles = [] - for role in self._app_def.roles: - actor_roles += [role] * role.num_replicas + @dataclass + class Option: + name: str + opt_type: Type + is_required: bool = False + default: Any = None - self.assertEqual(len(job.actors), len(actor_roles)) + def assert_option(expected_opt: Option) -> None: + opt = opts.get(expected_opt.name) - for actor, role in zip(job.actors, actor_roles): - self.assertEqual(actor.name, role.name) - self.assertEqual(actor.command, [role.entrypoint] + role.args) - self.assertEqual(actor.env, role.env) - self.assertEqual(actor.num_cpus, max(1, role.resource.cpu)) - self.assertEqual(actor.num_gpus, max(0, role.resource.gpu)) + self.assertIsNotNone(opt) - def test_submit_dryrun_constructs_job_definition(self) -> None: - self._assert_submit_dryrun_constructs_job_definition() + self.assertEqual(opt.opt_type, expected_opt.opt_type) + self.assertEqual(opt.is_required, expected_opt.is_required) - self._run_cfg["cluster_name"] = None - 
self._run_cfg["working_dir"] = None - self._run_cfg["requirements"] = None + if expected_opt.default is None: + self.assertIsNone(opt.default) + else: + self.assertEqual(opt.default, expected_opt.default) - self._assert_submit_dryrun_constructs_job_definition() + expected_opts = [ + Option("cluster_config_file", str, is_required=False), + Option("cluster_name", str), + Option("dashboard_address", str, default="127.0.0.1:8265"), + Option("requirements", str, is_required=False), + ] - def test_submit_dryrun_constructs_actor_command(self) -> None: - run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + self.assertEqual(len(opts), len(expected_opts)) - job = run_info.request + for expected_opt in expected_opts: + assert_option(expected_opt) - self.assertEqual( - job.actors[0].command, - ["dummy_entrypoint1", "arg1", "dummy1.py", "arg2"], - ) + def test_validate_does_not_raise_error_and_does_not_log_warning(self) -> None: + with self.assertLogs(_logger, "WARNING") as cm: + self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) - def test_no_dir(self) -> None: - app = AppDef( - name="dummy_app", - roles=[ - Role( - name="dummy_role1", - image="invalid_path", - ), - ], + _logger.warning("dummy log") + + self.assertEqual(len(cm.records), 1) + + def test_validate_raises_error_if_backend_name_is_not_ray(self) -> None: + with self.assertRaisesRegex( + ValueError, + r"^An unknown scheduler backend 'dummy' has been passed to the Ray scheduler.$", + ): + self._scheduler._validate( + self._app_def, scheduler="dummy", cfg=self._run_cfg ) - with self.assertRaisesRegex( - RuntimeError, "Role image must be a valid directory, got: invalid_path" - ): - self._scheduler._submit_dryrun(app, cfg={}) - - def test_requirements(self) -> None: - with tempfile.TemporaryDirectory() as path: - reqs = os.path.join(path, "requirements.txt") - with open(reqs, "w") as f: - f.write("asdf") - - app = AppDef( - name="app", - roles=[ - Role( - name="role", - image=path, - ), - ], - ) - req = self._scheduler._submit_dryrun(app, cfg={}) - job = req.request - self.assertEqual(job.requirements, reqs) - - def test_parse_app_id(self) -> None: - test_addr_appid = [ - ( - "0.0.0.0:1234-app_id", - "0.0.0.0:1234", - "app_id", - ), # (full address, address:port, app_id) - ("addr-of-cluster:1234-app-id", "addr-of-cluster:1234", "app-id"), - ("www.test.com:1234-app:id", "www.test.com:1234", "app:id"), - ("foo", "foo", ""), - ("foo-bar-bar", "foo", "bar-bar"), - ] - for test_example, addr, app_id in test_addr_appid: - parsed_addr, parsed_appid = self._scheduler._parse_app_id(test_example) - self.assertEqual(parsed_addr, addr) - self.assertEqual(parsed_appid, app_id) - - def test_list_throws_without_address(self) -> None: - if "RAY_ADDRESS" in os.environ: - del os.environ["RAY_ADDRESS"] - with self.assertRaisesRegex(Exception, "RAY_ADDRESS env variable"): - self._scheduler.list() - - def test_list_doesnt_throw_with_client(self) -> None: - ray_client = JobSubmissionClient(address="https://test.com") - ray_client.list_jobs = MagicMock(return_value=[]) - _scheduler_with_client = RayScheduler("client_session", ray_client) - _scheduler_with_client.list() # testing for success (should not throw exception) - - def test_min_replicas(self) -> None: + + @contextmanager + def _assert_log_message(self, level: str, msg: str) -> Iterator[None]: + with self.assertLogs(_logger) as cm: + yield + + self.assertEqual(len(cm.records), 1) + + log_record = cm.records[0] + + self.assertEqual(log_record.levelname, level) + 
self.assertEqual(log_record.message, msg) + + def test_validate_warns_when_app_def_contains_metadata(self) -> None: + self._app_def.metadata["dummy_key"] = "dummy_value" + + with self._assert_log_message( + "WARNING", "The Ray scheduler does not use metadata information." + ): + self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) + + def test_validate_warns_when_role_contains_resource_capability(self) -> None: + self._app_def.roles[1].resource.capabilities["dummy_cap1"] = 1 + self._app_def.roles[1].resource.capabilities["dummy_cap2"] = 2 + + with self._assert_log_message( + "WARNING", + "The Ray scheduler does not support custom resource capabilities.", + ): + self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) + + def test_validate_warns_when_role_contains_port_map(self) -> None: + self._app_def.roles[1].port_map["dummy_map1"] = 1 + self._app_def.roles[1].port_map["dummy_map2"] = 2 + + with self._assert_log_message( + "WARNING", "The Ray scheduler does not support port mapping." + ): + self._scheduler._validate(self._app_def, scheduler="ray", cfg=self._run_cfg) + + def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_str( + self, + ) -> None: + # pyre-fixme: Expects string type + self._run_cfg["cluster_config_file"] = 1 + + with self.assertRaisesRegex( + ValueError, + r"^The cluster configuration file must be a YAML file.$", + ): + self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + + def test_submit_dryrun_raises_error_if_cluster_config_file_is_not_found( + self, + ) -> None: + self._mock_isfile.return_value = False + + with self.assertRaisesRegex( + ValueError, + r"^The cluster configuration file must be a YAML file.$", + ): + self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + + # pyre-ignore[2]: Parameter `value` must have a type other than `Any` + def _assert_config_value(self, name: str, value: Any, type_name: str) -> None: + # pyre-fixme: TypedDict indexes by string literal + self._run_cfg[name] = value + + with self.assertRaisesRegex( + TypeError, + rf"^The configuration value '{name}' must be of type {type_name}.$", + ): + self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + + def _assert_submit_dryrun_constructs_job_definition(self) -> None: + run_info = self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + + job = run_info.request + + self.assertTrue(job.app_id.startswith(self._app_def.name)) + self.assertGreater(len(job.app_id), len(self._app_def.name)) + + self.assertEqual( + job.cluster_config_file, self._run_cfg.get("cluster_config_file") + ) + self.assertEqual(job.cluster_name, self._run_cfg.get("cluster_name")) + + actor_roles = [] + for role in self._app_def.roles: + actor_roles += [role] * role.num_replicas + + self.assertEqual(len(job.actors), len(actor_roles)) + + for actor, role in zip(job.actors, actor_roles): + self.assertEqual(actor.name, role.name) + self.assertEqual(actor.command, [role.entrypoint] + role.args) + self.assertEqual(actor.env, role.env) + self.assertEqual(actor.num_cpus, max(1, role.resource.cpu)) + self.assertEqual(actor.num_gpus, max(0, role.resource.gpu)) + + def test_submit_dryrun_constructs_job_definition(self) -> None: + self._assert_submit_dryrun_constructs_job_definition() + + self._run_cfg["cluster_name"] = None + self._run_cfg["working_dir"] = None + self._run_cfg["requirements"] = None + + self._assert_submit_dryrun_constructs_job_definition() + + def test_submit_dryrun_constructs_actor_command(self) -> None: + run_info = 
self._scheduler._submit_dryrun(self._app_def, self._run_cfg) + + job = run_info.request + + self.assertEqual( + job.actors[0].command, + ["dummy_entrypoint1", "arg1", "dummy1.py", "arg2"], + ) + + def test_no_dir(self) -> None: + app = AppDef( + name="dummy_app", + roles=[ + Role( + name="dummy_role1", + image="invalid_path", + ), + ], + ) + with self.assertRaisesRegex( + RuntimeError, "Role image must be a valid directory, got: invalid_path" + ): + self._scheduler._submit_dryrun(app, cfg={}) + + def test_requirements(self) -> None: + with tempfile.TemporaryDirectory() as path: + reqs = os.path.join(path, "requirements.txt") + with open(reqs, "w") as f: + f.write("asdf") + app = AppDef( name="app", roles=[ Role( name="role", - image="/tmp/", - num_replicas=2, + image=path, ), ], ) req = self._scheduler._submit_dryrun(app, cfg={}) job = req.request - self.assertEqual(job.actors[0].min_replicas, None) - - app.roles[0].min_replicas = 1 - req = self._scheduler._submit_dryrun(app, cfg={}) - job = req.request - self.assertEqual(job.actors[0].min_replicas, 1) - - app.roles.append( + self.assertEqual(job.requirements, reqs) + + def test_parse_app_id(self) -> None: + test_addr_appid = [ + ( + "0.0.0.0:1234-app_id", + "0.0.0.0:1234", + "app_id", + ), # (full address, address:port, app_id) + ("addr-of-cluster:1234-app-id", "addr-of-cluster:1234", "app-id"), + ("www.test.com:1234-app:id", "www.test.com:1234", "app:id"), + ("foo", "foo", ""), + ("foo-bar-bar", "foo", "bar-bar"), + ] + for test_example, addr, app_id in test_addr_appid: + parsed_addr, parsed_appid = self._scheduler._parse_app_id(test_example) + self.assertEqual(parsed_addr, addr) + self.assertEqual(parsed_appid, app_id) + + def test_list_throws_without_address(self) -> None: + if "RAY_ADDRESS" in os.environ: + del os.environ["RAY_ADDRESS"] + with self.assertRaisesRegex(Exception, "RAY_ADDRESS env variable"): + self._scheduler.list() + + def test_list_doesnt_throw_with_client(self) -> None: + ray_client = JobSubmissionClient(address="https://test.com") + ray_client.list_jobs = MagicMock(return_value=[]) + _scheduler_with_client = RayScheduler("client_session", ray_client) + _scheduler_with_client.list() # testing for success (should not throw exception) + + def test_min_replicas(self) -> None: + app = AppDef( + name="app", + roles=[ Role( name="role", image="/tmp/", num_replicas=2, - min_replicas=1, - ) - ) - with self.assertRaisesRegex( - ValueError, "min_replicas is only supported with single role jobs" - ): - self._scheduler._submit_dryrun(app, cfg={}) - - def test_nonmatching_address(self) -> None: - ray_client = JobSubmissionClient(address="https://test.address.com") - _scheduler_with_client = RayScheduler("client_session", ray_client) - app = AppDef( - name="app", - roles=[ - Role(name="role", image="."), - ], - ) - with self.assertRaisesRegex( - ValueError, "client netloc .* does not match job netloc .*" - ): - _scheduler_with_client.submit(app=app, cfg={}) - - def _assertDictContainsSubset( - self, - expected: dict[str, Any], - actual: dict[str, Any], - msg: Optional[str] = None, - ) -> None: - # NB: implement unittest.TestCase.assertDictContainsSubsetNew() since it was removed in python-3.11 - for key, value in expected.items(): - self.assertIn(key, actual, msg) - self.assertEqual(actual[key], value, msg) - - def test_client_with_headers(self) -> None: - # This tests only one option for the client. Different versions may have more options available. 
- headers = {"Authorization": "Bearer: token"} - ray_client = JobSubmissionClient( - address="https://test.com", headers=headers, verify=False + ), + ], + ) + req = self._scheduler._submit_dryrun(app, cfg={}) + job = req.request + self.assertEqual(job.actors[0].min_replicas, None) + + app.roles[0].min_replicas = 1 + req = self._scheduler._submit_dryrun(app, cfg={}) + job = req.request + self.assertEqual(job.actors[0].min_replicas, 1) + + app.roles.append( + Role( + name="role", + image="/tmp/", + num_replicas=2, + min_replicas=1, ) - _scheduler_with_client = RayScheduler("client_session", ray_client) - scheduler_client = _scheduler_with_client._get_ray_client() - self._assertDictContainsSubset(scheduler_client._headers, headers) - - class RayClusterSetup: - _instance = None # pyre-ignore - _cluster = None # pyre-ignore - - def __new__(cls): # pyre-ignore - if cls._instance is None: - cls._instance = super(RayClusterSetup, cls).__new__(cls) - ray.shutdown() - cls._cluster = Cluster( - initialize_head=True, - head_node_args={ - "num_cpus": 1, - }, - ) - cls._cluster.connect() # connect before any node changes - cls._cluster.add_node() # total of 2 cpus available - cls.reference_count: int = 4 - return cls._instance - - @property - def workers(self) -> List[object]: - return list(self._cluster.worker_nodes) - - def add_node(self, num_cpus: int = 1) -> None: - # add 1 node with 2 cpus to the cluster - self._cluster.add_node(num_cpus=num_cpus) - - def remove_node(self) -> None: - # randomly remove 1 node from the cluster - self._cluster.remove_node(self.workers[0]) - - def decrement_reference(self) -> None: - self.reference_count -= 1 - if self.reference_count == 0: - self.teardown_ray_cluster() - - def teardown_ray_cluster(self) -> None: + ) + with self.assertRaisesRegex( + ValueError, "min_replicas is only supported with single role jobs" + ): + self._scheduler._submit_dryrun(app, cfg={}) + + def test_nonmatching_address(self) -> None: + ray_client = JobSubmissionClient(address="https://test.address.com") + _scheduler_with_client = RayScheduler("client_session", ray_client) + app = AppDef( + name="app", + roles=[ + Role(name="role", image="."), + ], + ) + with self.assertRaisesRegex( + ValueError, "client netloc .* does not match job netloc .*" + ): + _scheduler_with_client.submit(app=app, cfg={}) + + def _assertDictContainsSubset( + self, + expected: dict[str, Any], + actual: dict[str, Any], + msg: Optional[str] = None, + ) -> None: + # NB: implement unittest.TestCase.assertDictContainsSubsetNew() since it was removed in python-3.11 + for key, value in expected.items(): + self.assertIn(key, actual, msg) + self.assertEqual(actual[key], value, msg) + + def test_client_with_headers(self) -> None: + # This tests only one option for the client. Different versions may have more options available. 
+ headers = {"Authorization": "Bearer: token"} + ray_client = JobSubmissionClient( + address="https://test.com", headers=headers, verify=False + ) + _scheduler_with_client = RayScheduler("client_session", ray_client) + scheduler_client = _scheduler_with_client._get_ray_client() + self._assertDictContainsSubset(scheduler_client._headers, headers) + + +class RayClusterSetup: + _instance = None # pyre-ignore + _cluster = None # pyre-ignore + + def __new__(cls): # pyre-ignore + if cls._instance is None: + cls._instance = super(RayClusterSetup, cls).__new__(cls) ray.shutdown() - self._cluster.shutdown() - del os.environ["RAY_ADDRESS"] - - class RayDriverTest(TestCase): - def test_actors_serialize(self) -> None: - actor1 = RayActor( - name="test_actor_1", - command=["python", "1", "2"], - env={"fake": "1"}, - min_replicas=2, + cls._cluster = Cluster( + initialize_head=True, + head_node_args={ + "num_cpus": 1, + }, ) - actor2 = RayActor( - name="test_actor_2", - command=["python", "3", "4"], - env={"fake": "2"}, + cls._cluster.connect() # connect before any node changes + cls._cluster.add_node() # total of 2 cpus available + cls.reference_count: int = 4 + return cls._instance + + @property + def workers(self) -> List[object]: + return list(self._cluster.worker_nodes) + + def add_node(self, num_cpus: int = 1) -> None: + # add 1 node with 2 cpus to the cluster + self._cluster.add_node(num_cpus=num_cpus) + + def remove_node(self) -> None: + # randomly remove 1 node from the cluster + self._cluster.remove_node(self.workers[0]) + + def decrement_reference(self) -> None: + self.reference_count -= 1 + if self.reference_count == 0: + self.teardown_ray_cluster() + + def teardown_ray_cluster(self) -> None: + ray.shutdown() + self._cluster.shutdown() + del os.environ["RAY_ADDRESS"] + + +class RayDriverTest(TestCase): + def test_actors_serialize(self) -> None: + actor1 = RayActor( + name="test_actor_1", + command=["python", "1", "2"], + env={"fake": "1"}, + min_replicas=2, + ) + actor2 = RayActor( + name="test_actor_2", + command=["python", "3", "4"], + env={"fake": "2"}, + min_replicas=2, + ) + actors = [actor1, actor2] + current_dir = os.path.dirname(os.path.realpath(__file__)) + serialize(actors, current_dir) + + loaded_actor = ray_driver.load_actor_json( + os.path.join(current_dir, "actors.json") + ) + self.assertEqual(loaded_actor, actors) + + def test_unknown_result(self) -> None: + actor1 = RayActor( + name="test_actor_1", + command=[ + "python", + "-c" 'import time; time.sleep(1); print("test_actor_1")', + ], + env={"fake": "1"}, + ) + actors = [ + actor1, + ] + driver = ray_driver.RayDriver(actors) + ray_cluster_setup = RayClusterSetup() + self.assertEqual(driver.min_replicas, 1) + self.assertEqual(driver.max_replicas, 1) + + @ray.remote + def f() -> int: + return 1 + + driver.active_tasks = [f.remote()] + with self.assertRaises(RuntimeError): + driver._step() + + ray_cluster_setup.decrement_reference() + + def test_ray_driver_gang(self) -> None: + """Test launching a gang scheduling job""" + actor1 = RayActor( + name="test_actor_1", + command=[ + "python", + "-c" 'import time; time.sleep(1); print("test_actor_1")', + ], + env={"fake": "1"}, + min_replicas=2, + ) + actor2 = RayActor( + name="test_actor_2", + command=[ + "python", + "-c" 'import time; time.sleep(1); print("test_actor_2")', + ], + env={"fake": "2"}, + min_replicas=2, + ) + actors = [actor1, actor2] + + driver = ray_driver.RayDriver(actors) + ray_cluster_setup = RayClusterSetup() + + # test init_placement_groups + 
driver.init_placement_groups()
+        self.assertEqual(len(driver.placement_groups), 1)
+        self.assertEqual(len(driver.active_tasks), 0)
+
+        driver.place_command_actors()
+        self.assertEqual(len(driver.active_tasks), 2)
+        self.assertEqual(len(driver.actor_info_of_id), 2)
+
+        driver.run()  # execute commands on command actors
+        self.assertEqual(
+            len(driver.active_tasks), 0
+        )  # wait until all active tasks finish
+        self.assertEqual(driver.command_actors_count, 0)
+        self.assertIsNotNone(driver.rank_0_address)
+        self.assertIsNotNone(driver.rank_0_port)
+
+        # ray.available_resources()['CPU'] == 0
+        for pg in driver.placement_groups:
+            # clear used placement groups
+            remove_placement_group(pg)
+        # ray.available_resources()['CPU'] == 2
+
+        ray_cluster_setup.decrement_reference()
+
+    def test_ray_driver_elasticity(self) -> None:
+        """Test launching an elasticity job"""
+        actor1 = RayActor(
+            name="test_actor_1",
+            command=[
+                "python",
+                "-c" 'import time; time.sleep(1); print("test_actor_elasticity_1")',
+            ],
+            env={"fake": "1"},
+            min_replicas=1,
+        )
+        actor2 = RayActor(
+            name="test_actor_2",
+            command=[
+                "python",
+                "-c" 'import time; time.sleep(1); print("test_actor_elasticity_2")',
+            ],
+            env={"fake": "2"},
+            min_replicas=1,
+        )
+        actors = [actor1, actor2]
+
+        driver = ray_driver.RayDriver(actors)
+        ray_cluster_setup = RayClusterSetup()
+        ray_cluster_setup.remove_node()  # Remove 1 cpu, should have 1 cpu in the cluster
+
+        # 1. test init_placement_groups
+        driver.init_placement_groups()
+        self.assertEqual(len(driver.placement_groups), 2)  # 2 placement groups created
+        self.assertEqual(len(driver.active_tasks), 0)
+        created, pending = ray.wait(
+            [driver.placement_groups[0].ready(), driver.placement_groups[1].ready()]
+        )
+        self.assertEqual(len(created), 1)
+        self.assertEqual(len(pending), 1)
+
+        # 2. 
test place_command_actors + driver.place_command_actors() + self.assertEqual(len(driver.active_tasks), 2) # 2 command actors + self.assertEqual(len(driver.actor_info_of_id), 2) + self.assertEqual(driver.command_actors_count, 0) + + # 3-1 + teriminal = driver._step() # actor 1 scheduled, execute the script + self.assertEqual(teriminal, False) + self.assertEqual(len(driver.active_tasks), 2) # actor1 should be finished + self.assertEqual(driver.command_actors_count, 1) + self.assertIsNotNone(driver.rank_0_address) + self.assertIsNotNone(driver.rank_0_port) + + # 3-2 + terminal = ( + driver._step() + ) # actor 1 finished, actor 2 has been scheduled yet, usually, the driver stops here + self.assertEqual(terminal, True) + self.assertEqual(driver.command_actors_count, 0) + self.assertEqual(len(driver.active_tasks), 1) # actor schedule task + self.assertEqual(driver.terminating, True) + + ray_cluster_setup.add_node() # add 1 cpu to the cluster + # 3-3 + teriminal = ( + driver._step() + ) # pg 2 becomes available, but actor 2 shouldn't be executed + self.assertEqual(teriminal, False) + self.assertEqual(len(driver.active_tasks), 0) # actor1 should be finished + self.assertEqual(driver.command_actors_count, 0) + + for pg in driver.placement_groups: + # clear used placement groups + remove_placement_group(pg) + + ray_cluster_setup.decrement_reference() + + +class RayIntegrationTest(TestCase): + def test_ray_cluster(self) -> None: + ray_cluster_setup = RayClusterSetup() + ray_scheduler = self.setup_ray_cluster() + self.assertTrue(ray.is_initialized()) + + job_id = self.schedule_ray_job(ray_scheduler) + self.assertIsNotNone(job_id) + + ray_scheduler.wait_until_finish(job_id, 100) + + logs = self.check_logs(ray_scheduler=ray_scheduler, app_id=job_id) + print(logs) + self.assertIsNotNone(logs) + + status = self.describe(ray_scheduler, job_id) + self.assertIsNotNone(status) + + apps = self.list(ray_scheduler) + self.assertEqual(len(apps), 2) + self.assertEqual(apps[0].app_id, job_id) + + ray_cluster_setup.decrement_reference() + + def setup_ray_cluster(self) -> RayScheduler: + ray_scheduler = RayScheduler(session_name="test") + return ray_scheduler + + def schedule_ray_job(self, ray_scheduler: RayScheduler, app_id: str = "123") -> str: + current_dir = os.path.dirname(os.path.realpath(__file__)) + # Ray packaging honours .gitignore file -> create staging directory just for packaging: + # - job will use it as a cwd and copy ray_driver.py + # - test will copy training script to the same destination + staging_dir = os.path.join(current_dir, "staging") + os.makedirs(staging_dir, exist_ok=True) + copy2(os.path.join(current_dir, "train.py"), staging_dir) + actors = [ + RayActor( + name="ddp", + num_cpus=1, + command=[os.path.join(staging_dir, "train.py")], min_replicas=2, - ) - actors = [actor1, actor2] - current_dir = os.path.dirname(os.path.realpath(__file__)) - serialize(actors, current_dir) - - loaded_actor = ray_driver.load_actor_json( - os.path.join(current_dir, "actors.json") - ) - self.assertEqual(loaded_actor, actors) - - def test_unknown_result(self) -> None: - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_1")', - ], - env={"fake": "1"}, - ) - actors = [ - actor1, - ] - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - self.assertEqual(driver.min_replicas, 1) - self.assertEqual(driver.max_replicas, 1) - - @ray.remote - def f() -> int: - return 1 - - driver.active_tasks = [f.remote()] - with 
self.assertRaises(RuntimeError): - driver._step() - - ray_cluster_setup.decrement_reference() - - def test_ray_driver_gang(self) -> None: - """Test launching a gang scheduling job""" - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_1")', - ], - env={"fake": "1"}, + ), + RayActor( + name="ddp", + num_cpus=1, + command=[os.path.join(staging_dir, "train.py")], min_replicas=2, - ) - actor2 = RayActor( - name="test_actor_2", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_2")', - ], - env={"fake": "2"}, - min_replicas=2, - ) - actors = [actor1, actor2] - - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - - # test init_placement_groups - driver.init_placement_groups() - self.assertEqual(len(driver.placement_groups), 1) - self.assertEqual(len(driver.active_tasks), 0) - - driver.place_command_actors() - self.assertEqual(len(driver.active_tasks), 2) - self.assertEqual(len(driver.actor_info_of_id), 2) - - driver.run() # execute commands on command actors - self.assertEqual( - len(driver.active_tasks), 0 - ) # wait util all active tasks finishes - self.assertEqual(driver.command_actors_count, 0) - self.assertIsNotNone(driver.rank_0_address) - self.assertIsNotNone(driver.rank_0_port) - - # ray.available_resources()['CPU'] == 0 - for pg in driver.placement_groups: - # clear used placement groups - remove_placement_group(pg) - # ray.available_resources()['CPU'] == 2 - - ray_cluster_setup.decrement_reference() - - def test_ray_driver_elasticity(self) -> None: - """Test launching an elasticity job""" - actor1 = RayActor( - name="test_actor_1", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_elasticity_1")', - ], - env={"fake": "1"}, - min_replicas=1, - ) - actor2 = RayActor( - name="test_actor_2", - command=[ - "python", - "-c" 'import time; time.sleep(1); print("test_actor_elasticity_2")', - ], - env={"fake": "2"}, - min_replicas=1, - ) - actors = [actor1, actor2] - - driver = ray_driver.RayDriver(actors) - ray_cluster_setup = RayClusterSetup() - ray_cluster_setup.remove_node() # Remove 1 cpu, should have 1 cpu in the cluster - - # 1. test init_placement_groups - driver.init_placement_groups() - self.assertEqual( - len(driver.placement_groups), 2 - ) # 2 placement groups created - self.assertEqual(len(driver.active_tasks), 0) - created, pending = ray.wait( - [driver.placement_groups[0].ready(), driver.placement_groups[1].ready()] - ) - self.assertEqual(len(created), 1) - self.assertEqual(len(pending), 1) - - # 2. 
test place_command_actors - driver.place_command_actors() - self.assertEqual(len(driver.active_tasks), 2) # 2 command actors - self.assertEqual(len(driver.actor_info_of_id), 2) - self.assertEqual(driver.command_actors_count, 0) - - # 3-1 - teriminal = driver._step() # actor 1 scheduled, execute the script - self.assertEqual(teriminal, False) - self.assertEqual(len(driver.active_tasks), 2) # actor1 should be finished - self.assertEqual(driver.command_actors_count, 1) - self.assertIsNotNone(driver.rank_0_address) - self.assertIsNotNone(driver.rank_0_port) - - # 3-2 - terminal = ( - driver._step() - ) # actor 1 finished, actor 2 has been scheduled yet, usually, the driver stops here - self.assertEqual(terminal, True) - self.assertEqual(driver.command_actors_count, 0) - self.assertEqual(len(driver.active_tasks), 1) # actor schedule task - self.assertEqual(driver.terminating, True) - - ray_cluster_setup.add_node() # add 1 cpu to the cluster - # 3-3 - teriminal = ( - driver._step() - ) # pg 2 becomes available, but actor 2 shouldn't be executed - self.assertEqual(teriminal, False) - self.assertEqual(len(driver.active_tasks), 0) # actor1 should be finished - self.assertEqual(driver.command_actors_count, 0) - - for pg in driver.placement_groups: - # clear used placement groups - remove_placement_group(pg) - - ray_cluster_setup.decrement_reference() - - class RayIntegrationTest(TestCase): - def test_ray_cluster(self) -> None: - ray_cluster_setup = RayClusterSetup() - ray_scheduler = self.setup_ray_cluster() - self.assertTrue(ray.is_initialized()) - - job_id = self.schedule_ray_job(ray_scheduler) - self.assertIsNotNone(job_id) - - ray_scheduler.wait_until_finish(job_id, 100) - - logs = self.check_logs(ray_scheduler=ray_scheduler, app_id=job_id) - print(logs) - self.assertIsNotNone(logs) - - status = self.describe(ray_scheduler, job_id) - self.assertIsNotNone(status) - - apps = self.list(ray_scheduler) - self.assertEqual(len(apps), 2) - self.assertEqual(apps[0].app_id, job_id) - - ray_cluster_setup.decrement_reference() - - def setup_ray_cluster(self) -> RayScheduler: - ray_scheduler = RayScheduler(session_name="test") - return ray_scheduler - - def schedule_ray_job( - self, ray_scheduler: RayScheduler, app_id: str = "123" - ) -> str: - current_dir = os.path.dirname(os.path.realpath(__file__)) - # Ray packaging honours .gitignore file -> create staging directory just for packaging: - # - job will use it as a cwd and copy ray_driver.py - # - test will copy training script to the same destination - staging_dir = os.path.join(current_dir, "staging") - os.makedirs(staging_dir, exist_ok=True) - copy2(os.path.join(current_dir, "train.py"), staging_dir) - actors = [ - RayActor( - name="ddp", - num_cpus=1, - command=[os.path.join(staging_dir, "train.py")], - min_replicas=2, - ), - RayActor( - name="ddp", - num_cpus=1, - command=[os.path.join(staging_dir, "train.py")], - min_replicas=2, - ), - ] - - ray_job = RayJob( - app_id=app_id, - dashboard_address="127.0.0.1:8265", - actors=actors, - working_dir=staging_dir, - ) - app_info = AppDryRunInfo(ray_job, repr) - job_id = ray_scheduler.schedule(app_info) - return job_id - - def describe( - self, ray_scheduler: RayScheduler, app_id: str = "123" - ) -> Optional[DescribeAppResponse]: - return ray_scheduler.describe(app_id) - - def check_logs( - self, ray_scheduler: RayScheduler, app_id: str = "123" - ) -> Iterable[str]: - return ray_scheduler.log_iter(app_id=app_id) - - def list(self, ray_scheduler: RayScheduler) -> List[ListAppResponse]: - 
os.environ["RAY_ADDRESS"] = "http://127.0.0.1:8265" - return ray_scheduler.list() + ), + ] + + ray_job = RayJob( + app_id=app_id, + dashboard_address="127.0.0.1:8265", + actors=actors, + working_dir=staging_dir, + ) + app_info = AppDryRunInfo(ray_job, repr) + job_id = ray_scheduler.schedule(app_info) + return job_id + + def describe( + self, ray_scheduler: RayScheduler, app_id: str = "123" + ) -> Optional[DescribeAppResponse]: + return ray_scheduler.describe(app_id) + + def check_logs( + self, ray_scheduler: RayScheduler, app_id: str = "123" + ) -> Iterable[str]: + return ray_scheduler.log_iter(app_id=app_id) + + def list(self, ray_scheduler: RayScheduler) -> List[ListAppResponse]: + os.environ["RAY_ADDRESS"] = "http://127.0.0.1:8265" + return ray_scheduler.list()