Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Realtime GPU availability of kubernetes cluster in sky show-gpus #3499

Merged
merged 28 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e6b975d
wip
romilbhardwaj Apr 30, 2024
a6b5bfc
filtering support
romilbhardwaj Apr 30, 2024
1346159
lint
romilbhardwaj Apr 30, 2024
6bbbf25
update doc
romilbhardwaj Apr 30, 2024
a263365
rename headers
romilbhardwaj May 2, 2024
6dfb785
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
romilbhardwaj May 17, 2024
0bd06a4
comments
romilbhardwaj May 17, 2024
6bf3045
add TODO
romilbhardwaj May 17, 2024
f960322
Add autoscaler note
romilbhardwaj May 18, 2024
8e1821d
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
romilbhardwaj May 23, 2024
8878254
case sensitive fix
romilbhardwaj May 24, 2024
3fe8fc6
case sensitive fix
romilbhardwaj May 24, 2024
2203d6b
show kubernetes GPUs in a separate table in sky show-gpus
romilbhardwaj May 24, 2024
b75e471
lint
romilbhardwaj May 24, 2024
ba98957
lint
romilbhardwaj May 24, 2024
b44b759
fix for non-k8s cloud specified
romilbhardwaj May 24, 2024
57cc132
fix for region specified with k8s
romilbhardwaj May 24, 2024
4665386
lint
romilbhardwaj May 24, 2024
400336f
show kubernetes in separate section
romilbhardwaj May 24, 2024
3d3e121
wip
romilbhardwaj May 24, 2024
e13ba3d
move messages to the end
romilbhardwaj May 24, 2024
9e308e0
lint
romilbhardwaj May 24, 2024
8a36851
lint
romilbhardwaj May 24, 2024
db95895
show sections if name is specified
romilbhardwaj May 24, 2024
91a4356
comments
romilbhardwaj May 27, 2024
8e48e68
lint
romilbhardwaj May 27, 2024
997bec1
fix bugs and move warning for show_all to the top
romilbhardwaj May 27, 2024
72f08d9
lint
romilbhardwaj May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 108 additions & 33 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2966,17 +2966,31 @@ def show_gpus(
To show all regions for a specified accelerator, use
``sky show-gpus <accelerator> --all-regions``.

If ``--region`` or ``--all-regions`` is not specified, the price displayed
for each instance type is the lowest across all regions for both on-demand
and spot instances. There may be multiple regions with the same lowest
price.

If ``--cloud kubernetes`` is specified, it will show the maximum quantities
of the GPU available on a single node and the real-time availability of
the GPU across all nodes in the Kubernetes cluster.

Definitions of certain fields:

* ``DEVICE_MEM``: Memory of a single device; does not depend on the device
count of the instance (VM).

* ``HOST_MEM``: Memory of the host instance (VM).

If ``--region`` or ``--all-regions`` is not specified, the price displayed
for each instance type is the lowest across all regions for both on-demand
and spot instances. There may be multiple regions with the same lowest
price.
* ``QTY_PER_NODE`` (Kubernetes only): GPU quantities that can be requested
on a single node.

* ``TOTAL_GPUS`` (Kubernetes only): Total number of GPUs available in the
Kubernetes cluster.

* ``TOTAL_FREE_GPUS`` (Kubernetes only): Number of currently free GPUs
in the Kubernetes cluster. This is fetched in real-time and may change
when other users are using the cluster.
"""
# validation for the --region flag
if region is not None and cloud is None:
Expand All @@ -2999,9 +3013,64 @@ def show_gpus(
if show_all and accelerator_str is not None:
raise click.UsageError('--all is only allowed without a GPU name.')

# Kubernetes specific bools
cloud_is_kubernetes = isinstance(cloud_obj, sky_clouds.Kubernetes)
kubernetes_autoscaling = kubernetes_utils.get_autoscaler_type() is not None
kubernetes_is_enabled = sky_clouds.cloud_in_iterable(
sky_clouds.Kubernetes(), global_user_state.get_cached_enabled_clouds())

if cloud_is_kubernetes and region is not None:
raise click.UsageError(
'The --region flag cannot be set with --cloud kubernetes.')

def _list_to_str(lst):
return ', '.join([str(e) for e in lst])

def _kubernetes_realtime_gpu_output(name_filter: Optional[str] = None,
                                    quantity_filter: Optional[int] = None,
                                    gpu_col_name: Optional[str] = None):
    """Yields a table of real-time GPU availability on the Kubernetes cluster.

    Queries the Kubernetes catalog for per-GPU counts, total capacity and
    current availability, optionally filtered by GPU name and/or quantity.

    Args:
        name_filter: If set, only show GPUs whose name matches this filter.
        quantity_filter: If set, only show GPU quantities matching this value;
            also switches the table headers to the filtered variants.
        gpu_col_name: Header for the GPU-name column; defaults to 'GPU'.

    Yields:
        Characters of the rendered table, or an error message when no
        matching GPUs are found.
    """
    if gpu_col_name is None:
        gpu_col_name = 'GPU'
    # Header names differ depending on whether a quantity filter is applied,
    # so the user can tell the output reflects a filtered view.
    if quantity_filter:
        qty_header = 'QTY_FILTER'
        free_header = 'FILTERED_FREE_GPUS'
    else:
        qty_header = 'QTY_PER_NODE'
        free_header = 'TOTAL_FREE_GPUS'
    realtime_gpu_table = log_utils.create_table(
        [gpu_col_name, qty_header, 'TOTAL_GPUS', free_header])
    counts, capacity, available = service_catalog.list_accelerator_realtime(
        gpus_only=True,
        clouds='kubernetes',
        name_filter=name_filter,
        region_filter=region,
        quantity_filter=quantity_filter,
        case_sensitive=False)
    # The three dicts are keyed by GPU name and must describe the same GPUs.
    assert (set(counts.keys()) == set(capacity.keys()) == set(
        available.keys())), (f'Keys of counts ({list(counts.keys())}), '
                             f'capacity ({list(capacity.keys())}), '
                             f'and available ({list(available.keys())}) '
                             'must be same.')
    if len(counts) == 0:
        # No matching GPUs: build a helpful error message describing the
        # filters that were applied and how to debug further.
        gpu_info_msg = ''
        debug_msg = 'To further debug, run: sky check.'
        if name_filter is not None:
            gpu_info_msg = f' matching name {name_filter!r}'
            debug_msg = ('To list all available accelerators, '
                         'run: sky show-gpus --cloud kubernetes.')
        if quantity_filter is not None:
            gpu_info_msg += f' with quantity {quantity_filter}'
        err_msg = kubernetes_utils.NO_GPU_ERROR_MESSAGE.format(
            gpu_info_msg=gpu_info_msg, debug_msg=debug_msg)
        yield err_msg
        return
    for gpu, _ in sorted(counts.items()):
        realtime_gpu_table.add_row([
            gpu,
            _list_to_str(counts.pop(gpu)), capacity[gpu], available[gpu]
        ])
    yield from realtime_gpu_table.get_string()

def _output():
gpu_table = log_utils.create_table(
['COMMON_GPU', 'AVAILABLE_QUANTITIES'])
Expand All @@ -3012,37 +3081,40 @@ def _output():

name, quantity = None, None

# Kubernetes specific bools
cloud_is_kubernetes = isinstance(cloud_obj, sky_clouds.Kubernetes)
kubernetes_autoscaling = kubernetes_utils.get_autoscaler_type(
) is not None

if accelerator_str is None:
# If cloud is kubernetes, we want to show real-time capacity
if cloud_is_kubernetes:
yield from _kubernetes_realtime_gpu_output()
if kubernetes_utils.get_autoscaler_type() is not None:
yield '\n'
yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE
return
romilbhardwaj marked this conversation as resolved.
Show resolved Hide resolved

# Optimization - do not poll for Kubernetes API for fetching
# common GPUs because that will be fetched later for the table after
# common GPUs.
clouds_to_list = cloud
if cloud is None and not show_all:
clouds_to_list = (
c for c in service_catalog.ALL_CLOUDS if c != 'kubernetes')
result = service_catalog.list_accelerator_counts(
gpus_only=True,
clouds=cloud,
clouds=clouds_to_list,
region_filter=region,
)

if len(result) == 0 and cloud_is_kubernetes:
yield kubernetes_utils.NO_GPU_ERROR_MESSAGE
if kubernetes_autoscaling:
yield '\n'
yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE
return

# "Common" GPUs
# If cloud is kubernetes, we want to show all GPUs here, even if
# they are not listed as common in SkyPilot.
if cloud_is_kubernetes:
for gpu, _ in sorted(result.items()):
for gpu in service_catalog.get_common_gpus():
if gpu in result:
gpu_table.add_row([gpu, _list_to_str(result.pop(gpu))])
else:
for gpu in service_catalog.get_common_gpus():
if gpu in result:
gpu_table.add_row([gpu, _list_to_str(result.pop(gpu))])
yield from gpu_table.get_string()

# Kubernetes GPUs with realtime information
if cloud is None and kubernetes_is_enabled:
yield '\n\n'
yield from _kubernetes_realtime_gpu_output(
gpu_col_name='KUBERNETES_GPU')

# Google TPUs
for tpu in service_catalog.get_tpus():
if tpu in result:
Expand All @@ -3058,15 +3130,16 @@ def _output():
other_table.add_row([gpu, _list_to_str(qty)])
yield from other_table.get_string()
yield '\n\n'
if (cloud_is_kubernetes or
cloud is None) and kubernetes_autoscaling:
if (cloud is None and kubernetes_is_enabled and
kubernetes_autoscaling):
yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE
yield '\n\n'
else:
yield ('\n\nHint: use -a/--all to see all accelerators '
'(including non-common ones) and pricing.')
if (cloud_is_kubernetes or
cloud is None) and kubernetes_autoscaling:
if (cloud is None and kubernetes_is_enabled and
kubernetes_autoscaling):
yield '\n'
yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE
return
else:
Expand All @@ -3091,6 +3164,12 @@ def _output():
else:
name, quantity = accelerator_str, None

if cloud_is_kubernetes:
# Get real-time availability of GPUs for Kubernetes
yield from _kubernetes_realtime_gpu_output(name_filter=name,
quantity_filter=quantity)
return
# For clouds other than Kubernetes, get the accelerator details
# Case-sensitive
result = service_catalog.list_accelerators(gpus_only=True,
name_filter=name,
Expand Down Expand Up @@ -3129,10 +3208,6 @@ def _output():
result = new_result

if len(result) == 0:
if cloud == 'kubernetes':
yield kubernetes_utils.NO_GPU_ERROR_MESSAGE
return

quantity_str = (f' with requested quantity {quantity}'
if quantity else '')
yield f'Resources \'{name}\'{quantity_str} not found. '
Expand Down
40 changes: 40 additions & 0 deletions sky/clouds/service_catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,46 @@ def list_accelerator_counts(
return ret


def list_accelerator_realtime(
    gpus_only: bool = True,
    name_filter: Optional[str] = None,
    region_filter: Optional[str] = None,
    quantity_filter: Optional[int] = None,
    clouds: CloudFilter = None,
    case_sensitive: bool = True,
) -> Tuple[Dict[str, List[int]], Dict[str, int], Dict[str, int]]:
    """List all accelerators offered by Sky with their realtime availability.

    Realtime availability is the total number of accelerators in the cluster
    and number of accelerators available at the time of the call.

    Used for fixed size cluster settings, such as Kubernetes.

    Returns:
        A tuple of three dictionaries mapping canonical accelerator names to:
        - A list of available counts. (e.g., [1, 2, 4])
        - Total number of accelerators in the cluster (capacity).
        - Number of accelerators available at the time of call (availability).
    """
    qtys_map, total_accelerators_capacity, total_accelerators_available = (
        _map_clouds_catalog(clouds,
                            'list_accelerators_realtime',
                            gpus_only,
                            name_filter,
                            region_filter,
                            quantity_filter,
                            case_sensitive=case_sensitive,
                            all_regions=False,
                            require_price=False))
    # Flatten the per-accelerator InstanceTypeInfo entries into sorted lists
    # of the supported accelerator counts.
    accelerator_counts: Dict[str, List[int]] = collections.defaultdict(list)
    for acc_name, type_infos in qtys_map.items():
        accelerator_counts[acc_name] = sorted(
            info.accelerator_count for info in type_infos)
    return (accelerator_counts, total_accelerators_capacity,
            total_accelerators_available)


def instance_type_exists(instance_type: str,
clouds: CloudFilter = None) -> bool:
"""Check the existence of a instance type."""
Expand Down
94 changes: 85 additions & 9 deletions sky/clouds/service_catalog/kubernetes_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Kubernetes does not require a catalog of instances, but we need an image catalog
mapping SkyPilot image tags to corresponding container image tags.
"""
import re
import typing
from typing import Dict, List, Optional, Set, Tuple

Expand Down Expand Up @@ -46,38 +47,109 @@ def list_accelerators(
case_sensitive: bool = True,
all_regions: bool = False,
require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]:
# TODO(romilb): We should consider putting a lru_cache() with TTL to
# avoid multiple calls to kubernetes API in a short period of time (e.g.,
# from the optimizer).
return list_accelerators_realtime(gpus_only, name_filter, region_filter,
quantity_filter, case_sensitive,
all_regions, require_price)[0]
Comment on lines +53 to +55
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will calling this add additional overhead to list_accelerators? Since we are relying on list_accelerators to generate the optimization candidate resources, it will be called multiple times during the failover process. Would be nice to make sure this does not add overhead. : )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point... the overhead compared to the previous implementation isn't much different since the previous implementation was also invoking the Kubernetes API:

