From 4b6dc8507c78f9f2fe837c1eed75df2ec38e3021 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sun, 14 Jul 2024 04:27:08 +0000 Subject: [PATCH] wip --- sky/backends/cloud_vm_ray_backend.py | 26 +++++++++++++++++++------- sky/jobs/utils.py | 15 +++------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 2821d17a9c6..9f20625418e 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -702,9 +702,8 @@ class FailoverCloudErrorHandlerV1: """ @staticmethod - def _handle_errors( - stdout: str, stderr: str, - is_error_str_known: Callable[[str], bool]) -> List[str]: + def _handle_errors(stdout: str, stderr: str, + is_error_str_known: Callable[[str], bool]) -> List[str]: stdout_splits = stdout.split('\n') stderr_splits = stderr.split('\n') errors = [ @@ -820,11 +819,12 @@ def _oci_handler(blocked_resources: Set['resources_lib.Resources'], stderr: str): known_service_errors = [ 'NotAuthorizedOrNotFound', 'CannotParseRequest', 'InternalError', - 'LimitExceeded', 'NotAuthenticated'] + 'LimitExceeded', 'NotAuthenticated' + ] errors = FailoverCloudErrorHandlerV1._handle_errors( stdout, stderr, lambda x: 'VcnSubnetNotFound' in x.strip() or - ('oci.exceptions.ServiceError' in x.strip() and - any(known_err in x.strip() for known_err in known_service_errors))) + ('oci.exceptions.ServiceError' in x.strip() and any( + known_err in x.strip() for known_err in known_service_errors))) logger.warning(f'Got error(s) in {region.name}:') messages = '\n\t'.join(errors) style = colorama.Style @@ -2339,8 +2339,20 @@ def get_command_runners(self, self.cluster_yaml, self.docker_user, self.ssh_user) if avoid_ssh_control: ssh_credentials.pop('ssh_control_name', None) + updated_to_skypilot_provisioner_after_provisioned = ( + self.launched_resources.cloud.PROVISIONER_VERSION >= + clouds.ProvisionerVersion.SKYPILOT and + self.cached_external_ips is not None and + self.cached_cluster_info is None) + if updated_to_skypilot_provisioner_after_provisioned: + logger.debug( + f'{self.launched_resources.cloud} has been updated to the new ' + f'provisioner after cluster {self.cluster_name} was ' + f'provisioned. Cached IPs are used for connecting to the ' + 'cluster.') if (clouds.ProvisionerVersion.RAY_PROVISIONER_SKYPILOT_TERMINATOR >= - self.launched_resources.cloud.PROVISIONER_VERSION): + self.launched_resources.cloud.PROVISIONER_VERSION or + updated_to_skypilot_provisioner_after_provisioned): ip_list = (self.cached_external_ips if force_cached else self.external_ips()) if ip_list is None: diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 7593c97d773..69f3acf341e 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -822,12 +822,7 @@ def stream_logs(cls, follow={follow}, controller={controller}) print(msg, flush=True) """) - # Activate the python env to make sure some cloud CLIs are available in - # the subprocess, such as az. This is useful for a controller to query - # statuses of old Azure instances that was provisioned with ray - # autoscaler. - # TODO(zhwu): figure out why we need to activate env here. - return cls._build(code, activate_env=True) + return cls._build(code) @classmethod def set_pending(cls, job_id: int, managed_job_dag: 'dag_lib.Dag') -> str: @@ -846,12 +841,8 @@ def set_pending(cls, job_id: int, managed_job_dag: 'dag_lib.Dag') -> str: return cls._build(code) @classmethod - def _build(cls, code: str, activate_env: bool=False) -> str: + def _build(cls, code: str) -> str: generated_code = cls._PREFIX + '\n' + code - - active_cmd = '' - if activate_env: - active_cmd = f'{constants.ACTIVATE_SKY_REMOTE_PYTHON_ENV}; ' + return ( - f'{active_cmd}' f'{constants.SKY_PYTHON_CMD} -u -c {shlex.quote(generated_code)}')