From f883aa65077ff5becda40c47abb96cfea3c3606e Mon Sep 17 00:00:00 2001 From: hemildesai Date: Mon, 31 Jul 2023 23:02:36 -0700 Subject: [PATCH 1/3] Query cloud specific env vars in task setup --- sky/adaptors/kubernetes.py | 5 +++++ sky/backends/cloud_vm_ray_backend.py | 4 ++++ sky/clouds/cloud.py | 14 ++++++++++++++ sky/clouds/kubernetes.py | 24 ++++++++++++++++++++++++ sky/task.py | 9 ++------- sky/utils/common_utils.py | 7 +++++++ 6 files changed, 56 insertions(+), 7 deletions(-) diff --git a/sky/adaptors/kubernetes.py b/sky/adaptors/kubernetes.py index 79daa6f2434..f746d3d03fd 100644 --- a/sky/adaptors/kubernetes.py +++ b/sky/adaptors/kubernetes.py @@ -138,3 +138,8 @@ def config_exception(): @import_package def max_retry_error(): return urllib3.exceptions.MaxRetryError + + +@import_package +def stream(): + return kubernetes.stream.stream diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index b8daa6adabe..480b3d1b29b 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2828,6 +2828,10 @@ def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, style = colorama.Style fore = colorama.Fore + cloud_env_vars = handle.launched_resources.cloud.query_env_vars( + handle.cluster_name) + task.update_envs(cloud_env_vars) + if task.setup is None: return diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 5e4fdec042f..781a8db2e3d 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -609,6 +609,20 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], """ raise NotImplementedError + @classmethod + def query_env_vars(cls, name: str) -> Dict[str, str]: + """Queries cloud specific environment variables of the cluster. + + Since all commands in the cluster are executed using ssh, we need + to query the cloud specific environment variables which may be missed when using ssh. + For instance, on K8s, environment variables when using kubectl exec are different from + those when using ssh. + + Returns: + A dictionary of environment variables. + """ + return {} + # === Image related methods === # These three methods are used to create, move and delete images. They # are currently only used in `sky launch --clone-disk-from` to clone a diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index a2c339f9791..a249cdc200c 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -406,3 +406,27 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], cluster_status.append(status_lib.ClusterStatus.INIT) # If pods are not found, we don't add them to the return list return cluster_status + + @classmethod + def query_env_vars(cls, name: str) -> Dict[str, str]: + namespace = kubernetes_utils.get_current_kube_config_context_namespace() + pod = kubernetes.core_api().list_namespaced_pod( + namespace, + label_selector=f'skypilot-cluster={name},ray-node-type=head' + ).items[0] + response = kubernetes.stream()( + kubernetes.core_api().connect_get_namespaced_pod_exec, + pod.metadata.name, + namespace, + command=['env'], + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + return dict([ + line.split('=', 1) + for line in response.split('\n') + if '=' in line and + common_utils.is_valid_env_var(line.split('=', 1)[0]) + ]) diff --git a/sky/task.py b/sky/task.py index 39c2727c53d..14682d994b4 100644 --- a/sky/task.py +++ b/sky/task.py @@ -18,6 +18,7 @@ from sky.skylet import constants from sky.utils import schemas from sky.utils import ux_utils +from sky.utils.common_utils import is_valid_env_var if typing.TYPE_CHECKING: from sky import resources as resources_lib @@ -27,7 +28,6 @@ CommandOrCommandGen = Union[str, CommandGen] _VALID_NAME_REGEX = '[a-z0-9]+(?:[._-]{1,2}[a-z0-9]+)*' -_VALID_ENV_VAR_REGEX = '[a-zA-Z_][a-zA-Z0-9_]*' _VALID_NAME_DESCR = ('ASCII characters and may contain lowercase and' ' uppercase letters, digits, underscores, periods,' ' and dashes. Must start and end with alphanumeric' @@ -64,11 +64,6 @@ def _is_valid_name(name: str) -> bool: return bool(re.fullmatch(_VALID_NAME_REGEX, name)) -def _is_valid_env_var(name: str) -> bool: - """Checks if the task environment variable name is valid.""" - return bool(re.fullmatch(_VALID_ENV_VAR_REGEX, name)) - - def _fill_in_env_vars_in_file_mounts( file_mounts: Dict[str, Any], task_envs: Dict[str, str], @@ -446,7 +441,7 @@ def update_envs( if not isinstance(key, str): with ux_utils.print_exception_no_traceback(): raise ValueError('Env keys must be strings.') - if not _is_valid_env_var(key): + if not is_valid_env_var(key): with ux_utils.print_exception_no_traceback(): raise ValueError(f'Invalid env key: {key}') else: diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 2467b88003b..425c96575c3 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -30,6 +30,8 @@ _PAYLOAD_PATTERN = re.compile(r'(.*)') _PAYLOAD_STR = '{}' +_VALID_ENV_VAR_REGEX = '[a-zA-Z_][a-zA-Z0-9_]*' + logger = sky_logging.init_logger(__name__) _usage_run_id = None @@ -409,3 +411,8 @@ def find_free_port(start_port: int) -> int: except OSError: pass raise OSError('No free ports available.') + + +def is_valid_env_var(name: str) -> bool: + """Checks if the task environment variable name is valid.""" + return bool(re.fullmatch(_VALID_ENV_VAR_REGEX, name)) From 40b9fddc714776dd5c1a161a467db08848fe0db5 Mon Sep 17 00:00:00 2001 From: hemildesai Date: Tue, 1 Aug 2023 18:39:08 -0700 Subject: [PATCH 2/3] Make query_env_vars specific to Kubernetes cloud --- sky/backends/cloud_vm_ray_backend.py | 7 ++++--- sky/clouds/cloud.py | 14 -------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 480b3d1b29b..791e153103e 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2828,9 +2828,10 @@ def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, style = colorama.Style fore = colorama.Fore - cloud_env_vars = handle.launched_resources.cloud.query_env_vars( - handle.cluster_name) - task.update_envs(cloud_env_vars) + if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): + cloud_env_vars = handle.launched_resources.cloud.query_env_vars( + handle.cluster_name) + task.update_envs(cloud_env_vars) if task.setup is None: return diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 781a8db2e3d..5e4fdec042f 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -609,20 +609,6 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], """ raise NotImplementedError - @classmethod - def query_env_vars(cls, name: str) -> Dict[str, str]: - """Queries cloud specific environment variables of the cluster. - - Since all commands in the cluster are executed using ssh, we need - to query the cloud specific environment variables which may be missed when using ssh. - For instance, on K8s, environment variables when using kubectl exec are different from - those when using ssh. - - Returns: - A dictionary of environment variables. - """ - return {} - # === Image related methods === # These three methods are used to create, move and delete images. They # are currently only used in `sky launch --clone-disk-from` to clone a From b95ed1e0aba006b6a0180bb3aa64c9b211d03127 Mon Sep 17 00:00:00 2001 From: hemildesai Date: Thu, 3 Aug 2023 18:08:22 -0700 Subject: [PATCH 3/3] Address PR comments --- sky/backends/cloud_vm_ray_backend.py | 19 +++++++++++++++---- sky/clouds/kubernetes.py | 13 ++++++------- sky/task.py | 4 ++-- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 791e153103e..c5675d02dcc 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2822,16 +2822,26 @@ def _sync_file_mounts( self._execute_file_mounts(handle, all_file_mounts) self._execute_storage_mounts(handle, storage_mounts) + def _update_envs_for_k8s(self, handle: CloudVmRayResourceHandle, + task: task_lib.Task) -> None: + """Update envs for a task with Kubernetes specific env vars if cloud is Kubernetes.""" + if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): + temp_envs = copy.deepcopy(task.envs) + cloud_env_vars = handle.launched_resources.cloud.query_env_vars( + handle.cluster_name) + task.update_envs(cloud_env_vars) + + # Re update the envs with the original envs to give priority to + # the original envs. + task.update_envs(temp_envs) + def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, detach_setup: bool) -> None: start = time.time() style = colorama.Style fore = colorama.Fore - if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): - cloud_env_vars = handle.launched_resources.cloud.query_env_vars( - handle.cluster_name) - task.update_envs(cloud_env_vars) + self._update_envs_for_k8s(handle, task) if task.setup is None: return @@ -3143,6 +3153,7 @@ def _execute( # Check the task resources vs the cluster resources. Since `sky exec` # will not run the provision and _check_existing_cluster self.check_resources_fit_cluster(handle, task) + self._update_envs_for_k8s(handle, task) resources_str = backend_utils.get_task_resources_str(task) diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index a249cdc200c..41d266b8c81 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -423,10 +423,9 @@ def query_env_vars(cls, name: str) -> Dict[str, str]: stdin=False, stdout=True, tty=False, - ) - return dict([ - line.split('=', 1) - for line in response.split('\n') - if '=' in line and - common_utils.is_valid_env_var(line.split('=', 1)[0]) - ]) + _request_timeout=kubernetes.API_TIMEOUT) + lines: List[List[str]] = [ + line.split('=', 1) for line in response.split('\n') if '=' in line + ] + return dict( + [line for line in lines if common_utils.is_valid_env_var(line[0])]) diff --git a/sky/task.py b/sky/task.py index 14682d994b4..6ab8c4aac22 100644 --- a/sky/task.py +++ b/sky/task.py @@ -18,7 +18,7 @@ from sky.skylet import constants from sky.utils import schemas from sky.utils import ux_utils -from sky.utils.common_utils import is_valid_env_var +from sky.utils import common_utils if typing.TYPE_CHECKING: from sky import resources as resources_lib @@ -441,7 +441,7 @@ def update_envs( if not isinstance(key, str): with ux_utils.print_exception_no_traceback(): raise ValueError('Env keys must be strings.') - if not is_valid_env_var(key): + if not common_utils.is_valid_env_var(key): with ux_utils.print_exception_no_traceback(): raise ValueError(f'Invalid env key: {key}') else: