diff --git a/sky/cli.py b/sky/cli.py index 9a45a35ae55..0bcec3d2f4b 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -2966,6 +2966,15 @@ def show_gpus( To show all regions for a specified accelerator, use ``sky show-gpus --all-regions``. + If ``--region`` or ``--all-regions`` is not specified, the price displayed + for each instance type is the lowest across all regions for both on-demand + and spot instances. There may be multiple regions with the same lowest + price. + + If ``--cloud kubernetes`` is specified, it will show the maximum quantities + of the GPU available on a single node and the real-time availability of + the GPU across all nodes in the Kubernetes cluster. + Definitions of certain fields: * ``DEVICE_MEM``: Memory of a single device; does not depend on the device @@ -2973,10 +2982,15 @@ def show_gpus( * ``HOST_MEM``: Memory of the host instance (VM). - If ``--region`` or ``--all-regions`` is not specified, the price displayed - for each instance type is the lowest across all regions for both on-demand - and spot instances. There may be multiple regions with the same lowest - price. + * ``QTY_PER_NODE`` (Kubernetes only): GPU quantities that can be requested + on a single node. + + * ``TOTAL_GPUS`` (Kubernetes only): Total number of GPUs available in the + Kubernetes cluster. + + * ``TOTAL_FREE_GPUS`` (Kubernetes only): Number of currently free GPUs + in the Kubernetes cluster. This is fetched in real-time and may change + when other users are using the cluster. """ # validation for the --region flag if region is not None and cloud is None: @@ -2999,9 +3013,64 @@ def show_gpus( if show_all and accelerator_str is not None: raise click.UsageError('--all is only allowed without a GPU name.') + # Kubernetes specific bools + cloud_is_kubernetes = isinstance(cloud_obj, sky_clouds.Kubernetes) + kubernetes_autoscaling = kubernetes_utils.get_autoscaler_type() is not None + kubernetes_is_enabled = sky_clouds.cloud_in_iterable( + sky_clouds.Kubernetes(), global_user_state.get_cached_enabled_clouds()) + + if cloud_is_kubernetes and region is not None: + raise click.UsageError( + 'The --region flag cannot be set with --cloud kubernetes.') + def _list_to_str(lst): return ', '.join([str(e) for e in lst]) + def _get_kubernetes_realtime_gpu_table( + name_filter: Optional[str] = None, + quantity_filter: Optional[int] = None): + if quantity_filter: + qty_header = 'QTY_FILTER' + free_header = 'FILTERED_FREE_GPUS' + else: + qty_header = 'QTY_PER_NODE' + free_header = 'TOTAL_FREE_GPUS' + realtime_gpu_table = log_utils.create_table( + ['GPU', qty_header, 'TOTAL_GPUS', free_header]) + counts, capacity, available = service_catalog.list_accelerator_realtime( + gpus_only=True, + clouds='kubernetes', + name_filter=name_filter, + region_filter=region, + quantity_filter=quantity_filter, + case_sensitive=False) + assert (set(counts.keys()) == set(capacity.keys()) == set( + available.keys())), (f'Keys of counts ({list(counts.keys())}), ' + f'capacity ({list(capacity.keys())}), ' + f'and available ({list(available.keys())}) ' + 'must be same.') + if len(counts) == 0: + err_msg = 'No GPUs found in Kubernetes cluster. ' + debug_msg = 'To further debug, run: sky check ' + if name_filter is not None: + gpu_info_msg = f' {name_filter!r}' + if quantity_filter is not None: + gpu_info_msg += (' with requested quantity' + f' {quantity_filter}') + err_msg = (f'Resources{gpu_info_msg} not found ' + 'in Kubernetes cluster. ') + debug_msg = ('To show available accelerators on kubernetes,' + ' run: sky show-gpus --cloud kubernetes ') + full_err_msg = (err_msg + kubernetes_utils.NO_GPU_HELP_MESSAGE + + debug_msg) + raise ValueError(full_err_msg) + for gpu, _ in sorted(counts.items()): + realtime_gpu_table.add_row([ + gpu, + _list_to_str(counts.pop(gpu)), capacity[gpu], available[gpu] + ]) + return realtime_gpu_table + def _output(): gpu_table = log_utils.create_table( ['COMMON_GPU', 'AVAILABLE_QUANTITIES']) @@ -3012,35 +3081,69 @@ def _output(): name, quantity = None, None - # Kubernetes specific bools - cloud_is_kubernetes = isinstance(cloud_obj, sky_clouds.Kubernetes) - kubernetes_autoscaling = kubernetes_utils.get_autoscaler_type( - ) is not None + # Optimization - do not poll for Kubernetes API for fetching + # common GPUs because that will be fetched later for the table after + # common GPUs. + clouds_to_list = cloud + if cloud is None: + clouds_to_list = [ + c for c in service_catalog.ALL_CLOUDS if c != 'kubernetes' + ] + k8s_messages = '' if accelerator_str is None: + # Collect k8s related messages in k8s_messages and print them at end + print_section_titles = False + # If cloud is kubernetes, we want to show real-time capacity + if kubernetes_is_enabled and (cloud is None or cloud_is_kubernetes): + try: + # If --cloud kubernetes is not specified, we want to catch + # the case where no GPUs are available on the cluster and + # print the warning at the end. + k8s_realtime_table = _get_kubernetes_realtime_gpu_table() + except ValueError as e: + if not cloud_is_kubernetes: + # Make it a note if cloud is not kubernetes + k8s_messages += 'Note: ' + k8s_messages += str(e) + else: + print_section_titles = True + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes GPUs{colorama.Style.RESET_ALL}\n') + yield from k8s_realtime_table.get_string() + if kubernetes_autoscaling: + k8s_messages += ( + '\n' + kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE) + if cloud_is_kubernetes: + # Do not show clouds if --cloud kubernetes is specified + if not kubernetes_is_enabled: + yield ('Kubernetes is not enabled. To fix, run: ' + 'sky check kubernetes ') + yield k8s_messages + return + + # For show_all, show the k8s message at the start since output is + # long and the user may not scroll to the end. + if show_all and k8s_messages: + yield k8s_messages + yield '\n\n' + result = service_catalog.list_accelerator_counts( gpus_only=True, - clouds=cloud, + clouds=clouds_to_list, region_filter=region, ) - if len(result) == 0 and cloud_is_kubernetes: - yield kubernetes_utils.NO_GPU_ERROR_MESSAGE - if kubernetes_autoscaling: - yield '\n' - yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE - return + if print_section_titles: + # If section titles were printed above, print again here + yield '\n\n' + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Cloud GPUs{colorama.Style.RESET_ALL}\n') # "Common" GPUs - # If cloud is kubernetes, we want to show all GPUs here, even if - # they are not listed as common in SkyPilot. - if cloud_is_kubernetes: - for gpu, _ in sorted(result.items()): + for gpu in service_catalog.get_common_gpus(): + if gpu in result: gpu_table.add_row([gpu, _list_to_str(result.pop(gpu))]) - else: - for gpu in service_catalog.get_common_gpus(): - if gpu in result: - gpu_table.add_row([gpu, _list_to_str(result.pop(gpu))]) yield from gpu_table.get_string() # Google TPUs @@ -3058,16 +3161,12 @@ def _output(): other_table.add_row([gpu, _list_to_str(qty)]) yield from other_table.get_string() yield '\n\n' - if (cloud_is_kubernetes or - cloud is None) and kubernetes_autoscaling: - yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE - yield '\n\n' else: yield ('\n\nHint: use -a/--all to see all accelerators ' '(including non-common ones) and pricing.') - if (cloud_is_kubernetes or - cloud is None) and kubernetes_autoscaling: - yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE + if k8s_messages: + yield '\n' + yield k8s_messages return else: # Parse accelerator string @@ -3091,12 +3190,40 @@ def _output(): else: name, quantity = accelerator_str, None + print_section_titles = False + if (kubernetes_is_enabled and (cloud is None or cloud_is_kubernetes) and + not show_all): + # Print section title if not showing all and instead a specific + # accelerator is requested + print_section_titles = True + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Kubernetes GPUs{colorama.Style.RESET_ALL}\n') + try: + k8s_realtime_table = _get_kubernetes_realtime_gpu_table( + name_filter=name, quantity_filter=quantity) + yield from k8s_realtime_table.get_string() + except ValueError as e: + # In the case of a specific accelerator, show the error message + # immediately (e.g., "Resources H100 not found ...") + yield str(e) + if kubernetes_autoscaling: + k8s_messages += ('\n' + + kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE) + yield k8s_messages + if cloud_is_kubernetes: + # Do not show clouds if --cloud kubernetes is specified + if not kubernetes_is_enabled: + yield ('Kubernetes is not enabled. To fix, run: ' + 'sky check kubernetes ') + return + + # For clouds other than Kubernetes, get the accelerator details # Case-sensitive result = service_catalog.list_accelerators(gpus_only=True, name_filter=name, quantity_filter=quantity, region_filter=region, - clouds=cloud, + clouds=clouds_to_list, case_sensitive=False, all_regions=all_regions) # Import here to save module load speed. @@ -3128,16 +3255,17 @@ def _output(): new_result[gpu] = sorted_dataclasses result = new_result - if len(result) == 0: - if cloud == 'kubernetes': - yield kubernetes_utils.NO_GPU_ERROR_MESSAGE - return + if print_section_titles and not show_all: + yield '\n\n' + yield (f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'Cloud GPUs{colorama.Style.RESET_ALL}\n') + if len(result) == 0: quantity_str = (f' with requested quantity {quantity}' if quantity else '') - yield f'Resources \'{name}\'{quantity_str} not found. ' - yield 'Try \'sky show-gpus --all\' ' - yield 'to show available accelerators.' + cloud_str = f' on {cloud_obj}.' if cloud else ' in cloud catalogs.' + yield f'Resources \'{name}\'{quantity_str} not found{cloud_str} ' + yield 'To show available accelerators, run: sky show-gpus --all' return for i, (gpu, items) in enumerate(result.items()): diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index d380cce6757..7479cd77cf7 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -117,6 +117,46 @@ def list_accelerator_counts( return ret +def list_accelerator_realtime( + gpus_only: bool = True, + name_filter: Optional[str] = None, + region_filter: Optional[str] = None, + quantity_filter: Optional[int] = None, + clouds: CloudFilter = None, + case_sensitive: bool = True, +) -> Tuple[Dict[str, List[int]], Dict[str, int], Dict[str, int]]: + """List all accelerators offered by Sky with their realtime availability. + + Realtime availability is the total number of accelerators in the cluster + and number of accelerators available at the time of the call. + + Used for fixed size cluster settings, such as Kubernetes. + + Returns: + A tuple of three dictionaries mapping canonical accelerator names to: + - A list of available counts. (e.g., [1, 2, 4]) + - Total number of accelerators in the cluster (capacity). + - Number of accelerators available at the time of call (availability). + """ + qtys_map, total_accelerators_capacity, total_accelerators_available = ( + _map_clouds_catalog(clouds, + 'list_accelerators_realtime', + gpus_only, + name_filter, + region_filter, + quantity_filter, + case_sensitive=case_sensitive, + all_regions=False, + require_price=False)) + accelerator_counts: Dict[str, List[int]] = collections.defaultdict(list) + for gpu, items in qtys_map.items(): + for item in items: + accelerator_counts[gpu].append(item.accelerator_count) + accelerator_counts[gpu] = sorted(accelerator_counts[gpu]) + return (accelerator_counts, total_accelerators_capacity, + total_accelerators_available) + + def instance_type_exists(instance_type: str, clouds: CloudFilter = None) -> bool: """Check the existence of a instance type.""" diff --git a/sky/clouds/service_catalog/kubernetes_catalog.py b/sky/clouds/service_catalog/kubernetes_catalog.py index bd44847016e..602e19b5ff0 100644 --- a/sky/clouds/service_catalog/kubernetes_catalog.py +++ b/sky/clouds/service_catalog/kubernetes_catalog.py @@ -3,6 +3,7 @@ Kubernetes does not require a catalog of instances, but we need an image catalog mapping SkyPilot image tags to corresponding container image tags. """ +import re import typing from typing import Dict, List, Optional, Set, Tuple @@ -46,38 +47,109 @@ def list_accelerators( case_sensitive: bool = True, all_regions: bool = False, require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]: + # TODO(romilb): We should consider putting a lru_cache() with TTL to + # avoid multiple calls to kubernetes API in a short period of time (e.g., + # from the optimizer). + return list_accelerators_realtime(gpus_only, name_filter, region_filter, + quantity_filter, case_sensitive, + all_regions, require_price)[0] + + +def list_accelerators_realtime( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True +) -> Tuple[Dict[str, List[common.InstanceTypeInfo]], Dict[str, int], Dict[str, + int]]: del all_regions, require_price # Unused. k8s_cloud = Kubernetes() if not any( map(k8s_cloud.is_same_cloud, sky_check.get_cached_enabled_clouds_or_refresh()) ) or not kubernetes_utils.check_credentials()[0]: - return {} + return {}, {}, {} has_gpu = kubernetes_utils.detect_gpu_resource() if not has_gpu: - return {} + return {}, {}, {} label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter() if not label_formatter: - return {} + return {}, {}, {} - accelerators: Set[Tuple[str, int]] = set() + accelerators_qtys: Set[Tuple[str, int]] = set() key = label_formatter.get_label_key() nodes = kubernetes_utils.get_kubernetes_nodes() + # Get the pods to get the real-time GPU usage + pods = kubernetes_utils.get_kubernetes_pods() + # Total number of GPUs in the cluster + total_accelerators_capacity: Dict[str, int] = {} + # Total number of GPUs currently available in the cluster + total_accelerators_available: Dict[str, int] = {} + min_quantity_filter = quantity_filter if quantity_filter else 1 + for node in nodes: if key in node.metadata.labels: + allocated_qty = 0 accelerator_name = label_formatter.get_accelerator_from_label_value( node.metadata.labels.get(key)) + + # Check if name_filter regex matches the accelerator_name + regex_flags = 0 if case_sensitive else re.IGNORECASE + if name_filter and not re.match( + name_filter, accelerator_name, flags=regex_flags): + continue + accelerator_count = int( node.status.allocatable.get('nvidia.com/gpu', 0)) + # Generate the GPU quantities for the accelerators if accelerator_name and accelerator_count > 0: for count in range(1, accelerator_count + 1): - accelerators.add((accelerator_name, count)) + accelerators_qtys.add((accelerator_name, count)) + + for pod in pods: + # Get all the pods running on the node + if (pod.spec.node_name == node.metadata.name and + pod.status.phase in ['Running', 'Pending']): + # Iterate over all the containers in the pod and sum the + # GPU requests + for container in pod.spec.containers: + if container.resources.requests: + allocated_qty += int( + container.resources.requests.get( + 'nvidia.com/gpu', 0)) + + accelerators_available = accelerator_count - allocated_qty + + if accelerator_count >= min_quantity_filter: + quantized_count = (min_quantity_filter * + (accelerator_count // min_quantity_filter)) + if accelerator_name not in total_accelerators_capacity: + total_accelerators_capacity[ + accelerator_name] = quantized_count + else: + total_accelerators_capacity[ + accelerator_name] += quantized_count + + if accelerators_available >= min_quantity_filter: + quantized_availability = min_quantity_filter * ( + accelerators_available // min_quantity_filter) + if accelerator_name not in total_accelerators_available: + total_accelerators_available[ + accelerator_name] = quantized_availability + else: + total_accelerators_available[ + accelerator_name] += quantized_availability result = [] - for accelerator_name, accelerator_count in accelerators: + + # Generate dataframe for common.list_accelerators_impl + for accelerator_name, accelerator_count in accelerators_qtys: result.append( common.InstanceTypeInfo(cloud='Kubernetes', instance_type=None, @@ -98,9 +170,13 @@ def list_accelerators( ]) df['GpuInfo'] = True - return common.list_accelerators_impl('Kubernetes', df, gpus_only, - name_filter, region_filter, - quantity_filter, case_sensitive) + # Use common.list_accelerators_impl to get InstanceTypeInfo objects used + # by sky show-gpus when cloud is not specified. + qtys_map = common.list_accelerators_impl('Kubernetes', df, gpus_only, + name_filter, region_filter, + quantity_filter, case_sensitive) + + return qtys_map, total_accelerators_capacity, total_accelerators_available def validate_region_zone( diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index d5140d8846b..d5f91f639f6 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -35,10 +35,10 @@ 'T': 2**40, 'P': 2**50, } -NO_GPU_ERROR_MESSAGE = 'No GPUs found in Kubernetes cluster. \ -If your cluster contains GPUs, make sure nvidia.com/gpu resource is available on the nodes and the node labels for identifying GPUs \ -(e.g., skypilot.co/accelerator) are setup correctly. \ -To further debug, run: sky check.' +NO_GPU_HELP_MESSAGE = ('If your cluster contains GPUs, make sure ' + 'nvidia.com/gpu resource is available on the nodes and ' + 'the node labels for identifying GPUs ' + '(e.g., skypilot.co/accelerator) are setup correctly. ') KUBERNETES_AUTOSCALER_NOTE = ( 'Note: Kubernetes cluster autoscaling is enabled. ' @@ -280,6 +280,18 @@ def get_kubernetes_nodes() -> List[Any]: return nodes +def get_kubernetes_pods() -> List[Any]: + try: + ns = get_current_kube_config_context_namespace() + pods = kubernetes.core_api().list_namespaced_pod( + ns, _request_timeout=kubernetes.API_TIMEOUT).items + except kubernetes.max_retry_error(): + raise exceptions.ResourcesUnavailableError( + 'Timed out when trying to get pod info from Kubernetes cluster. ' + 'Please check if the cluster is healthy and retry.') from None + return pods + + def check_instance_fits(instance: str) -> Tuple[bool, Optional[str]]: """Checks if the instance fits on the Kubernetes cluster.