diff --git a/sky/adaptors/kubernetes.py b/sky/adaptors/kubernetes.py index c6b4e8bce0b..ce6f93a8905 100644 --- a/sky/adaptors/kubernetes.py +++ b/sky/adaptors/kubernetes.py @@ -21,6 +21,7 @@ _networking_api = None _custom_objects_api = None _node_api = None +_apps_api = None # Timeout to use for API calls API_TIMEOUT = 5 @@ -108,6 +109,15 @@ def node_api(): return _node_api +def apps_api(): + global _apps_api + if _apps_api is None: + _load_config() + _apps_api = kubernetes.client.AppsV1Api() + + return _apps_api + + def api_exception(): return kubernetes.client.rest.ApiException diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 44ade8c9c5e..38bebf1e602 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2561,6 +2561,17 @@ def check_resources_fit_cluster( f'{example_resource.zone!r},' 'but the existing cluster ' f'{zone_str}') + if (example_resource.requires_fuse and + not launched_resources.requires_fuse): + # Will not be reached for non-k8s case since the + # less_demanding_than only fails fuse requirement when + # the cloud is Kubernetes AND the cluster doesn't have fuse. + with ux_utils.print_exception_no_traceback(): + raise exceptions.ResourcesMismatchError( + 'Task requires FUSE support for mounting object ' + 'stores, but the existing cluster with ' + f'{launched_resources!r} does not support FUSE ' + f'mounting. Launch a new cluster to run this task.') requested_resource_str = ', '.join(requested_resource_list) if isinstance(task.resources, list): requested_resource_str = f'[{requested_resource_str}]' diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index e6b3b5bbe9e..87668944e9d 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -24,6 +24,11 @@ DEFAULT_KUBECONFIG_PATH = '~/.kube/config' CREDENTIAL_PATH = os.environ.get('KUBECONFIG', DEFAULT_KUBECONFIG_PATH) +# Namespace for SkyPilot resources shared across multiple tenants on the +# same cluster (even if they might be running in different namespaces). +# E.g., FUSE device manager daemonset is run in this namespace. +_SKY_SYSTEM_NAMESPACE = 'skypilot-system' + @clouds.CLOUD_REGISTRY.register class Kubernetes(clouds.Cloud): @@ -255,6 +260,8 @@ def make_deploy_resources_variables( port_mode = network_utils.get_port_mode(None) + fuse_device_required = bool(resources.requires_fuse) + deploy_vars = { 'instance_type': resources.instance_type, 'custom_resources': custom_resources, @@ -271,6 +278,9 @@ def make_deploy_resources_variables( 'k8s_acc_label_value': k8s_acc_label_value, 'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME, 'k8s_ssh_jump_image': ssh_jump_image, + 'k8s_fuse_device_required': fuse_device_required, + # Namespace to run the FUSE device manager in + 'k8s_fuse_device_manager_namespace': _SKY_SYSTEM_NAMESPACE, 'image_id': image_id, } diff --git a/sky/provision/kubernetes/config.py b/sky/provision/kubernetes/config.py index 3649f123658..ef1926ac9ce 100644 --- a/sky/provision/kubernetes/config.py +++ b/sky/provision/kubernetes/config.py @@ -2,8 +2,11 @@ import copy import logging import math +import os from typing import Any, Dict, Union +import yaml + from sky.adaptors import kubernetes from sky.provision import common from sky.provision.kubernetes import utils as kubernetes_utils @@ -24,6 +27,9 @@ def bootstrap_instances( config = _configure_ssh_jump(namespace, config) + if config.provider_config.get('fuse_device_required', False): + _configure_fuse_mounting(config.provider_config) + if not config.provider_config.get('_operator'): # These steps are unecessary when using the Operator. _configure_autoscaler_service_account(namespace, config.provider_config) @@ -311,6 +317,73 @@ def _configure_ssh_jump(namespace, config: common.ProvisionConfig): return config +def _configure_fuse_mounting(provider_config: Dict[str, Any]) -> None: + """Creates sidecars required for FUSE mounting. + + FUSE mounting in Kubernetes without privileged containers requires us to + run a sidecar container with the necessary capabilities. We run a daemonset + which exposes the host /dev/fuse device as a Kubernetes resource. The + SkyPilot pod requests this resource to mount the FUSE filesystem. + + We create this daemonset in a common namespace, which is configurable in the + provider config. This allows the FUSE mounting sidecar to be shared across + multiple tenants. The default namespace is 'sky-system' (populated in + clouds.Kubernetes) + """ + + logger.info('_configure_fuse_mounting: Setting up FUSE device manager.') + + fuse_device_manager_namespace = provider_config.get( + 'fuse_device_manager_namespace', 'default') + kubernetes_utils.create_namespace(fuse_device_manager_namespace) + + # Read the device manager YAMLs from the manifests directory + root_dir = os.path.dirname(os.path.dirname(__file__)) + + # Load and create the ConfigMap + logger.info('_configure_fuse_mounting: Creating configmap.') + config_map_path = os.path.join( + root_dir, 'kubernetes/manifests/smarter-device-manager-configmap.yaml') + with open(config_map_path, 'r', encoding='utf-8') as file: + config_map = yaml.safe_load(file) + kubernetes_utils.merge_custom_metadata(config_map['metadata']) + try: + kubernetes.core_api().create_namespaced_config_map( + fuse_device_manager_namespace, config_map) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info('_configure_fuse_mounting: ConfigMap already exists ' + f'in namespace {fuse_device_manager_namespace!r}') + else: + raise + else: + logger.info('_configure_fuse_mounting: ConfigMap created ' + f'in namespace {fuse_device_manager_namespace!r}') + + # Load and create the DaemonSet + logger.info('_configure_fuse_mounting: Creating daemonset.') + daemonset_path = os.path.join( + root_dir, 'kubernetes/manifests/smarter-device-manager-daemonset.yaml') + with open(daemonset_path, 'r', encoding='utf-8') as file: + daemonset = yaml.safe_load(file) + kubernetes_utils.merge_custom_metadata(daemonset['metadata']) + try: + kubernetes.apps_api().create_namespaced_daemon_set( + fuse_device_manager_namespace, daemonset) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info('_configure_fuse_mounting: DaemonSet already exists ' + f'in namespace {fuse_device_manager_namespace!r}') + else: + raise + else: + logger.info('_configure_fuse_mounting: DaemonSet created ' + f'in namespace {fuse_device_manager_namespace!r}') + + logger.info('FUSE device manager setup complete ' + f'in namespace {fuse_device_manager_namespace!r}') + + def _configure_services(namespace: str, provider_config: Dict[str, Any]) -> None: service_field = 'services' diff --git a/sky/provision/kubernetes/manifests/smarter-device-manager-configmap.yaml b/sky/provision/kubernetes/manifests/smarter-device-manager-configmap.yaml new file mode 100644 index 00000000000..e3ecd0304b4 --- /dev/null +++ b/sky/provision/kubernetes/manifests/smarter-device-manager-configmap.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: smarter-device-manager + labels: + parent: skypilot +data: + conf.yaml: | + - devicematch: ^fuse$ + nummaxdevices: 1000 # Max number of simultaneous pods that can use fuse diff --git a/sky/provision/kubernetes/manifests/smarter-device-manager-daemonset.yaml b/sky/provision/kubernetes/manifests/smarter-device-manager-daemonset.yaml new file mode 100644 index 00000000000..664fd69a8c8 --- /dev/null +++ b/sky/provision/kubernetes/manifests/smarter-device-manager-daemonset.yaml @@ -0,0 +1,65 @@ +# A daemonset for the smarter-device-manager agent. Used to expose FUSE devices +# to SkyPilot pods, bypassing the need to run SkyPilot pods as privileged. +# From smarter-device-manager daemonset: https://gitlab.com/arm-research/smarter/smarter-device-manager/-/blob/master/smarter-device-manager-ds.yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: smarter-device-manager + labels: + name: smarter-device-manager + role: agent + parent: skypilot +spec: + selector: + matchLabels: + name: smarter-device-manager + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + name: smarter-device-manager + parent: skypilot + annotations: + node.kubernetes.io/bootstrap-checkpoint: "true" + spec: + hostname: smarter-device-management + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: smarter-device-manager + image: us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/smarter-device-manager:v1.1.2 + imagePullPolicy: IfNotPresent + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: ["ALL"] + resources: + limits: + cpu: 100m + memory: 15Mi + requests: + cpu: 10m + memory: 15Mi + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + - name: dev-dir + mountPath: /dev + - name: sys-dir + mountPath: /sys + - name: config + mountPath: /root/config + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins + - name: dev-dir + hostPath: + path: /dev + - name: sys-dir + hostPath: + path: /sys + - name: config + configMap: + name: smarter-device-manager diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 60e9857333b..1cb31328d50 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1271,3 +1271,24 @@ def check_secret_exists(secret_name: str, namespace: str) -> bool: raise else: return True + + +def create_namespace(namespace: str) -> None: + """Creates a namespace in the cluster. + + If the namespace already exists, logs a message and does nothing. + + Args: + namespace: Name of the namespace to create + """ + kubernetes_client = kubernetes.kubernetes.client + ns_metadata = dict(name=namespace, labels={'parent': 'skypilot'}) + merge_custom_metadata(ns_metadata) + namespace_obj = kubernetes_client.V1Namespace(metadata=ns_metadata) + try: + kubernetes.core_api().create_namespace(namespace_obj) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info(f'Namespace {namespace} already exists in the cluster.') + else: + raise diff --git a/sky/resources.py b/sky/resources.py index aeb37b79649..4391c2d5fb9 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -43,7 +43,7 @@ class Resources: """ # If any fields changed, increment the version. For backward compatibility, # modify the __setstate__ method to handle the old version. - _VERSION = 15 + _VERSION = 16 def __init__( self, @@ -65,6 +65,7 @@ def __init__( # pylint: disable=invalid-name _docker_login_config: Optional[docker_utils.DockerLoginConfig] = None, _is_image_managed: Optional[bool] = None, + _requires_fuse: Optional[bool] = None, ): """Initialize a Resources object. @@ -131,6 +132,11 @@ def __init__( _docker_login_config: the docker configuration to use. This include the docker username, password, and registry server. If None, skip docker login. + _requires_fuse: whether the task requires FUSE mounting support. This + is used internally by certain cloud implementations to do additional + setup for FUSE mounting. This flag also safeguards against using + FUSE mounting on existing clusters that do not support it. If None, + defaults to False. Raises: ValueError: if some attributes are invalid. @@ -200,6 +206,9 @@ def __init__( self._docker_login_config = _docker_login_config + self._requires_fuse = (_requires_fuse + if _requires_fuse is not None else False) + self._set_cpus(cpus) self._set_memory(memory) self._set_accelerators(accelerators, accelerator_args) @@ -412,6 +421,14 @@ def ports(self) -> Optional[List[str]]: def is_image_managed(self) -> Optional[bool]: return self._is_image_managed + @property + def requires_fuse(self) -> Optional[bool]: + return self._requires_fuse + + @requires_fuse.setter + def requires_fuse(self, value: Optional[bool]) -> None: + self._requires_fuse = value + def _set_cpus( self, cpus: Union[None, int, float, str], @@ -1071,6 +1088,13 @@ def less_demanding_than( if not self_ports <= other_ports: return False + if self.requires_fuse and not other.requires_fuse: + # On Kubernetes, we can't launch a task that requires FUSE on a pod + # that wasn't initialized with FUSE support at the start. + # Other clouds don't have this limitation. + if other.cloud.is_same_cloud(clouds.Kubernetes()): + return False + # self <= other return True @@ -1136,6 +1160,7 @@ def copy(self, **override) -> 'Resources': self._docker_login_config), _is_image_managed=override.pop('_is_image_managed', self._is_image_managed), + _requires_fuse=override.pop('_requires_fuse', self._requires_fuse), ) assert len(override) == 0 return resources @@ -1274,6 +1299,7 @@ def _from_yaml_config_single(cls, config: Dict[str, str]) -> 'Resources': '_docker_login_config', None) resources_fields['_is_image_managed'] = config.pop( '_is_image_managed', None) + resources_fields['_requires_fuse'] = config.pop('_requires_fuse', None) if resources_fields['cpus'] is not None: resources_fields['cpus'] = str(resources_fields['cpus']) @@ -1315,6 +1341,8 @@ def add_if_not_none(key, value): add_if_not_none('ports', self.ports) if self._is_image_managed is not None: config['_is_image_managed'] = self._is_image_managed + if self._requires_fuse is not None: + config['_requires_fuse'] = self._requires_fuse return config def __setstate__(self, state): @@ -1412,4 +1440,10 @@ def __setstate__(self, state): state['_disk_tier'] = resources_utils.DiskTier( original_disk_tier) + if version < 16: + # Kubernetes clusters launched prior to version 16 run in privileged + # mode and have FUSE support enabled by default. As a result, we + # set the default to True for backward compatibility. + state['_requires_fuse'] = state.get('_requires_fuse', True) + self.__dict__.update(state) diff --git a/sky/setup_files/MANIFEST.in b/sky/setup_files/MANIFEST.in index 5a80a5dbcf4..170dc59e7de 100644 --- a/sky/setup_files/MANIFEST.in +++ b/sky/setup_files/MANIFEST.in @@ -1,5 +1,6 @@ include sky/backends/monkey_patches/*.py exclude sky/clouds/service_catalog/data_fetchers/analyze.py +include sky/provision/kubernetes/manifests/* include sky/setup_files/* include sky/skylet/*.sh include sky/skylet/LICENSE diff --git a/sky/task.py b/sky/task.py index d66e6313f06..9350e26101a 100644 --- a/sky/task.py +++ b/sky/task.py @@ -616,6 +616,14 @@ def set_resources( resources = {resources} # TODO(woosuk): Check if the resources are None. self.resources = _with_docker_login_config(resources, self.envs) + + # Evaluate if the task requires FUSE and set the requires_fuse flag + for _, storage_obj in self.storage_mounts.items(): + if storage_obj.mode == storage_lib.StorageMode.MOUNT: + for r in self.resources: + r.requires_fuse = True + break + return self def set_resources_override(self, override_params: Dict[str, Any]) -> 'Task': @@ -805,8 +813,11 @@ def set_storage_mounts( """ if storage_mounts is None: self.storage_mounts = {} + # Clear the requires_fuse flag if no storage mounts are set. + for r in self.resources: + r.requires_fuse = False return self - for target, _ in storage_mounts.items(): + for target, storage_obj in storage_mounts.items(): # TODO(zhwu): /home/username/sky_workdir as the target path need # to be filtered out as well. if (target == constants.SKY_REMOTE_WORKDIR and @@ -824,6 +835,12 @@ def set_storage_mounts( raise ValueError( 'Storage mount destination path cannot be cloud storage' ) + + if storage_obj.mode == storage_lib.StorageMode.MOUNT: + # If any storage is using MOUNT mode, we need to enable FUSE in + # the resources. + for r in self.resources: + r.requires_fuse = True # Storage source validation is done in Storage object self.storage_mounts = storage_mounts return self diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index 893d5c8565a..73277716301 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -33,6 +33,11 @@ provider: ssh_jump_image: {{k8s_ssh_jump_image}} + # Boolean flag to indicate if the cluster requires FUSE mounting. + # Used to set up the necessary permissions and sidecars. + fuse_device_required: {{k8s_fuse_device_required}} + fuse_device_manager_namespace: {{k8s_fuse_device_manager_namespace}} + # ServiceAccount created by the autoscaler for the head node pod that it # runs in. If this field isn't provided, the head pod config below must # contain a user-created service account with the proper permissions. @@ -139,6 +144,11 @@ available_node_types: skypilot-cluster: {{cluster_name_on_cloud}} # Identifies the SSH jump pod used by this pod. Used in life cycle management of the ssh jump pod. skypilot-ssh-jump: {{k8s_ssh_jump_name}} + {% if k8s_fuse_device_required %} + annotations: + # Required for FUSE mounting to access /dev/fuse + container.apparmor.security.beta.kubernetes.io/ray-node: unconfined + {% endif %} spec: # Change this if you altered the autoscaler_service_account above # or want to provide your own. @@ -186,19 +196,32 @@ available_node_types: - name: secret-volume readOnly: true mountPath: "/etc/secret-volume" + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. - mountPath: /dev/shm name: dshm - - mountPath: /dev/fuse # Required for FUSE mounting - name: dev-fuse - securityContext: # Required for FUSE mounting. TODO(romilb): See if we can grant a reduced set of privileges. - privileged: true + {% if k8s_fuse_device_required %} + securityContext: + capabilities: + add: + - "SYS_ADMIN" + {% endif %} resources: requests: cpu: {{cpus}} memory: {{memory}}G nvidia.com/gpu: {{accelerator_count}} + {% if k8s_fuse_device_required %} + # Kubernetes resource exposed by the fuse device manager + # https://gitlab.com/arm-research/smarter/smarter-device-manager + smarter-devices/fuse: "1" + {% endif %} limits: nvidia.com/gpu: {{accelerator_count}} # Limits need to be defined for GPU requests + {% if k8s_fuse_device_required %} + smarter-devices/fuse: "1" + {% endif %} setup_commands: # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.)