Skip to content

Commit

Permalink
[Core] Optimize kubernetes cmd executions with kubernetes command run…
Browse files Browse the repository at this point in the history
…ner (#3157)

* remove job_owner

* remove some clouds.Local related code

* Remove Local cloud entirely

* remove local cloud

* fix

* slurm runner

* kubernetes runner

* Use command runner for kubernetes

* rename back to ssh

* refactor runners in backend

* fix

* fix

* fix rsync

* Fix runner

* Fix run()

* errors and fix head runner

* support different mode

* format

* use whoami instead of $USER

* timeline for run and rsync

* lazy imports for pandas and lazy data frame

* fix fetch_aws

* fix fetchers

* avoid sync script for task

* add timeline

* cache cluster_info

* format

* cache cluster info

* do not stream

* fix skip lines

* format

* avoid source bashrc or -i for internal exec

* format

* use -i

* Add None arg

* fix merge conflicts

* Fix source bashrc

* add connect_timeout

* format

* Correctly quote the script without source bashrc

* fix output

* Fix connection output

* Fix

* check twice

* add Job ID

* fix

* format

* fix ip

* fix rsync for kubectl command runner

* format

* Enable output check for kubernetes

* Fix *

* Fix comments

* longer wait

* longer wait

* Update sky/backends/cloud_vm_ray_backend.py

Co-authored-by: Tian Xia <[email protected]>

* Update sky/provision/kubernetes/instance.py

Co-authored-by: Tian Xia <[email protected]>

* address comments

* refactor rsync

* add comment

* fix interface

* Update sky/utils/command_runner.py

Co-authored-by: Tian Xia <[email protected]>

* fix quote

* Fix skip lines

* fix smoke

* format

* fix

* fix serve failures

* Fix condition

* trigger test

---------

Co-authored-by: Ubuntu <azureuser@ray-dev-zhwu-9ce1-head-e359-868f0.h4nxbv2ixrmevnfzs0oyii0g1h.bx.internal.cloudapp.net>
Co-authored-by: Tian Xia <[email protected]>
  • Loading branch information
3 people committed Aug 23, 2024
1 parent 64e2900 commit 158a62f
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 200 deletions.
21 changes: 0 additions & 21 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2654,27 +2654,6 @@ def stop_handler(signum, frame):
raise KeyboardInterrupt(exceptions.SIGTSTP_CODE)


def run_command_and_handle_ssh_failure(runner: command_runner.SSHCommandRunner,
                                       command: str,
                                       failure_message: str) -> str:
    """Runs command remotely and returns output with proper error handling.

    Args:
        runner: SSH command runner used to execute the command. Its
            `ssh_user`, `ssh_private_key` and `ip` attributes are only read
            to build the error message when the SSH connection fails.
        command: The command string to execute through `runner.run`.
        failure_message: Message forwarded to
            `subprocess_utils.handle_returncode` together with the return
            code and stderr.

    Returns:
        The stdout captured from the command.

    Raises:
        RuntimeError: If the return code is 255, which this function treats
            as a failure of the SSH connection itself.
    """
    # Return code that this function interprets as "SSH failed" (as opposed
    # to the remote command failing).
    ssh_failure_returncode = 255

    rc, stdout, stderr = runner.run(command,
                                    require_outputs=True,
                                    stream_logs=False)
    if rc == ssh_failure_returncode:
        # SSH failed
        raise RuntimeError(
            f'SSH with user {runner.ssh_user} and key {runner.ssh_private_key} '
            f'to {runner.ip} failed. This is most likely due to incorrect '
            'credentials or incorrect permissions for the key file. Check '
            'your credentials and try again.')
    # NOTE(review): presumably raises when rc is non-zero; confirm against
    # subprocess_utils.handle_returncode.
    subprocess_utils.handle_returncode(rc,
                                       command,
                                       failure_message,
                                       stderr=stderr)
    return stdout


def check_rsync_installed() -> None:
"""Checks if rsync is installed.
Expand Down
7 changes: 5 additions & 2 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3646,7 +3646,10 @@ def _rsync_down(args) -> None:
try:
os.makedirs(local_log_dir, exist_ok=True)
runner.rsync(
source=f'{remote_log_dir}/*',
# Require a `/` at the end to make sure the parent dir is not
# created locally. We do not add an additional '*' as
# kubernetes's rsync does not work with an ending '*'.
source=f'{remote_log_dir}/',
target=local_log_dir,
up=False,
stream_logs=False,
Expand All @@ -3655,7 +3658,7 @@ def _rsync_down(args) -> None:
if e.returncode == exceptions.RSYNC_FILE_NOT_FOUND_CODE:
# Raised by rsync_down. Remote log dir may not exist, since
# the job can be run on some part of the nodes.
logger.debug(f'{runner.ip} does not have the tasks/*.')
logger.debug(f'{runner.node_id} does not have the tasks/*.')
else:
raise

Expand Down
1 change: 1 addition & 0 deletions sky/provision/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from sky.provision.kubernetes.config import bootstrap_instances
from sky.provision.kubernetes.instance import get_cluster_info
from sky.provision.kubernetes.instance import get_command_runners
from sky.provision.kubernetes.instance import query_instances
from sky.provision.kubernetes.instance import run_instances
from sky.provision.kubernetes.instance import stop_instances
Expand Down
196 changes: 98 additions & 98 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from sky.provision import docker_utils
from sky.provision.kubernetes import config as config_lib
from sky.provision.kubernetes import utils as kubernetes_utils
from sky.utils import command_runner
from sky.utils import common_utils
from sky.utils import kubernetes_enums
from sky.utils import status_lib
Expand Down Expand Up @@ -158,6 +159,15 @@ def _raise_pod_scheduling_errors(namespace, new_nodes):
raise config_lib.KubernetesError(f'{timeout_err_msg}')


def _raise_command_running_error(message: str, command: str, pod_name: str,
rc: int, stdout: str) -> None:
if rc == 0:
return
raise config_lib.KubernetesError(
f'Failed to {message} for pod {pod_name} with return '
f'code {rc}: {command!r}\nOutput: {stdout}.')


def _wait_for_pods_to_schedule(namespace, new_nodes, timeout: int):
"""Wait for all pods to be scheduled.
Expand Down Expand Up @@ -250,39 +260,6 @@ def _wait_for_pods_to_run(namespace, new_nodes):
time.sleep(1)


def _run_command_on_pods(node_name: str,
                         node_namespace: str,
                         command: List[str],
                         stream_logs: bool = False):
    """Run command on Kubernetes pods.

    If `stream_logs` is True, we poll for output and error messages while the
    command is executing, and the stdout and stderr is written to logger.info.
    When called from the provisioner, this logger.info is written to the
    provision.log file (see setup_provision_logging()).

    Args:
        node_name: Name of the pod to execute the command in.
        node_namespace: Namespace of the pod.
        command: The command to execute, as an argv-style list.
        stream_logs: Whether to poll and log the output while the command
            runs instead of preloading the full response.

    Returns:
        The object returned by the Kubernetes exec stream call. With
        `stream_logs=False` the content is preloaded (callers treat it as
        the command's output string); with `stream_logs=True` it is the
        stream object, returned after it has been closed.
    """
    cmd_output = kubernetes.stream()(
        kubernetes.core_api().connect_get_namespaced_pod_exec,
        node_name,
        node_namespace,
        command=command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        # Preloading must be disabled to get a pollable stream.
        _preload_content=(not stream_logs),
        _request_timeout=kubernetes.API_TIMEOUT)
    if not stream_logs:
        return cmd_output

    # Drain stdout/stderr until the remote side closes the stream.
    while cmd_output.is_open():
        cmd_output.update(timeout=1)
        if cmd_output.peek_stdout():
            logger.info(cmd_output.read_stdout().strip())
        if cmd_output.peek_stderr():
            logger.info(cmd_output.read_stderr().strip())
    cmd_output.close()
    return cmd_output


def _set_env_vars_in_pods(namespace: str, new_pods: List):
"""Setting environment variables in pods.
Expand All @@ -299,42 +276,44 @@ def _set_env_vars_in_pods(namespace: str, new_pods: List):
/etc/profile.d/, making them available for all users in future
shell sessions.
"""
set_k8s_env_var_cmd = [
'/bin/sh',
'-c',
docker_utils.SETUP_ENV_VARS_CMD,
]
set_k8s_env_var_cmd = docker_utils.SETUP_ENV_VARS_CMD

for new_pod in new_pods:
_run_command_on_pods(new_pod.metadata.name, namespace,
set_k8s_env_var_cmd)
runner = command_runner.KubernetesCommandRunner(
(namespace, new_pod.metadata.name))
rc, stdout, _ = runner.run(set_k8s_env_var_cmd,
require_outputs=True,
stream_logs=False)
_raise_command_running_error('set env vars', set_k8s_env_var_cmd,
new_pod.metadata.name, rc, stdout)


def _check_user_privilege(namespace: str, new_nodes: List) -> None:
# Checks if the default user has sufficient privilege to set up
# the kubernetes instance pod.
check_k8s_user_sudo_cmd = [
'/bin/sh',
'-c',
(
'if [ $(id -u) -eq 0 ]; then'
# If user is root, create an alias for sudo used in skypilot setup
' echo \'alias sudo=""\' >> ~/.bashrc; '
'else '
' if command -v sudo >/dev/null 2>&1; then '
' timeout 2 sudo -l >/dev/null 2>&1 || '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' else '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' fi; '
'fi')
]
check_k8s_user_sudo_cmd = (
'if [ $(id -u) -eq 0 ]; then'
# If user is root, create an alias for sudo used in skypilot setup
' echo \'alias sudo=""\' >> ~/.bashrc; echo succeed;'
'else '
' if command -v sudo >/dev/null 2>&1; then '
' timeout 2 sudo -l >/dev/null 2>&1 && echo succeed || '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' else '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' fi; '
'fi')

for new_node in new_nodes:
privilege_check = _run_command_on_pods(new_node.metadata.name,
namespace,
check_k8s_user_sudo_cmd)
if privilege_check == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE):
runner = command_runner.KubernetesCommandRunner(
(namespace, new_node.metadata.name))
rc, stdout, _ = runner.run(check_k8s_user_sudo_cmd,
require_outputs=True,
stream_logs=False)
_raise_command_running_error('check user privilege',
check_k8s_user_sudo_cmd,
new_node.metadata.name, rc, stdout)
if stdout == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE):
raise config_lib.KubernetesError(
'Insufficient system privileges detected. '
'Ensure the default user has root access or '
Expand All @@ -345,44 +324,43 @@ def _check_user_privilege(namespace: str, new_nodes: List) -> None:
def _setup_ssh_in_pods(namespace: str, new_nodes: List) -> None:
# Setting up ssh for the pod instance. This is already set up for
# the jump pod so it does not need to be run for it.
set_k8s_ssh_cmd = [
'/bin/sh',
'-c',
(
'set -x; '
'prefix_cmd() '
'{ if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
'export DEBIAN_FRONTEND=noninteractive;'
'$(prefix_cmd) apt-get update;'
'$(prefix_cmd) apt install openssh-server rsync -y; '
'$(prefix_cmd) mkdir -p /var/run/sshd; '
'$(prefix_cmd) '
'sed -i "s/PermitRootLogin prohibit-password/PermitRootLogin yes/" '
'/etc/ssh/sshd_config; '
'$(prefix_cmd) sed '
'"s@session\\s*required\\s*pam_loginuid.so@session optional '
'pam_loginuid.so@g" -i /etc/pam.d/sshd; '
'cd /etc/ssh/ && $(prefix_cmd) ssh-keygen -A; '
'$(prefix_cmd) mkdir -p ~/.ssh; '
'$(prefix_cmd) chown -R $(whoami) ~/.ssh;'
'$(prefix_cmd) chmod 700 ~/.ssh; '
'$(prefix_cmd) chmod 644 ~/.ssh/authorized_keys; '
'$(prefix_cmd) cat /etc/secret-volume/ssh-publickey* > '
'~/.ssh/authorized_keys; '
'$(prefix_cmd) service ssh restart; '
# Eliminate the error
# `mesg: ttyname failed: inappropriate ioctl for device`.
# See https://www.educative.io/answers/error-mesg-ttyname-failed-inappropriate-ioctl-for-device # pylint: disable=line-too-long
'$(prefix_cmd) sed -i "s/mesg n/tty -s \\&\\& mesg n/" ~/.profile;')
]
set_k8s_ssh_cmd = (
'set -ex; '
'prefix_cmd() '
'{ if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
'export DEBIAN_FRONTEND=noninteractive;'
'$(prefix_cmd) apt-get update;'
'$(prefix_cmd) apt install openssh-server rsync -y; '
'$(prefix_cmd) mkdir -p /var/run/sshd; '
'$(prefix_cmd) '
'sed -i "s/PermitRootLogin prohibit-password/PermitRootLogin yes/" '
'/etc/ssh/sshd_config; '
'$(prefix_cmd) sed '
'"s@session\\s*required\\s*pam_loginuid.so@session optional '
'pam_loginuid.so@g" -i /etc/pam.d/sshd; '
'cd /etc/ssh/ && $(prefix_cmd) ssh-keygen -A; '
'$(prefix_cmd) mkdir -p ~/.ssh; '
'$(prefix_cmd) chown -R $(whoami) ~/.ssh;'
'$(prefix_cmd) chmod 700 ~/.ssh; '
'$(prefix_cmd) cat /etc/secret-volume/ssh-publickey* > '
'~/.ssh/authorized_keys; '
'$(prefix_cmd) chmod 644 ~/.ssh/authorized_keys; '
'$(prefix_cmd) service ssh restart; '
# Eliminate the error
# `mesg: ttyname failed: inappropriate ioctl for device`.
# See https://www.educative.io/answers/error-mesg-ttyname-failed-inappropriate-ioctl-for-device # pylint: disable=line-too-long
'$(prefix_cmd) sed -i "s/mesg n/tty -s \\&\\& mesg n/" ~/.profile;')

# TODO(romilb): Parallelize the setup of SSH in pods for multi-node clusters
for new_node in new_nodes:
pod_name = new_node.metadata.name
runner = command_runner.KubernetesCommandRunner((namespace, pod_name))
logger.info(f'{"-"*20}Start: Set up SSH in pod {pod_name!r} {"-"*20}')
_run_command_on_pods(new_node.metadata.name,
namespace,
set_k8s_ssh_cmd,
stream_logs=True)
rc, stdout, _ = runner.run(set_k8s_ssh_cmd,
require_outputs=True,
stream_logs=False)
_raise_command_running_error('setup ssh', set_k8s_ssh_cmd, pod_name, rc,
stdout)
logger.info(f'{"-"*20}End: Set up SSH in pod {pod_name!r} {"-"*20}')


Expand Down Expand Up @@ -709,11 +687,15 @@ def get_cluster_info(
assert cpu_request is not None, 'cpu_request should not be None'

ssh_user = 'sky'
get_k8s_ssh_user_cmd = ['/bin/sh', '-c', ('echo $(whoami)')]
get_k8s_ssh_user_cmd = 'echo $(whoami)'
assert head_pod_name is not None
ssh_user = _run_command_on_pods(head_pod_name, namespace,
get_k8s_ssh_user_cmd)
ssh_user = ssh_user.strip()
runner = command_runner.KubernetesCommandRunner((namespace, head_pod_name))
rc, stdout, _ = runner.run(get_k8s_ssh_user_cmd,
require_outputs=True,
stream_logs=False)
_raise_command_running_error('get ssh user', get_k8s_ssh_user_cmd,
head_pod_name, rc, stdout)
ssh_user = stdout.strip()
logger.debug(
f'Using ssh user {ssh_user} for cluster {cluster_name_on_cloud}')

Expand Down Expand Up @@ -776,3 +758,21 @@ def query_instances(
continue
cluster_status[pod.metadata.name] = pod_status
return cluster_status


def get_command_runners(
    cluster_info: common.ClusterInfo,
    # NOTE(review): a `**kwargs` annotation describes each value, so this
    # types every credential as a dict; `Any` was probably intended.
    **credentials: Dict[str, Any],
) -> List[command_runner.CommandRunner]:
    """Get the command runners for all pods of the given cluster.

    Args:
        cluster_info: Cluster information; its `provider_config` must not be
            None and is used to resolve the namespace.
        **credentials: Extra keyword arguments forwarded unchanged to
            `KubernetesCommandRunner.make_runner_list`.

    Returns:
        One runner per instance in `cluster_info.instances`. When a head
        instance id is set, its runner comes first, followed by the
        remaining instances in the iteration order of
        `cluster_info.instances`.
    """
    assert cluster_info.provider_config is not None, cluster_info
    namespace = _get_namespace(cluster_info.provider_config)
    head_instance_id = cluster_info.head_instance_id

    # Each node is identified by a (namespace, pod name) pair.
    node_list = []
    if head_instance_id is not None:
        node_list.append((namespace, head_instance_id))
    node_list.extend((namespace, pod_name)
                     for pod_name in cluster_info.instances
                     if pod_name != head_instance_id)
    return command_runner.KubernetesCommandRunner.make_runner_list(
        node_list=node_list, **credentials)
Loading

0 comments on commit 158a62f

Please sign in to comment.