This branch:
multitime -n 5 sky launch --dryrun -y --gpus T4:1
===> multitime results
1: sky launch --dryrun -y --gpus T4:1
            Mean        Std.Dev.    Min         Median      Max
real        3.883       0.064       3.782       3.883       3.982
user        2.775       0.081       2.654       2.766       2.871
sys         3.136       0.285       2.676       3.268       3.448


Master: 
multitime -n 5 sky launch --dryrun -y --gpus T4:1
1: sky launch --dryrun -y --gpus T4:1
            Mean        Std.Dev.    Min         Median      Max
real        3.863       0.032       3.829       3.860       3.917
user        2.713       0.023       2.670       2.716       2.735
sys         3.438       0.097       3.267       3.471       3.535

That said, we should put a lru cache with a time-to-live (TTL) to expire based on time. Added a TODO.



def list_accelerators_realtime(
gpus_only: bool,
name_filter: Optional[str],
region_filter: Optional[str],
quantity_filter: Optional[int],
case_sensitive: bool = True,
all_regions: bool = False,
require_price: bool = True
) -> Tuple[Dict[str, List[common.InstanceTypeInfo]], Dict[str, int], Dict[str,
int]]:
del all_regions, require_price # Unused.
k8s_cloud = Kubernetes()
if not any(
map(k8s_cloud.is_same_cloud,
sky_check.get_cached_enabled_clouds_or_refresh())
) or not kubernetes_utils.check_credentials()[0]:
return {}
return {}, {}, {}

