Skip to content

Commit

Permalink
[k8s] Remove privileged container requirement to support GPU isolation (
Browse files Browse the repository at this point in the history
#3443)

* Add fuse device manager

* Add resource field for checking fuse requirement

* lint

* lint

* lint

* lint

* lint

* comments

* lint

* fix

* fix

* fix backward compat

* add comment
  • Loading branch information
romilbhardwaj authored Apr 21, 2024
1 parent 281e489 commit 118fc79
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 6 deletions.
10 changes: 10 additions & 0 deletions sky/adaptors/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
_networking_api = None
_custom_objects_api = None
_node_api = None
_apps_api = None

# Timeout to use for API calls
API_TIMEOUT = 5
Expand Down Expand Up @@ -108,6 +109,15 @@ def node_api():
return _node_api


def apps_api():
    """Returns the shared Kubernetes ``AppsV1Api`` client.

    The client is built on first use, after ``_load_config()`` has run, and
    is then stored in the module-level ``_apps_api`` so later calls reuse
    the same object.
    """
    global _apps_api
    if _apps_api is not None:
        # Already initialized by an earlier call.
        return _apps_api

    _load_config()
    _apps_api = kubernetes.client.AppsV1Api()
    return _apps_api


def api_exception():
return kubernetes.client.rest.ApiException

Expand Down
11 changes: 11 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2561,6 +2561,17 @@ def check_resources_fit_cluster(
f'{example_resource.zone!r},'
'but the existing cluster '
f'{zone_str}')
if (example_resource.requires_fuse and
not launched_resources.requires_fuse):
# Will not be reached for non-k8s case since the
# less_demanding_than only fails fuse requirement when
# the cloud is Kubernetes AND the cluster doesn't have fuse.
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesMismatchError(
'Task requires FUSE support for mounting object '
'stores, but the existing cluster with '
f'{launched_resources!r} does not support FUSE '
f'mounting. Launch a new cluster to run this task.')
requested_resource_str = ', '.join(requested_resource_list)
if isinstance(task.resources, list):
requested_resource_str = f'[{requested_resource_str}]'
Expand Down
10 changes: 10 additions & 0 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
DEFAULT_KUBECONFIG_PATH = '~/.kube/config'
CREDENTIAL_PATH = os.environ.get('KUBECONFIG', DEFAULT_KUBECONFIG_PATH)

# Namespace for SkyPilot resources shared across multiple tenants on the
# same cluster (even if they might be running in different namespaces).
# E.g., FUSE device manager daemonset is run in this namespace.
_SKY_SYSTEM_NAMESPACE = 'skypilot-system'


@clouds.CLOUD_REGISTRY.register
class Kubernetes(clouds.Cloud):
Expand Down Expand Up @@ -255,6 +260,8 @@ def make_deploy_resources_variables(

port_mode = network_utils.get_port_mode(None)

fuse_device_required = bool(resources.requires_fuse)

deploy_vars = {
'instance_type': resources.instance_type,
'custom_resources': custom_resources,
Expand All @@ -271,6 +278,9 @@ def make_deploy_resources_variables(
'k8s_acc_label_value': k8s_acc_label_value,
'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME,
'k8s_ssh_jump_image': ssh_jump_image,
'k8s_fuse_device_required': fuse_device_required,
# Namespace to run the FUSE device manager in
'k8s_fuse_device_manager_namespace': _SKY_SYSTEM_NAMESPACE,
'image_id': image_id,
}

Expand Down
73 changes: 73 additions & 0 deletions sky/provision/kubernetes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import copy
import logging
import math
import os
from typing import Any, Dict, Union

import yaml

from sky.adaptors import kubernetes
from sky.provision import common
from sky.provision.kubernetes import utils as kubernetes_utils
Expand All @@ -24,6 +27,9 @@ def bootstrap_instances(

config = _configure_ssh_jump(namespace, config)

if config.provider_config.get('fuse_device_required', False):
_configure_fuse_mounting(config.provider_config)

if not config.provider_config.get('_operator'):
# These steps are unnecessary when using the Operator.
_configure_autoscaler_service_account(namespace, config.provider_config)
Expand Down Expand Up @@ -311,6 +317,73 @@ def _configure_ssh_jump(namespace, config: common.ProvisionConfig):
return config


def _configure_fuse_mounting(provider_config: Dict[str, Any]) -> None:
    """Creates the FUSE device manager needed for unprivileged FUSE mounting.

    FUSE mounting in Kubernetes without privileged containers requires a
    separate component with the necessary capabilities. We run a daemonset
    which exposes the host /dev/fuse device as a Kubernetes resource. The
    SkyPilot pod requests this resource to mount the FUSE filesystem.

    The daemonset (and the ConfigMap it reads) is created in a common
    namespace so that it can be shared across multiple tenants. The namespace
    is read from the ``fuse_device_manager_namespace`` key of the provider
    config; clouds.Kubernetes populates it with 'skypilot-system'. If the key
    is missing, this function falls back to the 'default' namespace.

    Objects that already exist (HTTP 409 from the API server) are left
    untouched, so calling this function repeatedly is safe.

    Args:
        provider_config: Provider section of the provisioning config.

    Raises:
        kubernetes.api_exception(): If creating the namespace, ConfigMap or
            DaemonSet fails with any status other than 409.
    """

    logger.info('_configure_fuse_mounting: Setting up FUSE device manager.')

    fuse_device_manager_namespace = provider_config.get(
        'fuse_device_manager_namespace', 'default')
    kubernetes_utils.create_namespace(fuse_device_manager_namespace)

    # Read the device manager YAMLs from the manifests directory
    root_dir = os.path.dirname(os.path.dirname(__file__))

    # The API clients are resolved lazily inside the lambdas so that they are
    # only constructed right before the create call is issued.
    _create_fuse_manager_object(
        'ConfigMap',
        os.path.join(
            root_dir,
            'kubernetes/manifests/smarter-device-manager-configmap.yaml'),
        fuse_device_manager_namespace,
        lambda ns, body: kubernetes.core_api().create_namespaced_config_map(
            ns, body))

    _create_fuse_manager_object(
        'DaemonSet',
        os.path.join(
            root_dir,
            'kubernetes/manifests/smarter-device-manager-daemonset.yaml'),
        fuse_device_manager_namespace,
        lambda ns, body: kubernetes.apps_api().create_namespaced_daemon_set(
            ns, body))

    logger.info('FUSE device manager setup complete '
                f'in namespace {fuse_device_manager_namespace!r}')


def _create_fuse_manager_object(kind: str, manifest_path: str, namespace: str,
                                create_fn: Any) -> None:
    """Loads one YAML manifest and creates it, tolerating existing objects.

    Args:
        kind: Human-readable object kind ('ConfigMap' or 'DaemonSet'); used
            only in log messages.
        manifest_path: Path of the YAML manifest to load.
        namespace: Namespace in which the object is created.
        create_fn: Callable taking ``(namespace, body)`` that issues the
            create request.

    Raises:
        kubernetes.api_exception(): If the create request fails with any
            status other than 409.
    """
    logger.info(f'_configure_fuse_mounting: Creating {kind.lower()}.')
    with open(manifest_path, 'r', encoding='utf-8') as file:
        manifest = yaml.safe_load(file)
    kubernetes_utils.merge_custom_metadata(manifest['metadata'])
    try:
        create_fn(namespace, manifest)
    except kubernetes.api_exception() as e:
        if e.status == 409:
            # The object is shared across tenants; someone created it first.
            logger.info(f'_configure_fuse_mounting: {kind} already exists '
                        f'in namespace {namespace!r}')
        else:
            raise
    else:
        logger.info(f'_configure_fuse_mounting: {kind} created '
                    f'in namespace {namespace!r}')


def _configure_services(namespace: str, provider_config: Dict[str,
Any]) -> None:
service_field = 'services'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: smarter-device-manager
labels:
parent: skypilot
data:
conf.yaml: |
- devicematch: ^fuse$
nummaxdevices: 1000 # Max number of simultaneous pods that can use fuse
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# A daemonset for the smarter-device-manager agent. Used to expose FUSE devices
# to SkyPilot pods, bypassing the need to run SkyPilot pods as privileged.
# From smarter-device-manager daemonset: https://gitlab.com/arm-research/smarter/smarter-device-manager/-/blob/master/smarter-device-manager-ds.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: smarter-device-manager
labels:
name: smarter-device-manager
role: agent
parent: skypilot
spec:
selector:
matchLabels:
name: smarter-device-manager
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
name: smarter-device-manager
parent: skypilot
annotations:
node.kubernetes.io/bootstrap-checkpoint: "true"
spec:
hostname: smarter-device-management
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: smarter-device-manager
image: us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/smarter-device-manager:v1.1.2
imagePullPolicy: IfNotPresent
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
resources:
limits:
cpu: 100m
memory: 15Mi
requests:
cpu: 10m
memory: 15Mi
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
- name: dev-dir
mountPath: /dev
- name: sys-dir
mountPath: /sys
- name: config
mountPath: /root/config
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
- name: dev-dir
hostPath:
path: /dev
- name: sys-dir
hostPath:
path: /sys
- name: config
configMap:
name: smarter-device-manager
21 changes: 21 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1271,3 +1271,24 @@ def check_secret_exists(secret_name: str, namespace: str) -> bool:
raise
else:
return True


def create_namespace(namespace: str) -> None:
    """Makes sure ``namespace`` exists in the cluster.

    The namespace is created with a ``parent: skypilot`` label, and its
    metadata is passed through ``merge_custom_metadata`` before the request
    is sent. A 409 response from the API server is treated as "already
    exists": it is logged and otherwise ignored.

    Args:
        namespace: Name of the namespace to create.

    Raises:
        kubernetes.api_exception(): For any API failure other than a 409.
    """
    k8s_client = kubernetes.kubernetes.client
    metadata = {'name': namespace, 'labels': {'parent': 'skypilot'}}
    merge_custom_metadata(metadata)
    body = k8s_client.V1Namespace(metadata=metadata)
    try:
        kubernetes.core_api().create_namespace(body)
    except kubernetes.api_exception() as e:
        if e.status != 409:
            raise
        logger.info(f'Namespace {namespace} already exists in the cluster.')
36 changes: 35 additions & 1 deletion sky/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Resources:
"""
# If any fields changed, increment the version. For backward compatibility,
# modify the __setstate__ method to handle the old version.
_VERSION = 15
_VERSION = 16

def __init__(
self,
Expand All @@ -65,6 +65,7 @@ def __init__(
# pylint: disable=invalid-name
_docker_login_config: Optional[docker_utils.DockerLoginConfig] = None,
_is_image_managed: Optional[bool] = None,
_requires_fuse: Optional[bool] = None,
):
"""Initialize a Resources object.
Expand Down Expand Up @@ -131,6 +132,11 @@ def __init__(
_docker_login_config: the docker configuration to use. This includes
the docker username, password, and registry server. If None, skip
docker login.
_requires_fuse: whether the task requires FUSE mounting support. This
is used internally by certain cloud implementations to do additional
setup for FUSE mounting. This flag also safeguards against using
FUSE mounting on existing clusters that do not support it. If None,
defaults to False.
Raises:
ValueError: if some attributes are invalid.
Expand Down Expand Up @@ -200,6 +206,9 @@ def __init__(

self._docker_login_config = _docker_login_config

self._requires_fuse = (_requires_fuse
if _requires_fuse is not None else False)

self._set_cpus(cpus)
self._set_memory(memory)
self._set_accelerators(accelerators, accelerator_args)
Expand Down Expand Up @@ -412,6 +421,14 @@ def ports(self) -> Optional[List[str]]:
def is_image_managed(self) -> Optional[bool]:
return self._is_image_managed

@property
def requires_fuse(self) -> Optional[bool]:
    """Whether FUSE mounting support is required, as stored on this object."""
    return self._requires_fuse

@requires_fuse.setter
def requires_fuse(self, value: Optional[bool]) -> None:
    # Stores the flag as given; no validation or normalization happens here.
    self._requires_fuse = value

def _set_cpus(
self,
cpus: Union[None, int, float, str],
Expand Down Expand Up @@ -1071,6 +1088,13 @@ def less_demanding_than(
if not self_ports <= other_ports:
return False

if self.requires_fuse and not other.requires_fuse:
# On Kubernetes, we can't launch a task that requires FUSE on a pod
# that wasn't initialized with FUSE support at the start.
# Other clouds don't have this limitation.
if other.cloud.is_same_cloud(clouds.Kubernetes()):
return False

# self <= other
return True

Expand Down Expand Up @@ -1136,6 +1160,7 @@ def copy(self, **override) -> 'Resources':
self._docker_login_config),
_is_image_managed=override.pop('_is_image_managed',
self._is_image_managed),
_requires_fuse=override.pop('_requires_fuse', self._requires_fuse),
)
assert len(override) == 0
return resources
Expand Down Expand Up @@ -1274,6 +1299,7 @@ def _from_yaml_config_single(cls, config: Dict[str, str]) -> 'Resources':
'_docker_login_config', None)
resources_fields['_is_image_managed'] = config.pop(
'_is_image_managed', None)
resources_fields['_requires_fuse'] = config.pop('_requires_fuse', None)

if resources_fields['cpus'] is not None:
resources_fields['cpus'] = str(resources_fields['cpus'])
Expand Down Expand Up @@ -1315,6 +1341,8 @@ def add_if_not_none(key, value):
add_if_not_none('ports', self.ports)
if self._is_image_managed is not None:
config['_is_image_managed'] = self._is_image_managed
if self._requires_fuse is not None:
config['_requires_fuse'] = self._requires_fuse
return config

def __setstate__(self, state):
Expand Down Expand Up @@ -1412,4 +1440,10 @@ def __setstate__(self, state):
state['_disk_tier'] = resources_utils.DiskTier(
original_disk_tier)

if version < 16:
# Kubernetes clusters launched prior to version 16 run in privileged
# mode and have FUSE support enabled by default. As a result, we
# set the default to True for backward compatibility.
state['_requires_fuse'] = state.get('_requires_fuse', True)

self.__dict__.update(state)
1 change: 1 addition & 0 deletions sky/setup_files/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
include sky/backends/monkey_patches/*.py
exclude sky/clouds/service_catalog/data_fetchers/analyze.py
include sky/provision/kubernetes/manifests/*
include sky/setup_files/*
include sky/skylet/*.sh
include sky/skylet/LICENSE
Expand Down
Loading

0 comments on commit 118fc79

Please sign in to comment.