diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index bd4bafd43d5..a68c01bc696 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -317,7 +317,54 @@ available_node_types: # Do not change this command - it keeps the pod alive until it is # explicitly killed. command: ["/bin/bash", "-c", "--"] - args: ['trap : TERM INT; sleep infinity & wait;'] + args: + - | + # Tails file and checks every 5 sec for + # open file handlers with write access + # closes if none exist + monitor_file() { + tail -f $file & + TAIL_PID=$! + while kill -0 $TAIL_PID 2> /dev/null; do + # only two PIDs should be accessing the file + # the log appender and log tailer + if [ $(lsof -w $file | wc -l) -lt 3 ]; then + kill $TAIL_PID + break + fi + # Sleep for 5 seconds before checking again. Do not make this + # too short as it will consume CPU, and too long will cause + # the file to be closed too late keeping the pod alive. + sleep 5 + done + } + + log_tail() { + FILE_PATTERN="~/sky_logs/*/tasks/*.log" + while ! ls $(eval echo $FILE_PATTERN) 1> /dev/null 2>&1; do + sleep 1 + done + + # Keep track of already monitored files + already_monitored="" + + # Infinite loop to continuously check for new files + while true; do + for file in $(eval echo $FILE_PATTERN); do + if echo $already_monitored | grep -q $file; then + # File is already being monitored + continue + fi + + # Monitor the new file + monitor_file $file & + already_monitored="${already_monitored} ${file}" + done + sleep 0.1 + done + } + trap : TERM INT; log_tail || sleep infinity & wait + ports: - containerPort: 22 # Used for SSH - containerPort: {{ray_port}} # Redis port @@ -365,7 +412,7 @@ setup_commands: # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. # Line 'mkdir -p ..': disable host key check # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` - - sudo DEBIAN_FRONTEND=noninteractive apt install gcc patch pciutils rsync fuse curl -y; + - sudo DEBIAN_FRONTEND=noninteractive apt install lsof gcc patch pciutils rsync fuse curl -y; mkdir -p ~/.ssh; touch ~/.ssh/config; {%- for initial_setup_command in initial_setup_commands %} {{ initial_setup_command }} diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 0413447dcec..bfb5457ad03 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2044,6 +2044,91 @@ def test_task_labels_kubernetes(): run_one_test(test) +# ---------- Container logs from task on Kubernetes ---------- +@pytest.mark.kubernetes +def test_container_logs_multinode_kubernetes(): + name = _get_cluster_name() + task_yaml = 'tests/test_yamls/test_k8s_logs.yaml' + head_logs = ('kubectl get pods ' + f' | grep {name} | grep head | ' + " awk '{print $1}' | xargs -I {} kubectl logs {}") + worker_logs = ('kubectl get pods ' + f' | grep {name} | grep worker |' + " awk '{print $1}' | xargs -I {} kubectl logs {}") + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + test = Test( + 'container_logs_multinode_kubernetes', + [ + f'sky launch -y -c {name} {task_yaml} --num-nodes 2', + f'{head_logs} | wc -l | grep 9', + f'{worker_logs} | wc -l | grep 9', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + +@pytest.mark.kubernetes +def test_container_logs_two_jobs_kubernetes(): + name = _get_cluster_name() + task_yaml = 'tests/test_yamls/test_k8s_logs.yaml' + pod_logs = ('kubectl get pods ' + f' | grep {name} | grep head |' + " awk '{print $1}' | xargs -I {} kubectl logs {}") + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + test = Test( + 'test_container_logs_two_jobs_kubernetes', + [ + f'sky launch -y -c {name} {task_yaml}', + f'{pod_logs} | wc -l | grep 9', + f'sky launch -y -c {name} {task_yaml}', + f'{pod_logs} | wc -l | grep 18', + f'{pod_logs} | grep 1 | wc -l | grep 2', + f'{pod_logs} | grep 2 | wc -l | grep 2', + f'{pod_logs} | grep 3 | wc -l | grep 2', + f'{pod_logs} | grep 4 | wc -l | grep 2', + f'{pod_logs} | grep 5 | wc -l | grep 2', + f'{pod_logs} | grep 6 | wc -l | grep 2', + f'{pod_logs} | grep 7 | wc -l | grep 2', + f'{pod_logs} | grep 8 | wc -l | grep 2', + f'{pod_logs} | grep 9 | wc -l | grep 2', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + +@pytest.mark.kubernetes +def test_container_logs_two_simultaneous_jobs_kubernetes(): + name = _get_cluster_name() + task_yaml = 'tests/test_yamls/test_k8s_logs.yaml ' + pod_logs = ('kubectl get pods ' + f' | grep {name} | grep head |' + " awk '{print $1}' | xargs -I {} kubectl logs {}") + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + test = Test( + 'test_container_logs_two_simultaneous_jobs_kubernetes', + [ + f'sky launch -y -c {name}', + f'sky exec -c {name} -d {task_yaml}', + f'sky exec -c {name} -d {task_yaml}', + 'sleep 30', + f'{pod_logs} | wc -l | grep 18', + f'{pod_logs} | grep 1 | wc -l | grep 2', + f'{pod_logs} | grep 2 | wc -l | grep 2', + f'{pod_logs} | grep 3 | wc -l | grep 2', + f'{pod_logs} | grep 4 | wc -l | grep 2', + f'{pod_logs} | grep 5 | wc -l | grep 2', + f'{pod_logs} | grep 6 | wc -l | grep 2', + f'{pod_logs} | grep 7 | wc -l | grep 2', + f'{pod_logs} | grep 8 | wc -l | grep 2', + f'{pod_logs} | grep 9 | wc -l | grep 2', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + # ---------- Task: n=2 nodes with setups. ---------- @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @@ -3140,6 +3225,7 @@ def test_kubernetes_custom_image(image_id): run_one_test(test) +@pytest.mark.azure def test_azure_start_stop_two_nodes(): name = _get_cluster_name() test = Test( diff --git a/tests/test_yamls/test_k8s_logs.yaml b/tests/test_yamls/test_k8s_logs.yaml new file mode 100644 index 00000000000..7d5ff368377 --- /dev/null +++ b/tests/test_yamls/test_k8s_logs.yaml @@ -0,0 +1,8 @@ +name: test-k8s-logs + +run: | + for i in $(seq 1 9) + do + echo "$i" + sleep 0.1 + done