has_gpu = kubernetes_utils.detect_gpu_resource()
if not has_gpu:
return {}
return {}, {}, {}

label_formatter, _ = kubernetes_utils.detect_gpu_label_formatter()
if not label_formatter:
return {}
return {}, {}, {}

accelerators: Set[Tuple[str, int]] = set()
accelerators_qtys: Set[Tuple[str, int]] = set()
key = label_formatter.get_label_key()
nodes = kubernetes_utils.get_kubernetes_nodes()
# Get the pods to get the real-time GPU usage
pods = kubernetes_utils.get_kubernetes_pods()
# Total number of GPUs in the cluster
total_accelerators_capacity: Dict[str, int] = {}
# Total number of GPUs currently available in the cluster
total_accelerators_available: Dict[str, int] = {}
min_quantity_filter = quantity_filter if quantity_filter else 1

for node in nodes:
if key in node.metadata.labels:
allocated_qty = 0
accelerator_name = label_formatter.get_accelerator_from_label_value(
node.metadata.labels.get(key))

# Check if name_filter regex matches the accelerator_name
regex_flags = 0 if case_sensitive else re.IGNORECASE
if name_filter and not re.match(
name_filter, accelerator_name, flags=regex_flags):
continue

accelerator_count = int(
node.status.allocatable.get('nvidia.com/gpu', 0))

