diff --git a/examples/tpu/tpuvm_mnist.yaml b/examples/tpu/tpuvm_mnist.yaml index d1fd434fad6..41b14283fac 100644 --- a/examples/tpu/tpuvm_mnist.yaml +++ b/examples/tpu/tpuvm_mnist.yaml @@ -5,7 +5,7 @@ resources: # The setup command. Will be run under the working directory. setup: | - git clone https://github.com/google/flax.git --branch v0.8.2 + git clone https://github.com/google/flax.git --branch v0.10.1 conda activate flax if [ $? -eq 0 ]; then @@ -15,7 +15,7 @@ setup: | conda activate flax # Make sure to install TPU related packages in a conda env to avoid package conflicts. pip install \ - -f https://storage.googleapis.com/jax-releases/libtpu_releases.html "jax[tpu]==0.4.25" \ + -f https://storage.googleapis.com/jax-releases/libtpu_releases.html "jax[tpu]==0.4.35" \ clu \ tensorflow tensorflow-datasets pip install -e flax diff --git a/sky/cli.py b/sky/cli.py index 230f43d6f90..490749d1231 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3143,7 +3143,8 @@ def _get_kubernetes_realtime_gpu_table( 'in Kubernetes cluster. ') debug_msg = ('To show available accelerators on kubernetes,' ' run: sky show-gpus --cloud kubernetes ') - full_err_msg = (err_msg + kubernetes_utils.NO_GPU_HELP_MESSAGE + + full_err_msg = (err_msg + + kubernetes_utils.NO_ACCELERATOR_HELP_MESSAGE + debug_msg) raise ValueError(full_err_msg) for gpu, _ in sorted(counts.items()): @@ -3161,11 +3162,12 @@ def _get_kubernetes_node_info_table(context: Optional[str]): node_info_dict = kubernetes_utils.get_kubernetes_node_info(context) for node_name, node_info in node_info_dict.items(): - available = node_info.free['nvidia.com/gpu'] if node_info.free[ - 'nvidia.com/gpu'] != -1 else no_permissions_str + available = node_info.free[ + 'accelerators_available'] if node_info.free[ + 'accelerators_available'] != -1 else no_permissions_str node_table.add_row([ - node_name, node_info.gpu_type, - node_info.total['nvidia.com/gpu'], available + node_name, node_info.accelerator_type, + node_info.total['accelerator_count'], available ]) return node_table @@ -3220,8 +3222,18 @@ def _output(): yield from k8s_realtime_table.get_string() k8s_node_table = _get_kubernetes_node_info_table(context) yield '\n\n' + # TODO(Doyoung): Update the message with the multi-host TPU + # support. + k8s_per_node_acc_message = ( + 'Kubernetes per node accelerator availability ') + if kubernetes_utils.multi_host_tpu_exists_in_cluster( + context): + k8s_per_node_acc_message += ( + '(Note: Multi-host TPUs are detected and excluded ' + 'from the display as multi-host TPUs are not ' + 'supported.)') yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'Kubernetes per node GPU availability' + f'{k8s_per_node_acc_message}' f'{colorama.Style.RESET_ALL}\n') yield from k8s_node_table.get_string() if kubernetes_autoscaling: diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index d930a24271f..5e1b46d52eb 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -362,11 +362,23 @@ def make_deploy_resources_variables( k8s_acc_label_key = None k8s_acc_label_value = None + k8s_topology_label_key = None + k8s_topology_label_value = None + k8s_resource_key = None + tpu_requested = False - # If GPUs are requested, set node label to match the GPU type. + # If GPU/TPUs are requested, set node label to match the GPU/TPU type. 
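The branch below fills in the new deploy variables that the kubernetes-ray.yml.j2 template consumes later in this diff. For illustration only, a rough sketch of the values those variables might take; the key and label names come from the constants added in this PR (GKELabelFormatter, GPU_RESOURCE_KEY, TPU_RESOURCE_KEY), while the specific accelerator types, counts, and topology are hypothetical examples rather than output captured from a real cluster.

# Hypothetical: a task requesting `tpu-v5-lite-podslice:4` on a GKE cluster.
deploy_vars_tpu = {
    'tpu_requested': True,
    'k8s_acc_label_key': 'cloud.google.com/gke-tpu-accelerator',
    'k8s_acc_label_value': 'tpu-v5-lite-podslice',
    'k8s_topology_label_key': 'cloud.google.com/gke-tpu-topology',
    'k8s_topology_label_value': '2x2',  # 2x2 topology -> 4 chips on one host
    'k8s_resource_key': 'google.com/tpu',
}

# Hypothetical: a task requesting `H100:1` keeps the existing GPU path.
deploy_vars_gpu = {
    'tpu_requested': False,
    'k8s_acc_label_key': 'cloud.google.com/gke-accelerator',
    'k8s_acc_label_value': 'nvidia-h100-80gb',
    'k8s_topology_label_key': None,
    'k8s_topology_label_value': None,
    'k8s_resource_key': 'nvidia.com/gpu',
}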
if acc_count > 0 and acc_type is not None: - k8s_acc_label_key, k8s_acc_label_value = \ - kubernetes_utils.get_gpu_label_key_value(context, acc_type) + (k8s_acc_label_key, k8s_acc_label_value, k8s_topology_label_key, + k8s_topology_label_value) = ( + kubernetes_utils.get_accelerator_label_key_value( + context, acc_type, acc_count)) + if (k8s_acc_label_key == + kubernetes_utils.GKELabelFormatter.TPU_LABEL_KEY): + tpu_requested = True + k8s_resource_key = kubernetes_utils.TPU_RESOURCE_KEY + else: + k8s_resource_key = kubernetes_utils.GPU_RESOURCE_KEY port_mode = network_utils.get_port_mode(None) @@ -428,6 +440,10 @@ def make_deploy_resources_variables( 'k8s_skypilot_system_namespace': _SKYPILOT_SYSTEM_NAMESPACE, 'k8s_spot_label_key': spot_label_key, 'k8s_spot_label_value': spot_label_value, + 'tpu_requested': tpu_requested, + 'k8s_topology_label_key': k8s_topology_label_key, + 'k8s_topology_label_value': k8s_topology_label_value, + 'k8s_resource_key': k8s_resource_key, 'image_id': image_id, } diff --git a/sky/clouds/service_catalog/kubernetes_catalog.py b/sky/clouds/service_catalog/kubernetes_catalog.py index 14aa8f93040..6d11d1715e2 100644 --- a/sky/clouds/service_catalog/kubernetes_catalog.py +++ b/sky/clouds/service_catalog/kubernetes_catalog.py @@ -104,16 +104,16 @@ def list_accelerators_realtime( ) or not kubernetes_utils.check_credentials(context)[0]: return {}, {}, {} - has_gpu = kubernetes_utils.detect_gpu_resource(context) + has_gpu = kubernetes_utils.detect_accelerator_resource(context) if not has_gpu: return {}, {}, {} - label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter(context) - if not label_formatter: + lf, _ = kubernetes_utils.detect_gpu_label_formatter(context) + if not lf: return {}, {}, {} accelerators_qtys: Set[Tuple[str, int]] = set() - key = label_formatter.get_label_key() + keys = lf.get_label_keys() nodes = kubernetes_utils.get_kubernetes_nodes(context) # Get the pods to get the real-time GPU usage try: @@ -134,67 +134,85 @@ def list_accelerators_realtime( min_quantity_filter = quantity_filter if quantity_filter else 1 for node in nodes: - if key in node.metadata.labels: - allocated_qty = 0 - accelerator_name = label_formatter.get_accelerator_from_label_value( - node.metadata.labels.get(key)) - - # Check if name_filter regex matches the accelerator_name - regex_flags = 0 if case_sensitive else re.IGNORECASE - if name_filter and not re.match( - name_filter, accelerator_name, flags=regex_flags): - continue - - accelerator_count = int( - node.status.allocatable.get('nvidia.com/gpu', 0)) - - # Generate the GPU quantities for the accelerators - if accelerator_name and accelerator_count > 0: - count = 1 - while count <= accelerator_count: - accelerators_qtys.add((accelerator_name, count)) - count *= 2 - # Add the accelerator count if it's not already in the set - # (e.g., if there's 12 GPUs, we should have qtys 1, 2, 4, 8, 12) - if accelerator_count not in accelerators_qtys: - accelerators_qtys.add((accelerator_name, accelerator_count)) - - if accelerator_count >= min_quantity_filter: - quantized_count = (min_quantity_filter * - (accelerator_count // min_quantity_filter)) - if accelerator_name not in total_accelerators_capacity: - total_accelerators_capacity[ - accelerator_name] = quantized_count - else: - total_accelerators_capacity[ - accelerator_name] += quantized_count - - if pods is None: - # If we can't get the pods, we can't get the GPU usage - total_accelerators_available[accelerator_name] = -1 - continue - - for pod in pods: - # Get all the pods 
running on the node - if (pod.spec.node_name == node.metadata.name and - pod.status.phase in ['Running', 'Pending']): - # Iterate over all the containers in the pod and sum the - # GPU requests - for container in pod.spec.containers: - if container.resources.requests: - allocated_qty += int( - container.resources.requests.get( - 'nvidia.com/gpu', 0)) - - accelerators_available = accelerator_count - allocated_qty - - if accelerator_name not in total_accelerators_available: - total_accelerators_available[accelerator_name] = 0 - if accelerators_available >= min_quantity_filter: - quantized_availability = min_quantity_filter * ( - accelerators_available // min_quantity_filter) - total_accelerators_available[ - accelerator_name] += quantized_availability + for key in keys: + if key in node.metadata.labels: + allocated_qty = 0 + accelerator_name = lf.get_accelerator_from_label_value( + node.metadata.labels.get(key)) + + # Exclude multi-host TPUs from being processed. + # TODO(Doyoung): Remove the logic when adding support for + # multi-host TPUs. + if kubernetes_utils.is_multi_host_tpu(node.metadata.labels): + continue + + # Check if name_filter regex matches the accelerator_name + regex_flags = 0 if case_sensitive else re.IGNORECASE + if name_filter and not re.match( + name_filter, accelerator_name, flags=regex_flags): + continue + + # Generate the accelerator quantities + accelerator_count = ( + kubernetes_utils.get_node_accelerator_count( + node.status.allocatable)) + + if accelerator_name and accelerator_count > 0: + # TPUs are counted in a different way compared to GPUs. + # Multi-node GPUs can be split into smaller units and be + # provisioned, but TPUs are considered as an atomic unit. + if kubernetes_utils.is_tpu_on_gke(accelerator_name): + accelerators_qtys.add( + (accelerator_name, accelerator_count)) + else: + count = 1 + while count <= accelerator_count: + accelerators_qtys.add((accelerator_name, count)) + count *= 2 + # Add the accelerator count if it's not already in the + # set (e.g., if there's 12 GPUs, we should have qtys 1, + # 2, 4, 8, 12) + if accelerator_count not in accelerators_qtys: + accelerators_qtys.add( + (accelerator_name, accelerator_count)) + + if accelerator_count >= min_quantity_filter: + quantized_count = ( + min_quantity_filter * + (accelerator_count // min_quantity_filter)) + if accelerator_name not in total_accelerators_capacity: + total_accelerators_capacity[ + accelerator_name] = quantized_count + else: + total_accelerators_capacity[ + accelerator_name] += quantized_count + + if pods is None: + # If we can't get the pods, we can't get the GPU usage + total_accelerators_available[accelerator_name] = -1 + continue + + for pod in pods: + # Get all the pods running on the node + if (pod.spec.node_name == node.metadata.name and + pod.status.phase in ['Running', 'Pending']): + # Iterate over all the containers in the pod and sum + # the GPU requests + for container in pod.spec.containers: + if container.resources.requests: + allocated_qty += ( + kubernetes_utils.get_node_accelerator_count( + container.resources.requests)) + + accelerators_available = accelerator_count - allocated_qty + + if accelerator_name not in total_accelerators_available: + total_accelerators_available[accelerator_name] = 0 + if accelerators_available >= min_quantity_filter: + quantized_availability = min_quantity_filter * ( + accelerators_available // min_quantity_filter) + total_accelerators_available[ + accelerator_name] += quantized_availability result = [] diff --git 
a/sky/clouds/utils/gcp_utils.py b/sky/clouds/utils/gcp_utils.py index cfb893c8cb4..e899c60fa4c 100644 --- a/sky/clouds/utils/gcp_utils.py +++ b/sky/clouds/utils/gcp_utils.py @@ -17,6 +17,7 @@ from sky import sky_logging from sky import skypilot_config from sky.provision.gcp import constants +from sky.provision.kubernetes import utils as kubernetes_utils from sky.utils import subprocess_utils if typing.TYPE_CHECKING: @@ -35,7 +36,10 @@ def is_tpu(resources: Optional['resources_lib.Resources']) -> bool: def is_tpu_vm(resources: Optional['resources_lib.Resources']) -> bool: if not is_tpu(resources): return False - assert resources is not None + assert (resources is not None and len(resources.accelerators) == 1) + acc, _ = list(resources.accelerators.items())[0] + if kubernetes_utils.is_tpu_on_gke(acc): + return False if resources.accelerator_args is None: return True return resources.accelerator_args.get('tpu_vm', True) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index b54575301a1..2dcf38f2365 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -2,7 +2,7 @@ import copy import json import time -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Union import uuid from sky import exceptions @@ -47,6 +47,72 @@ def head_service_selector(cluster_name: str) -> Dict[str, str]: return {'component': f'{cluster_name}-head'} +def _formatted_resource_requirements(pod_or_spec: Union[Any, dict]) -> str: + # Returns a formatted string of resource requirements for a pod. + resource_requirements = {} + + if isinstance(pod_or_spec, dict): + containers = pod_or_spec.get('spec', {}).get('containers', []) + else: + containers = pod_or_spec.spec.containers + + for container in containers: + if isinstance(container, dict): + resources = container.get('resources', {}) + requests = resources.get('requests', {}) + else: + resources = container.resources + requests = resources.requests or {} + + for resource, value in requests.items(): + if resource not in resource_requirements: + resource_requirements[resource] = 0 + if resource == 'memory': + int_value = kubernetes_utils.parse_memory_resource(value) + else: + int_value = kubernetes_utils.parse_cpu_or_gpu_resource(value) + resource_requirements[resource] += int(int_value) + return ', '.join(f'{resource}={value}' + for resource, value in resource_requirements.items()) + + +def _formatted_node_selector(pod_or_spec: Union[Any, dict]) -> Optional[str]: + # Returns a formatted string of node selectors for a pod. + node_selectors = [] + + if isinstance(pod_or_spec, dict): + selectors = pod_or_spec.get('spec', {}).get('nodeSelector', {}) + else: + selectors = pod_or_spec.spec.node_selector + + if not selectors: + return None + + for label_key, label_value in selectors.items(): + node_selectors.append(f'{label_key}={label_value}') + return ', '.join(node_selectors) + + +def _lack_resource_msg(resource: str, + pod_or_spec: Union[Any, dict], + extra_msg: Optional[str] = None, + details: Optional[str] = None) -> str: + resource_requirements = _formatted_resource_requirements(pod_or_spec) + node_selectors = _formatted_node_selector(pod_or_spec) + node_selector_str = f' and labels ({node_selectors})' if ( + node_selectors) else '' + msg = (f'Insufficient {resource} capacity on the cluster. ' + f'Required resources ({resource_requirements}){node_selector_str} ' + 'were not found in a single node. 
Other SkyPilot tasks or pods may ' + 'be using resources. Check resource usage by running ' + '`kubectl describe nodes`.') + if extra_msg: + msg += f' {extra_msg}' + if details: + msg += f'\nFull error: {details}' + return msg + + def _raise_pod_scheduling_errors(namespace, context, new_nodes): """Raise pod scheduling failure reason. @@ -54,52 +120,6 @@ def _raise_pod_scheduling_errors(namespace, context, new_nodes): are recorded as events. This function retrieves those events and raises descriptive errors for better debugging and user feedback. """ - - def _formatted_resource_requirements(pod): - # Returns a formatted string of resource requirements for a pod. - resource_requirements = {} - for container in pod.spec.containers: - for resource, value in container.resources.requests.items(): - if resource not in resource_requirements: - resource_requirements[resource] = 0 - if resource == 'memory': - int_value = kubernetes_utils.parse_memory_resource(value) - else: - int_value = kubernetes_utils.parse_cpu_or_gpu_resource( - value) - resource_requirements[resource] += int_value - return ', '.join(f'{resource}={value}' - for resource, value in resource_requirements.items()) - - def _formatted_node_selector(pod) -> Optional[str]: - # Returns a formatted string of node selectors for a pod. - node_selectors = [] - if pod.spec.node_selector is None: - return None - for label_key, label_value in pod.spec.node_selector.items(): - node_selectors.append(f'{label_key}={label_value}') - return ', '.join(node_selectors) - - def _lack_resource_msg(resource: str, - pod, - extra_msg: Optional[str] = None, - details: Optional[str] = None) -> str: - resource_requirements = _formatted_resource_requirements(pod) - node_selectors = _formatted_node_selector(pod) - node_selector_str = f' and labels ({node_selectors})' if ( - node_selectors) else '' - msg = ( - f'Insufficient {resource} capacity on the cluster. ' - f'Required resources ({resource_requirements}){node_selector_str} ' - 'were not found in a single node. Other SkyPilot tasks or pods may ' - 'be using resources. Check resource usage by running ' - '`kubectl describe nodes`.') - if extra_msg: - msg += f' {extra_msg}' - if details: - msg += f'\nFull error: {details}' - return msg - for new_node in new_nodes: pod = kubernetes.core_api(context).read_namespaced_pod( new_node.metadata.name, namespace) @@ -148,8 +168,8 @@ def _lack_resource_msg(resource: str, '`kubectl delete pods -n skypilot-system -l name=smarter-device-manager`.' # pylint: disable=line-too-long f' Full error: {event_message}') gpu_lf_keys = [ - lf.get_label_key() - for lf in kubernetes_utils.LABEL_FORMATTER_REGISTRY + key for lf in kubernetes_utils.LABEL_FORMATTER_REGISTRY + for key in lf.get_label_keys() ] if pod.spec.node_selector: for label_key in pod.spec.node_selector.keys(): @@ -157,10 +177,24 @@ def _lack_resource_msg(resource: str, # TODO(romilb): We may have additional node # affinity selectors in the future - in that # case we will need to update this logic. - if (('Insufficient nvidia.com/gpu' - in event_message) or - ('didn\'t match Pod\'s node affinity/selector' - in event_message)): + # TODO(Doyoung): Update the error message raised + # with the multi-host TPU support. + if 'Insufficient google.com/tpu' in event_message: + extra_msg = ( + f'Verify if ' + f'{pod.spec.node_selector[label_key]}' + ' is available in the cluster. 
Note ' + 'that multi-host TPU podslices are ' + 'currently not unsupported.') + raise config_lib.KubernetesError( + _lack_resource_msg('TPU', + pod, + extra_msg, + details=event_message)) + elif (('Insufficient nvidia.com/gpu' + in event_message) or + ('didn\'t match Pod\'s node affinity/selector' + in event_message)): extra_msg = ( f'Verify if ' f'{pod.spec.node_selector[label_key]}' @@ -553,6 +587,20 @@ def _create_namespaced_pod_with_retries(namespace: str, pod_spec: dict, logger.info('Failed to create Pod without AppArmor annotation: ' f'{retry_exception}') raise retry_exception + # Unlike other error from resource lackage on CPU/GPU/Memory, TPU + # lackage error is raised when pod is attemtped to be created. + # TODO(Doyoung): Update the error message raised with the multi-host + # TPU support. + elif 'Invalid resource requests for google.com/tpu.' in error_message: + extra_message = ('Verify if the cluster has a TPU slice node with ' + 'a topology matching the number of TPU(s) ' + 'requested. Note that multi-host TPU podslices ' + 'are currently not unsupported.') + raise config_lib.KubernetesError( + _lack_resource_msg('TPU', + pod_spec, + details=error_message, + extra_msg=extra_message)) else: # Re-raise the exception if it's a different error raise e @@ -633,8 +681,14 @@ def _create_pods(region: str, cluster_name_on_cloud: str, 'override runtimeClassName in ~/.sky/config.yaml. ' 'For more details, refer to https://skypilot.readthedocs.io/en/latest/reference/config.html') # pylint: disable=line-too-long - needs_gpus = (pod_spec['spec']['containers'][0].get('resources', {}).get( - 'limits', {}).get('nvidia.com/gpu', 0) > 0) + needs_gpus = False + limits = pod_spec['spec']['containers'][0].get('resources', + {}).get('limits') + if limits is not None: + needs_gpus = limits.get(kubernetes_utils.GPU_RESOURCE_KEY, 0) > 0 + + # TPU pods provisioned on GKE use the default containerd runtime. + # Reference: https://cloud.google.com/kubernetes-engine/docs/how-to/migrate-containerd#overview # pylint: disable=line-too-long if nvidia_runtime_exists and needs_gpus: pod_spec['spec']['runtimeClassName'] = 'nvidia' @@ -679,6 +733,22 @@ def _create_pods(region: str, cluster_name_on_cloud: str, } } + # TPU slice nodes are given a taint, google.com/tpu=present:NoSchedule. + # This is to prevent from non-TPU workloads from being scheduled on TPU + # slice nodes. We need this toleration to allow the pod to be scheduled + # on TPU nodes. + # Reference: https://cloud.google.com/kubernetes-engine/docs/concepts/tpus#how_tpus_work # pylint: disable=line-too-long + tpu_label = kubernetes_utils.GKELabelFormatter.TPU_LABEL_KEY + if tpu_label in config.node_config.get('spec', + {}).get('nodeSelector', {}): + tpu_toleration = { + 'key': kubernetes_utils.TPU_RESOURCE_KEY, + 'operator': 'Equal', + 'value': 'present', + 'effect': 'NoSchedule' + } + pod_spec['spec']['tolerations'] = [tpu_toleration] + pod = _create_namespaced_pod_with_retries(namespace, pod_spec, context) created_pods[pod.metadata.name] = pod if head_pod_name is None: diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index cba6c9987c9..e5bc4228a8e 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -48,10 +48,18 @@ 'T': 2**40, 'P': 2**50, } -NO_GPU_HELP_MESSAGE = ('If your cluster contains GPUs, make sure ' - 'nvidia.com/gpu resource is available on the nodes and ' - 'the node labels for identifying GPUs ' - '(e.g., skypilot.co/accelerator) are setup correctly. 
') + +# The resource keys used by Kubernetes to track NVIDIA GPUs and Google TPUs on +# nodes. These keys are typically used in the node's status.allocatable +# or status.capacity fields to indicate the available resources on the node. +GPU_RESOURCE_KEY = 'nvidia.com/gpu' +TPU_RESOURCE_KEY = 'google.com/tpu' + +NO_ACCELERATOR_HELP_MESSAGE = ( + 'If your cluster contains GPUs or TPUs, make sure ' + f'{GPU_RESOURCE_KEY} or {TPU_RESOURCE_KEY} resource is available ' + 'on the nodes and the node labels for identifying GPUs/TPUs ' + '(e.g., skypilot.co/accelerator) are setup correctly. ') KUBERNETES_AUTOSCALER_NOTE = ( 'Note: Kubernetes cluster autoscaling is enabled. ' @@ -74,6 +82,17 @@ PORT_FORWARD_PROXY_CMD_PATH = ('~/.sky/kubernetes-port-forward-proxy-command-' f'v{PORT_FORWARD_PROXY_CMD_VERSION}.sh') +# Mapping used to get generation for TPU accelerator name. +# https://cloud.google.com/kubernetes-engine/docs/how-to/tpus#run +GKE_TPU_ACCELERATOR_TO_GENERATION = { + 'tpu-v4-podslice': 'v4', + # Only Single-host v5e TPU configurations are allowed. + 'tpu-v5-lite-device': 'v5e', + # Multi-host compatible v5e TPU configurations allowed. + 'tpu-v5-lite-podslice': 'v5e', + 'tpu-v5p-slice': 'v5p', +} + POD_STATUSES = { 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating' } @@ -96,15 +115,25 @@ class GPULabelFormatter: """ @classmethod - def get_label_key(cls) -> str: + def get_label_key(cls, accelerator: Optional[str] = None) -> str: """Returns the label key for GPU type used by the Kubernetes cluster""" raise NotImplementedError + @classmethod + def get_label_keys(cls) -> List[str]: + """Returns a list of label keys for GPU used by Kubernetes cluster.""" + raise NotImplementedError + @classmethod def get_label_value(cls, accelerator: str) -> str: """Given a GPU type, returns the label value to be used""" raise NotImplementedError + @classmethod + def match_label_key(cls, label_key: str) -> bool: + """Checks if the given label key matches the formatter's label keys""" + raise NotImplementedError + @classmethod def get_accelerator_from_label_value(cls, value: str) -> str: """Given a label value, returns the GPU type""" @@ -126,10 +155,11 @@ def validate_label_value(cls, value: str) -> Tuple[bool, str]: def get_gke_accelerator_name(accelerator: str) -> str: - """Returns the accelerator name for GKE clusters + """Returns the accelerator name for GKE clusters. Uses the format - nvidia-tesla-. - A100-80GB, H100-80GB and L4 are an exception. They use nvidia-. + A100-80GB, H100-80GB, L4 are an exception. They use nvidia-. + TPU types are an exception as well keeping the given name. """ if accelerator == 'H100': # H100 is named as H100-80GB in GKE. @@ -138,6 +168,8 @@ def get_gke_accelerator_name(accelerator: str) -> str: # A100-80GB, L4, H100-80GB and H100-MEGA-80GB # have a different name pattern. return 'nvidia-{}'.format(accelerator.lower()) + elif accelerator.startswith('tpu-'): + return accelerator else: return 'nvidia-tesla-{}'.format(accelerator.lower()) @@ -152,15 +184,23 @@ class SkyPilotLabelFormatter(GPULabelFormatter): LABEL_KEY = 'skypilot.co/accelerator' @classmethod - def get_label_key(cls) -> str: + def get_label_key(cls, accelerator: Optional[str] = None) -> str: return cls.LABEL_KEY + @classmethod + def get_label_keys(cls) -> List[str]: + return [cls.LABEL_KEY] + @classmethod def get_label_value(cls, accelerator: str) -> str: # For SkyPilot formatter, we use the accelerator str directly. # See sky.utils.kubernetes.gpu_labeler. 
return accelerator.lower() + @classmethod + def match_label_key(cls, label_key: str) -> bool: + return label_key == cls.LABEL_KEY + @classmethod def get_accelerator_from_label_value(cls, value: str) -> str: return value.upper() @@ -184,13 +224,21 @@ class CoreWeaveLabelFormatter(GPULabelFormatter): LABEL_KEY = 'gpu.nvidia.com/class' @classmethod - def get_label_key(cls) -> str: + def get_label_key(cls, accelerator: Optional[str] = None) -> str: return cls.LABEL_KEY + @classmethod + def get_label_keys(cls) -> List[str]: + return [cls.LABEL_KEY] + @classmethod def get_label_value(cls, accelerator: str) -> str: return accelerator.upper() + @classmethod + def match_label_key(cls, label_key: str) -> bool: + return label_key == cls.LABEL_KEY + @classmethod def get_accelerator_from_label_value(cls, value: str) -> str: return value @@ -203,11 +251,28 @@ class GKELabelFormatter(GPULabelFormatter): label, which is used to identify the GPU type. """ - LABEL_KEY = 'cloud.google.com/gke-accelerator' + GPU_LABEL_KEY = 'cloud.google.com/gke-accelerator' + TPU_LABEL_KEY = 'cloud.google.com/gke-tpu-accelerator' + ACCELERATOR_COUNT_LABEL_KEY = 'cloud.google.com/gke-accelerator-count' + TPU_TOPOLOGY_LABEL_KEY = 'cloud.google.com/gke-tpu-topology' @classmethod - def get_label_key(cls) -> str: - return cls.LABEL_KEY + def get_label_key(cls, accelerator: Optional[str] = None) -> str: + if accelerator is not None and accelerator.startswith('tpu-'): + return cls.TPU_LABEL_KEY + return cls.GPU_LABEL_KEY + + @classmethod + def get_label_keys(cls) -> List[str]: + return [cls.GPU_LABEL_KEY, cls.TPU_LABEL_KEY] + + @classmethod + def match_label_key(cls, label_key: str) -> bool: + return label_key in cls.get_label_keys() + + @classmethod + def get_tpu_topology_label_key(cls) -> str: + return cls.TPU_TOPOLOGY_LABEL_KEY @classmethod def get_label_value(cls, accelerator: str) -> str: @@ -225,6 +290,8 @@ def get_accelerator_from_label_value(cls, value: str) -> str: # to distinguish between a3-high and a3-mega instances return 'H100' return acc + elif is_tpu_on_gke(value): + return value else: raise ValueError( f'Invalid accelerator name in GKE cluster: {value}') @@ -248,9 +315,13 @@ class GFDLabelFormatter(GPULabelFormatter): LABEL_KEY = 'nvidia.com/gpu.product' @classmethod - def get_label_key(cls) -> str: + def get_label_key(cls, accelerator: Optional[str] = None) -> str: return cls.LABEL_KEY + @classmethod + def get_label_keys(cls) -> List[str]: + return [cls.LABEL_KEY] + @classmethod def get_label_value(cls, accelerator: str) -> str: """An accelerator can map to many Nvidia GFD labels @@ -258,6 +329,10 @@ def get_label_value(cls, accelerator: str) -> str: As a result, we do not support get_label_value for GFDLabelFormatter.""" raise NotImplementedError + @classmethod + def match_label_key(cls, label_key: str) -> bool: + return label_key == cls.LABEL_KEY + @classmethod def get_accelerator_from_label_value(cls, value: str) -> str: """Searches against a canonical list of NVIDIA GPUs and pattern @@ -335,10 +410,9 @@ def detect_gpu_label_formatter( # Check if the node labels contain any of the GPU label prefixes for lf in LABEL_FORMATTER_REGISTRY: - label_key = lf.get_label_key() for _, label_list in node_labels.items(): for label, _ in label_list: - if label.startswith(label_key): + if lf.match_label_key(label): label_formatter = lf() return label_formatter, node_labels @@ -346,24 +420,28 @@ def detect_gpu_label_formatter( @functools.lru_cache(maxsize=10) -def detect_gpu_resource(context: Optional[str]) -> Tuple[bool, 
Set[str]]: - """Checks if the Kubernetes cluster has nvidia.com/gpu resource. + """Checks if the Kubernetes cluster has GPU/TPU resource. - If nvidia.com/gpu resource is missing, that typically means that the - Kubernetes cluster does not have GPUs or the nvidia GPU operator and/or - device drivers are not installed. + Two types of accelerator resources are available which are each checked + with nvidia.com/gpu and google.com/tpu. If nvidia.com/gpu resource is + missing, that typically means that the Kubernetes cluster does not have + GPUs or the nvidia GPU operator and/or device drivers are not installed. Returns: - bool: True if the cluster has nvidia.com/gpu resource, False otherwise. + bool: True if the cluster has GPU_RESOURCE_KEY or TPU_RESOURCE_KEY + resource, False otherwise. """ # Get the set of resources across all nodes cluster_resources: Set[str] = set() nodes = get_kubernetes_nodes(context) for node in nodes: cluster_resources.update(node.status.allocatable.keys()) - has_gpu = 'nvidia.com/gpu' in cluster_resources + has_accelerator = (GPU_RESOURCE_KEY in cluster_resources or + TPU_RESOURCE_KEY in cluster_resources) - return has_gpu, cluster_resources + return has_accelerator, cluster_resources @functools.lru_cache(maxsize=10) @@ -451,16 +529,52 @@ def check_cpu_mem_fits(candidate_instance_type: 'KubernetesInstanceType', 'Maximum resources found on a single node: ' f'{max_cpu} CPUs, {common_utils.format_float(max_mem)}G Memory') + def check_tpu_fits(candidate_instance_type: 'KubernetesInstanceType', + node_list: List[Any]) -> Tuple[bool, Optional[str]]: + """Checks if the instance fits on the cluster based on requested TPU. + + It checks if the TPU type and count on each node match the required + number of TPU chips for the instance. In the case of multi-host TPU + podslice, the function ensures that the number of TPU chips on a single + node (node_tpu_chip_count) and the total TPU chips across the entire + podslice (topology_chip_count) are correctly handled. + """ + acc_type = candidate_instance_type.accelerator_type + acc_count = candidate_instance_type.accelerator_count + tpu_list_in_cluster = [] + for node in node_list: + if acc_type == node.metadata.labels[ + GKELabelFormatter.TPU_LABEL_KEY]: + # TODO(Doyoung): Update the logic when adding support for + # multi-host TPUs. + if is_multi_host_tpu(node.metadata.labels): + continue + node_tpu_chip_count = int(node.metadata.labels[ + GKELabelFormatter.ACCELERATOR_COUNT_LABEL_KEY]) + tpu_type = f'{acc_type}:{node_tpu_chip_count}' + tpu_list_in_cluster.append(tpu_type) + if node_tpu_chip_count == acc_count: + return True, None + tpu_list_in_cluster_str = ','.join(tpu_list_in_cluster) + # TODO(Doyoung): Update the error message raised with the multi-host + # TPU support. + return False, ('Requested TPU type was not found in the cluster. TPU ' + 'types found in the cluster: ' + f'{tpu_list_in_cluster_str}. Note that multi-host TPU ' + 'podslices are currently not supported.') + nodes = get_kubernetes_nodes(context) k8s_instance_type = KubernetesInstanceType.\ from_instance_type(instance) acc_type = k8s_instance_type.accelerator_type + acc_count = k8s_instance_type.accelerator_count if acc_type is not None: - # If GPUs are requested, check if GPU type is available, and if so, - # check if CPU and memory requirements on the specific node are met. 
+ # If GPU/TPUs are requested, check if GPU/TPU type is available, and + # if so, check if CPU and memory requirements on the specific node are + # met. try: - gpu_label_key, gpu_label_val = get_gpu_label_key_value( - context, acc_type) + gpu_label_key, gpu_label_val, _, _ = ( + get_accelerator_label_key_value(context, acc_type, acc_count)) except exceptions.ResourcesUnavailableError as e: # If GPU not found, return empty list and error message. return False, str(e) @@ -470,6 +584,13 @@ def check_cpu_mem_fits(candidate_instance_type: 'KubernetesInstanceType', node.metadata.labels[gpu_label_key] == gpu_label_val ] assert len(gpu_nodes) > 0, 'GPU nodes not found' + if is_tpu_on_gke(acc_type): + # If requested accelerator is a TPU type, check if the cluster + # has sufficient TPU resource to meet the requirement. + fits, reason = check_tpu_fits(k8s_instance_type, gpu_nodes) + if reason is not None: + return fits, reason + candidate_nodes = gpu_nodes not_fit_reason_prefix = ( f'GPU nodes with {acc_type} do not have ' @@ -481,7 +602,7 @@ def check_cpu_mem_fits(candidate_instance_type: 'KubernetesInstanceType', f'CPU (> {k8s_instance_type.cpus} CPUs) ' 'and/or memory ' f'(> {k8s_instance_type.memory} G). ') - # Check if CPU and memory requirements are met on at least one + # Check if CPU and memory requirements are met on at least one # candidate node. fits, reason = check_cpu_mem_fits(k8s_instance_type, candidate_nodes) if not fits: @@ -492,25 +613,33 @@ def check_cpu_mem_fits(candidate_instance_type: 'KubernetesInstanceType', return fits, reason -def get_gpu_label_key_value(context: Optional[str], - acc_type: str, - check_mode=False) -> Tuple[str, str]: - """Returns the label key and value for the given GPU type. +def get_accelerator_label_key_value( + context: Optional[str], + acc_type: str, + acc_count: Optional[int], + check_mode=False +) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Returns the label key and value for the given GPU/TPU type. Args: - acc_type: The GPU type required by the task. - check_mode: If True, only checks if the cluster has GPU resources and - labels are setup on the cluster. acc_type is ignore does not return - the label key and value. Useful for checking if GPUs are configured - correctly on the cluster without explicitly requesting a acc_type. + acc_type: The GPU/TPU type required by the task. + acc_count: Number of GPU/TPUs required by the task. + check_mode: If True, only checks if the cluster has GPU/TPU resources + and labels are setup on the cluster. acc_type is ignore does not + return the label key and value. Useful for checking if GPUs are + configured correctly on the cluster without explicitly requesting + a acc_type. Returns: - A tuple of the label key and value. Returns empty strings if check_mode - is True. + A tuple of the accelerator label key, value, topology label key, and + topology value. The topology label key and value are populated only if + the requested accelerator type is TPU. Returns None if check_mode is + True. 
Raises: ResourcesUnavailableError: Can be raised from the following conditions: - - The cluster does not have GPU resources (nvidia.com/gpu) - - The cluster does not have GPU labels setup correctly - - The cluster doesn't have any nodes with acc_type GPU + - The cluster does not have GPU/TPU resources + (nvidia.com/gpu, google.com/tpu) + - The cluster does not have GPU/TPU labels setup correctly + - The cluster doesn't have any nodes with acc_type GPU/TPU """ # Check if the cluster has GPU resources # TODO(romilb): This assumes the accelerator is a nvidia GPU. We @@ -529,13 +658,14 @@ def get_gpu_label_key_value(context: Optional[str], # If check mode is enabled and autoscaler is set, we can return # early since we assume the cluster autoscaler will handle GPU # node provisioning. - return '', '' + return None, None, None, None formatter = AUTOSCALER_TO_LABEL_FORMATTER.get(autoscaler_type) assert formatter is not None, ('Unsupported autoscaler type:' f' {autoscaler_type}') - return formatter.get_label_key(), formatter.get_label_value(acc_type) + return formatter.get_label_key(acc_type), formatter.get_label_value( + acc_type), None, None - has_gpus, cluster_resources = detect_gpu_resource(context) + has_gpus, cluster_resources = detect_accelerator_resource(context) if has_gpus: # Check if the cluster has GPU labels setup correctly label_formatter, node_labels = \ @@ -544,8 +674,10 @@ def get_gpu_label_key_value(context: Optional[str], # If none of the GPU labels from LABEL_FORMATTER_REGISTRY are # detected, raise error with ux_utils.print_exception_no_traceback(): - supported_formats = ', '.join( - [f.get_label_key() for f in LABEL_FORMATTER_REGISTRY]) + supported_formats = ', '.join([ + key for f in LABEL_FORMATTER_REGISTRY + for key in f.get_label_keys() + ]) suffix = '' if env_options.Options.SHOW_DEBUG_INFO.get(): suffix = f' Found node labels: {node_labels}' @@ -561,7 +693,7 @@ def get_gpu_label_key_value(context: Optional[str], # correctly setup and will behave as expected. for node_name, label_list in node_labels.items(): for label, value in label_list: - if label == label_formatter.get_label_key(): + if label_formatter.match_label_key(label): is_valid, reason = label_formatter.validate_label_value( value) if not is_valid: @@ -571,8 +703,7 @@ def get_gpu_label_key_value(context: Optional[str], if check_mode: # If check mode is enabled and we reached so far, we can # conclude that the cluster is setup correctly and return. - return '', '' - k8s_acc_label_key = label_formatter.get_label_key() + return None, None, None, None # Search in node_labels to see if any node has the requested # GPU type. # Note - this only checks if the label is available on a @@ -580,11 +711,38 @@ def get_gpu_label_key_value(context: Optional[str], # quantity is available since that is dynamic and can change # during scheduling. for node_name, label_list in node_labels.items(): + node_metadata_labels = dict(label_list) + # TODO(Doyoung): Update the logic when adding support for + # multi-host TPUs. 
+ if is_multi_host_tpu(node_metadata_labels): + continue for label, value in label_list: - if (label == k8s_acc_label_key and + if (label_formatter.match_label_key(label) and label_formatter.get_accelerator_from_label_value( value) == acc_type): - return label, value + if is_tpu_on_gke(acc_type): + assert isinstance(label_formatter, + GKELabelFormatter) + if node_metadata_labels.get( + label_formatter.TPU_LABEL_KEY) == acc_type: + topology_label_key = ( + label_formatter.TPU_TOPOLOGY_LABEL_KEY) + topology_value = node_metadata_labels.get( + topology_label_key) + assert topology_value is not None + tpu_topology_chip_count = reduce_tpu_topology( + topology_value) + # For single-host TPUs, there aren't multiple + # different topologies that maps to identical + # number of TPU chips. + if tpu_topology_chip_count == acc_count: + return (label, value, topology_label_key, + topology_value) + else: + continue + else: + return label, value, None, None + # If no node is found with the requested acc_type, raise error with ux_utils.print_exception_no_traceback(): suffix = '' @@ -592,15 +750,19 @@ def get_gpu_label_key_value(context: Optional[str], all_labels = [] for node_name, label_list in node_labels.items(): all_labels.extend(label_list) - gpus_available = set( - v for k, v in all_labels if k == k8s_acc_label_key) - suffix = f' Available GPUs on the cluster: {gpus_available}' + acc_available = set(v for k, v in all_labels + if label_formatter.match_label_key(k)) + suffix = (' Available GPU/TPUs on the cluster: ' + f'{acc_available}') + # TODO(Doyoung): Update the error message raised with the + # multi-host TPU support. raise exceptions.ResourcesUnavailableError( 'Could not find any node in the Kubernetes cluster ' - f'with {acc_type} GPU. Please ensure at least ' - f'one node in the cluster has {acc_type} GPU and node ' - 'labels are setup correctly. ' - f'Please refer to the documentation for more. {suffix}') + f'with {acc_type}. Please ensure at least one node in the ' + f'cluster has {acc_type} and node labels are setup ' + 'correctly. Please refer to the documentration for more. ' + f'{suffix}. Note that multi-host TPU podslices are ' + 'currently not unsupported.') else: # If GPU resources are not detected, raise error with ux_utils.print_exception_no_traceback(): @@ -609,13 +771,14 @@ def get_gpu_label_key_value(context: Optional[str], suffix = (' Available resources on the cluster: ' f'{cluster_resources}') raise exceptions.ResourcesUnavailableError( - 'Could not detect GPU resources (`nvidia.com/gpu`) in ' - 'Kubernetes cluster. If this cluster contains GPUs, please ' - 'ensure GPU drivers are installed on the node. Check if the ' - 'GPUs are setup correctly by running `kubectl describe nodes` ' - 'and looking for the nvidia.com/gpu resource. ' - 'Please refer to the documentation on how ' - f'to set up GPUs.{suffix}') + f'Could not detect GPU/TPU resources ({GPU_RESOURCE_KEY!r} or ' + f'{TPU_RESOURCE_KEY!r}) in Kubernetes cluster. If this cluster' + ' contains GPUs, please ensure GPU drivers are installed on ' + 'the node. Check if the GPUs are setup correctly by running ' + '`kubectl describe nodes` and looking for the ' + f'{GPU_RESOURCE_KEY!r} or {TPU_RESOURCE_KEY!r} resource. ' + 'Please refer to the documentation on how to set up GPUs.' + f'{suffix}') def get_head_ssh_port(cluster_name: str, namespace: str, @@ -710,7 +873,10 @@ def check_credentials(context: Optional[str], # provider if their cluster GPUs are not setup correctly. 
gpu_msg = '' try: - _, _ = get_gpu_label_key_value(context, acc_type='', check_mode=True) + get_accelerator_label_key_value(context, + acc_type='', + acc_count=0, + check_mode=True) except exceptions.ResourcesUnavailableError as e: # If GPUs are not available, we return cluster as enabled (since it can # be a CPU-only cluster) but we also return the exception message which @@ -1787,7 +1953,7 @@ def __init__(self, obj): class KubernetesNodeInfo: """Dataclass to store Kubernetes node information.""" name: str - gpu_type: Optional[str] + accelerator_type: Optional[str] # Resources available on the node. E.g., {'nvidia.com/gpu': '2'} total: Dict[str, int] free: Dict[str, int] @@ -1818,47 +1984,54 @@ def get_kubernetes_node_info( else: raise - label_formatter, _ = detect_gpu_label_formatter(context) - if not label_formatter: + lf, _ = detect_gpu_label_formatter(context) + if not lf: label_key = None else: - label_key = label_formatter.get_label_key() + label_keys = lf.get_label_keys() node_info_dict: Dict[str, KubernetesNodeInfo] = {} - for node in nodes: - allocated_qty = 0 - if label_formatter is not None and label_key in node.metadata.labels: - accelerator_name = label_formatter.get_accelerator_from_label_value( - node.metadata.labels.get(label_key)) - else: - accelerator_name = None + for label_key in label_keys: + for node in nodes: + allocated_qty = 0 + if lf is not None and label_key in node.metadata.labels: + accelerator_name = lf.get_accelerator_from_label_value( + node.metadata.labels.get(label_key)) + else: + accelerator_name = None - accelerator_count = int(node.status.allocatable.get( - 'nvidia.com/gpu', 0)) + accelerator_count = get_node_accelerator_count( + node.status.allocatable) - if pods is None: - accelerators_available = -1 + if pods is None: + accelerators_available = -1 - else: - for pod in pods: - # Get all the pods running on the node - if (pod.spec.node_name == node.metadata.name and - pod.status.phase in ['Running', 'Pending']): - # Iterate over all the containers in the pod and sum the - # GPU requests - for container in pod.spec.containers: - if container.resources.requests: - allocated_qty += int( - container.resources.requests.get( - 'nvidia.com/gpu', 0)) - accelerators_available = accelerator_count - allocated_qty - - node_info_dict[node.metadata.name] = KubernetesNodeInfo( - name=node.metadata.name, - gpu_type=accelerator_name, - total={'nvidia.com/gpu': int(accelerator_count)}, - free={'nvidia.com/gpu': int(accelerators_available)}) + else: + for pod in pods: + # Get all the pods running on the node + if (pod.spec.node_name == node.metadata.name and + pod.status.phase in ['Running', 'Pending']): + # Iterate over all the containers in the pod and sum the + # GPU requests + for container in pod.spec.containers: + if container.resources.requests: + allocated_qty += get_node_accelerator_count( + container.resources.requests) + + accelerators_available = accelerator_count - allocated_qty + + # Exclude multi-host TPUs from being processed. + # TODO(Doyoung): Remove the logic when adding support for + # multi-host TPUs. 
+ if is_multi_host_tpu(node.metadata.labels): + continue + + node_info_dict[node.metadata.name] = KubernetesNodeInfo( + name=node.metadata.name, + accelerator_type=accelerator_name, + total={'accelerator_count': int(accelerator_count)}, + free={'accelerators_available': int(accelerators_available)}) return node_info_dict @@ -2040,6 +2213,80 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: return pods +def is_tpu_on_gke(accelerator: str) -> bool: + """Determines if the given accelerator is a TPU supported on GKE.""" + return accelerator in GKE_TPU_ACCELERATOR_TO_GENERATION + + +def get_node_accelerator_count(attribute_dict: dict) -> int: + """Retrieves the count of accelerators from a node's resource dictionary. + + This method checks the node's allocatable resources or the accelerators + already deployed on the node, using pod objects that describe resource + requests. + + Args: + attribute_dict: Dict containing resource information from a node, such + as allocatable or requested resources. + + Returns: + Number of accelerators allocated or available from the node. If no + resource is found, it returns 0. + """ + assert not (GPU_RESOURCE_KEY in attribute_dict and + TPU_RESOURCE_KEY in attribute_dict) + if GPU_RESOURCE_KEY in attribute_dict: + return int(attribute_dict[GPU_RESOURCE_KEY]) + elif TPU_RESOURCE_KEY in attribute_dict: + return int(attribute_dict[TPU_RESOURCE_KEY]) + return 0 + + +def reduce_tpu_topology(topology: str) -> int: + """Computes the number of TPU chips from its topology string.""" + chip_dimensions = [int(chip_count) for chip_count in topology.split('x')] + # tpu_topology_chip_count represents the total number of TPU chips in the + # entire podslice, whether it is a single-host or multi-host TPU podslice. + tpu_topology_chip_count = functools.reduce(lambda x, y: x * y, + chip_dimensions) + return tpu_topology_chip_count + + +def is_multi_host_tpu(node_metadata_labels: dict) -> bool: + """Determines whether the given node is a multi-host TPU configuration.""" + if GKELabelFormatter.TPU_LABEL_KEY in node_metadata_labels: + assert GKELabelFormatter.TPU_TOPOLOGY_LABEL_KEY in node_metadata_labels + topology_value = ( + node_metadata_labels[GKELabelFormatter.TPU_TOPOLOGY_LABEL_KEY]) + accelerator_count_label_key = ( + GKELabelFormatter.ACCELERATOR_COUNT_LABEL_KEY) + assert accelerator_count_label_key in node_metadata_labels + # node_tpu_chip_count represents the number of TPU chips + # available in this node. If the node is part of a node pool + # forming a multi-host TPU podslice, it only reflects the + # number of TPU chips in this individual node, not the entire + # multi-host TPU podslice. + node_tpu_chip_count = int( + node_metadata_labels[accelerator_count_label_key]) + topology_chip_count = reduce_tpu_topology(topology_value) + # For multi-host TPU podslices, topology_chip_count and + # node_tpu_chip_count will differ, as topology_chip_count + # reflects the total across all hosts, while + # node_tpu_chip_count reflects only the chips in a single node. 
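To make the topology arithmetic above concrete, a minimal, self-contained sketch of how a GKE topology label reduces to a chip count and how that total is compared against the per-node accelerator count; the '2x2' and '2x2x2' topologies and the per-node count of 4 are hypothetical label values, not read from a live cluster.

import functools


def chips_in_topology(topology: str) -> int:
    # 'AxBxC' -> A * B * C: total chips across the entire podslice.
    return functools.reduce(lambda x, y: x * y,
                            (int(dim) for dim in topology.split('x')))


# Single-host podslice: the node exposes every chip of the slice, so the
# per-node accelerator count equals the topology total and the node is kept.
assert chips_in_topology('2x2') == 4

# Multi-host podslice (e.g. a v4 slice with topology 2x2x2): each node only
# exposes its own 4 chips, so the per-node count (4) differs from the
# topology total (8) and the node is treated as multi-host and skipped.
node_tpu_chip_count = 4
assert chips_in_topology('2x2x2') != node_tpu_chip_count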
+ if node_tpu_chip_count != topology_chip_count: + return True + return False + + +def multi_host_tpu_exists_in_cluster(context: Optional[str] = None) -> bool: + """Checks if there exists a multi-host TPU within the cluster.""" + nodes = get_kubernetes_nodes(context) + for node in nodes: + if is_multi_host_tpu(node.metadata.labels): + return True + return False + + @dataclasses.dataclass class KubernetesSkyPilotClusterInfo: cluster_name_on_cloud: str diff --git a/sky/resources.py b/sky/resources.py index 3b33476713b..deb05a6eade 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -14,6 +14,7 @@ from sky import skypilot_config from sky.clouds import service_catalog from sky.provision import docker_utils +from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.utils import accelerator_registry from sky.utils import common_utils @@ -582,36 +583,46 @@ def _set_accelerators( acc, _ = list(accelerators.items())[0] if 'tpu' in acc.lower(): if self.cloud is None: - self._cloud = clouds.GCP() - assert self.cloud.is_same_cloud( - clouds.GCP()), 'Cloud must be GCP.' + if kubernetes_utils.is_tpu_on_gke(acc): + self._cloud = clouds.Kubernetes() + else: + self._cloud = clouds.GCP() + assert (self.cloud.is_same_cloud(clouds.GCP()) or + self.cloud.is_same_cloud(clouds.Kubernetes())), ( + 'Cloud must be GCP or Kubernetes for TPU ' + 'accelerators.') + if accelerator_args is None: accelerator_args = {} + use_tpu_vm = accelerator_args.get('tpu_vm', True) - if self.instance_type is not None and use_tpu_vm: - if self.instance_type != 'TPU-VM': - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Cannot specify instance type' - f' (got "{self.instance_type}") for TPU VM.') - if 'runtime_version' not in accelerator_args: - - def _get_default_runtime_version() -> str: - if not use_tpu_vm: - return '2.12.0' - # TPU V5 requires a newer runtime version. - if acc.startswith('tpu-v5'): - return 'v2-alpha-tpuv5' - # TPU V6e requires a newer runtime version. - if acc.startswith('tpu-v6e'): - return 'v2-alpha-tpuv6e' - return 'tpu-vm-base' - - accelerator_args['runtime_version'] = ( - _get_default_runtime_version()) - logger.info( - 'Missing runtime_version in accelerator_args, using' - f' default ({accelerator_args["runtime_version"]})') + if (self.cloud.is_same_cloud(clouds.GCP()) and + not kubernetes_utils.is_tpu_on_gke(acc)): + if 'runtime_version' not in accelerator_args: + + def _get_default_runtime_version() -> str: + if not use_tpu_vm: + return '2.12.0' + # TPU V5 requires a newer runtime version. + if acc.startswith('tpu-v5'): + return 'v2-alpha-tpuv5' + # TPU V6e requires a newer runtime version. 
+ elif acc.startswith('tpu-v6e'): + return 'v2-alpha-tpuv6e' + return 'tpu-vm-base' + + accelerator_args['runtime_version'] = ( + _get_default_runtime_version()) + logger.info( + 'Missing runtime_version in accelerator_args, using' + f' default ({accelerator_args["runtime_version"]})') + + if self.instance_type is not None and use_tpu_vm: + if self.instance_type != 'TPU-VM': + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Cannot specify instance type (got ' + f'{self.instance_type!r}) for TPU VM.') self._accelerators = accelerators self._accelerator_args = accelerator_args diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index 6a2a1091b98..b981ee8bf12 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -283,12 +283,15 @@ available_node_types: restartPolicy: Never - # Add node selector if GPUs are requested: + # Add node selector if GPU/TPUs are requested: {% if (k8s_acc_label_key is not none and k8s_acc_label_value is not none) or (k8s_spot_label_key is not none) %} nodeSelector: {% if k8s_acc_label_key is not none and k8s_acc_label_value is not none %} {{k8s_acc_label_key}}: {{k8s_acc_label_value}} {% endif %} + {% if k8s_topology_label_key is not none and k8s_topology_label_value is not none %} + {{k8s_topology_label_key}}: {{k8s_topology_label_value}} + {% endif %} {% if k8s_spot_label_key is not none %} {{k8s_spot_label_key}}: {{k8s_spot_label_value|tojson}} {% endif %} @@ -409,14 +412,24 @@ available_node_types: requests: cpu: {{cpus}} memory: {{memory}}G - nvidia.com/gpu: {{accelerator_count}} + {% if k8s_resource_key is not none %} + # Number of requested google.com/tpu must be equal to the total + # number of available TPU chips on the TPU slice node either it + # being a node from multi-host TPU slice or single-host TPU + # slice. Example reference: + # https://cloud.google.com/kubernetes-engine/docs/concepts/tpus#how_tpus_work + {{k8s_resource_key}}: {{accelerator_count}} + {% endif %} {% if k8s_fuse_device_required %} # Kubernetes resource exposed by the fuse device manager # https://gitlab.com/arm-research/smarter/smarter-device-manager smarter-devices/fuse: "1" {% endif %} limits: - nvidia.com/gpu: {{accelerator_count}} # Limits need to be defined for GPU requests + # Limits need to be defined for GPU/TPU requests + {% if k8s_resource_key is not none %} + {{k8s_resource_key}}: {{accelerator_count}} + {% endif %} {% if k8s_fuse_device_required %} smarter-devices/fuse: "1" {% endif %} @@ -451,6 +464,19 @@ setup_commands: sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); # This is needed for `-o allow_other` option for `goofys`; + {% if tpu_requested %} + # The /tmp/tpu_logs directory is where TPU-related logs, such as logs from + # the TPU runtime, are written. These capture runtime information about the + # TPU execution, including any warnings, errors, or general activity of + # the TPU driver. 
By default, the /tmp/tpu_logs directory is created with + # 755 permissions, and the user of the provisioned pod is not necessarily + # a root. Hence, we need to update the write permission so the logs can be + # properly written. + # TODO(Doyoung): Investigate to see why TPU workload fails to run without + # execution permission, such as granting 766 to log file. Check if it's a + # must and see if there's a workaround to grant minimum permission. + - sudo chmod 777 /tmp/tpu_logs; + {% endif %} # Format: `REMOTE_PATH : LOCAL_PATH` file_mounts: { diff --git a/sky/utils/kubernetes/gpu_labeler.py b/sky/utils/kubernetes/gpu_labeler.py index b00bd4f21ae..14fbbdedca5 100644 --- a/sky/utils/kubernetes/gpu_labeler.py +++ b/sky/utils/kubernetes/gpu_labeler.py @@ -101,7 +101,7 @@ def label(): # Get the list of nodes with GPUs gpu_nodes = [] for node in nodes: - if 'nvidia.com/gpu' in node.status.capacity: + if kubernetes_utils.GPU_RESOURCE_KEY in node.status.capacity: gpu_nodes.append(node) print(f'Found {len(gpu_nodes)} GPU nodes in the cluster') @@ -142,7 +142,7 @@ def label(): if len(gpu_nodes) == 0: print('No GPU nodes found in the cluster. If you have GPU nodes, ' 'please ensure that they have the label ' - '`nvidia.com/gpu: `') + f'`{kubernetes_utils.GPU_RESOURCE_KEY}: `') else: print('GPU labeling started - this may take 10 min or more to complete.' '\nTo check the status of GPU labeling jobs, run ' diff --git a/tests/common.py b/tests/common.py index d41ff3bead0..5f38cb73855 100644 --- a/tests/common.py +++ b/tests/common.py @@ -64,8 +64,9 @@ def _get_az_mappings(_): monkeypatch.setattr( 'sky.provision.kubernetes.utils.detect_gpu_label_formatter', lambda *_args, **_kwargs: [kubernetes_utils.SkyPilotLabelFormatter, {}]) - monkeypatch.setattr('sky.provision.kubernetes.utils.detect_gpu_resource', - lambda *_args, **_kwargs: [True, []]) + monkeypatch.setattr( + 'sky.provision.kubernetes.utils.detect_accelerator_resource', + lambda *_args, **_kwargs: [True, []]) monkeypatch.setattr('sky.provision.kubernetes.utils.check_instance_fits', lambda *_args, **_kwargs: [True, '']) monkeypatch.setattr('sky.provision.kubernetes.utils.get_spot_label', diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 14628ccdf73..b1ccf0b7d51 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -1999,6 +1999,25 @@ def test_tpu_vm_pod(): run_one_test(test) +# ---------- TPU Pod Slice on GKE. ---------- +@pytest.mark.kubernetes +def test_tpu_pod_slice_gke(): + name = _get_cluster_name() + test = Test( + 'tpu_pod_slice_gke', + [ + f'sky launch -y -c {name} examples/tpu/tpuvm_mnist.yaml --cloud kubernetes --gpus tpu-v5-lite-podslice', + f'sky logs {name} 1', # Ensure the job finished. + f'sky logs {name} 1 --status', # Ensure the job succeeded. + f'sky exec {name} "conda activate flax; python -c \'import jax; print(jax.devices()[0].platform);\' | grep tpu || exit 1;"', # Ensure TPU is reachable. + f'sky logs {name} 2 --status' + ], + f'sky down -y {name}', + timeout=30 * 60, # can take 30 mins + ) + run_one_test(test) + + # ---------- Simple apps. ---------- @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet def test_multi_hostname(generic_cloud: str):
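Beyond the smoke test above, the resources.py change earlier in this diff means a GKE TPU accelerator name with no cloud specified should now resolve to Kubernetes rather than GCP. A rough sketch of that behavior from the Python API, assuming sky.Resources accepts the accelerator spec as shown (other validation in Resources may still apply):

import sky

# GKE TPU name: with no cloud given, the new logic infers Kubernetes.
k8s_tpu = sky.Resources(accelerators={'tpu-v5-lite-podslice': 4})
print(k8s_tpu.cloud)  # expected: Kubernetes

# A classic TPU VM name (e.g. 'tpu-v4-8') keeps the existing GCP path and
# still receives a default runtime_version in accelerator_args.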