# Generate the GPU quantities for the accelerators
if accelerator_name and accelerator_count > 0:
for count in range(1, accelerator_count + 1):
accelerators.add((accelerator_name, count))
accelerators_qtys.add((accelerator_name, count))

for pod in pods:
# Get all the pods running on the node
if (pod.spec.node_name == node.metadata.name and
pod.status.phase in ['Running', 'Pending']):
# Iterate over all the containers in the pod and sum the
# GPU requests
for container in pod.spec.containers:
if container.resources.requests:
allocated_qty += int(
container.resources.requests.get(
'nvidia.com/gpu', 0))

accelerators_available = accelerator_count - allocated_qty

if accelerator_count >= min_quantity_filter:
quantized_count = (min_quantity_filter *
(accelerator_count // min_quantity_filter))
if accelerator_name not in total_accelerators_capacity:
total_accelerators_capacity[
accelerator_name] = quantized_count
else:
total_accelerators_capacity[
accelerator_name] += quantized_count

if accelerators_available >= min_quantity_filter:
quantized_availability = min_quantity_filter * (
accelerators_available // min_quantity_filter)
if accelerator_name not in total_accelerators_available:
total_accelerators_available[
accelerator_name] = quantized_availability
else:
total_accelerators_available[
accelerator_name] += quantized_availability

result = []
for accelerator_name, accelerator_count in accelerators:

# Generate dataframe for common.list_accelerators_impl
for accelerator_name, accelerator_count in accelerators_qtys:
result.append(
common.InstanceTypeInfo(cloud='Kubernetes',
instance_type=None,
Expand All @@ -98,9 +170,13 @@ def list_accelerators(
])
df['GpuInfo'] = True

return common.list_accelerators_impl('Kubernetes', df, gpus_only,
name_filter, region_filter,
quantity_filter, case_sensitive)
# Use common.list_accelerators_impl to get InstanceTypeInfo objects used
# by sky show-gpus when cloud is not specified.
qtys_map = common.list_accelerators_impl('Kubernetes', df, gpus_only,
name_filter, region_filter,
quantity_filter, case_sensitive)

return qtys_map, total_accelerators_capacity, total_accelerators_available


def validate_region_zone(
Expand Down
Loading
Loading