From e77db37a4869bff6c81ce5331983fc435a9a70a2 Mon Sep 17 00:00:00 2001 From: liddle rain Date: Tue, 31 Oct 2023 01:17:57 -0700 Subject: [PATCH] FEAT: Slurm Deployment For Xorbits (#719) Signed-off-by: liddle rain <57928993+fengsxy@users.noreply.github.com> Signed-off-by: liddle rain Co-authored-by: ahs Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Aprilies <767760161@qq.com> Co-authored-by: liddle_rain --- .github/workflows/python.yaml | 25 ++ .gitignore | 5 +- CI/slurm/Dockerfile | 2 + CI/slurm/docker-compose.yml | 120 +++++++++ CI/slurm/register_cluster.sh | 5 + CI/slurm/slurm.conf | 98 +++++++ CI/slurm/slurm.sh | 60 +++++ CI/slurm/start-slurm.sh | 29 +++ doc/source/user_guide/deployment_slurm.rst | 121 ++++++++- python/setup.cfg | 1 + python/xorbits/deploy/slurm/__init__.py | 15 ++ python/xorbits/deploy/slurm/slurm.py | 241 ++++++++++++++++++ python/xorbits/deploy/slurm/tests/__init__.py | 13 + .../xorbits/deploy/slurm/tests/test_slurm.py | 80 ++++++ 14 files changed, 811 insertions(+), 4 deletions(-) create mode 100644 CI/slurm/Dockerfile create mode 100644 CI/slurm/docker-compose.yml create mode 100755 CI/slurm/register_cluster.sh create mode 100644 CI/slurm/slurm.conf create mode 100644 CI/slurm/slurm.sh create mode 100755 CI/slurm/start-slurm.sh create mode 100644 python/xorbits/deploy/slurm/__init__.py create mode 100644 python/xorbits/deploy/slurm/slurm.py create mode 100644 python/xorbits/deploy/slurm/tests/__init__.py create mode 100644 python/xorbits/deploy/slurm/tests/test_slurm.py diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index a508a6f9a..feb75e3d0 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -96,6 +96,7 @@ jobs: - { os: self-hosted, module: gpu, python-version: 3.9} - { os: ubuntu-latest, module: jax, python-version: 3.9 } - { os: juicefs-ci, module: kubernetes-juicefs, python-version: 3.9 } + - { os: ubuntu-latest, module: slurm, python-version: 3.9 } - { os: ubuntu-latest, module: datasets, python-version: 3.9 } steps: - name: Check out code @@ -247,6 +248,18 @@ jobs: python setup.py build_ext -i working-directory: ./python + - name: Slurm Setup Job queuing system + if: ${{ matrix.module == 'slurm' }} + run: | + source CI/slurm/${{ matrix.module }}.sh + jobqueue_before_install + + - name: Slurm Install xorbits + if: ${{ matrix.module == 'slurm' }} + run: | + source CI/slurm/${{ matrix.module }}.sh + jobqueue_install + - name: Install on GPU if: ${{ matrix.module == 'gpu' }} run: | @@ -285,6 +298,11 @@ jobs: pytest --ignore xorbits/_mars/ --timeout=1500 \ -W ignore::PendingDeprecationWarning \ --cov-config=setup.cfg --cov-report=xml --cov=xorbits xorbits/deploy/kubernetes/external_storage/juicefs + elif [[ "$MODULE" == "slurm" ]]; then + docker exec c1 /bin/bash -c "pip install xorbits" + docker exec c2 /bin/bash -c "pip install xorbits" + docker exec slurmctld /bin/bash -c \ + "pytest /xorbits/python/xorbits/deploy/slurm/tests/test_slurm.py " elif [[ "$MODULE" == "hadoop" ]]; then export WITH_HADOOP="1" export HADOOP_HOME="/usr/local/hadoop" @@ -376,6 +394,13 @@ jobs: fi working-directory: ./python + + - name: Cleanup on slurm + if: ${{ matrix.module == 'slurm' }} + run: | + source CI/slurm/${{ matrix.module }}.sh + jobqueue_after_script + - name: Report coverage data uses: codecov/codecov-action@v3 with: diff --git a/.gitignore b/.gitignore index 602cc2a19..a27ce3f69 100644 --- a/.gitignore +++ b/.gitignore @@ -151,4 +151,7 @@ doc/source/savefig/ 
asv/results -.DS_Store \ No newline at end of file +.DS_Store + +#slrm.sh generated sh +python/xorbits/deploy/slurm/tests/slurm.sh diff --git a/CI/slurm/Dockerfile b/CI/slurm/Dockerfile new file mode 100644 index 000000000..1a57e7ccf --- /dev/null +++ b/CI/slurm/Dockerfile @@ -0,0 +1,2 @@ +FROM daskdev/dask-jobqueue:slurm +RUN pip install xorbits diff --git a/CI/slurm/docker-compose.yml b/CI/slurm/docker-compose.yml new file mode 100644 index 000000000..088f0e9e9 --- /dev/null +++ b/CI/slurm/docker-compose.yml @@ -0,0 +1,120 @@ +version: "2.2" + +services: + mysql: + image: mysql:5.7.29 + hostname: mysql + container_name: mysql + environment: + MYSQL_RANDOM_ROOT_PASSWORD: "yes" + MYSQL_DATABASE: slurm_acct_db + MYSQL_USER: slurm + MYSQL_PASSWORD: password + volumes: + - var_lib_mysql:/var/lib/mysql + networks: + common-network: + + slurmdbd: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmdbd"] + container_name: slurmdbd + hostname: slurmdbd + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - var_log_slurm:/var/log/slurm + expose: + - "6819" + depends_on: + - mysql + networks: + common-network: + + slurmctld: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmctld"] + container_name: slurmctld + hostname: slurmctld + environment: + - CI_SHARED_SPACE=/shared_space + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - slurm_jobdir:/data + - var_log_slurm:/var/log/slurm + - ../..:/xorbits + - shared_space:/shared_space + expose: + - "6817" + depends_on: + - "slurmdbd" + networks: + common-network: + ipv4_address: 10.1.1.10 + cap_add: + - NET_ADMIN + + c1: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmd"] + hostname: c1 + container_name: c1 + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - slurm_jobdir:/data + - var_log_slurm:/var/log/slurm + - ../..:/xorbits + - shared_space:/shared_space + expose: + - "6818" + depends_on: + - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.11 + cap_add: + - NET_ADMIN + + c2: + image: daskdev/dask-jobqueue:slurm + build: . + command: ["slurmd"] + hostname: c2 + container_name: c2 + volumes: + - etc_munge:/etc/munge + - etc_slurm:/etc/slurm + - slurm_jobdir:/data + - var_log_slurm:/var/log/slurm + - ../..:/xorbits + - shared_space:/shared_space + expose: + - "6818" + depends_on: + - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.12 + cap_add: + - NET_ADMIN + +volumes: + etc_munge: + etc_slurm: + slurm_jobdir: + var_lib_mysql: + var_log_slurm: + shared_space: + +networks: + common-network: + driver: bridge + ipam: + driver: default + config: + - subnet: 10.1.1.0/24 diff --git a/CI/slurm/register_cluster.sh b/CI/slurm/register_cluster.sh new file mode 100755 index 000000000..ef3d4d0fb --- /dev/null +++ b/CI/slurm/register_cluster.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -e + +docker exec slurmctld bash -c "/usr/bin/sacctmgr --immediate add cluster name=linux" && \ +docker-compose restart slurmdbd slurmctld diff --git a/CI/slurm/slurm.conf b/CI/slurm/slurm.conf new file mode 100644 index 000000000..6f1fa21a5 --- /dev/null +++ b/CI/slurm/slurm.conf @@ -0,0 +1,98 @@ +# slurm.conf +# +# See the slurm.conf man page for more information. 
+# +ClusterName=linux +ControlMachine=slurmctld +ControlAddr=slurmctld +#BackupController= +#BackupAddr= +# +SlurmUser=slurm +#SlurmdUser=root +SlurmctldPort=6817 +SlurmdPort=6818 +AuthType=auth/munge +#JobCredentialPrivateKey= +#JobCredentialPublicCertificate= +StateSaveLocation=/var/lib/slurmd +SlurmdSpoolDir=/var/spool/slurmd +SwitchType=switch/none +MpiDefault=none +SlurmctldPidFile=/var/run/slurmd/slurmctld.pid +SlurmdPidFile=/var/run/slurmd/slurmd.pid +ProctrackType=proctrack/linuxproc +#PluginDir= +CacheGroups=0 +#FirstJobId= +ReturnToService=0 +#MaxJobCount= +#PlugStackConfig= +#PropagatePrioProcess= +#PropagateResourceLimits= +#PropagateResourceLimitsExcept= +#Prolog= +#Epilog= +#SrunProlog= +#SrunEpilog= +#TaskProlog= +#TaskEpilog= +#TaskPlugin= +#TrackWCKey=no +#TreeWidth=50 +#TmpFS= +#UsePAM= +# +# TIMERS +SlurmctldTimeout=300 +SlurmdTimeout=300 +InactiveLimit=0 +MinJobAge=300 +KillWait=300 +Waittime=30 +#change this avoids low resource kill the process +#**log** +#srun: Job step aborted: Waiting up to 32 seconds for job step to finish. +#slurmstepd: error: *** STEP 27.0 ON c1 CANCELLED AT 2023-09-25T06:30:54 *** + +# SCHEDULING +SchedulerType=sched/backfill +#SchedulerAuth= +#SchedulerPort= +#SchedulerRootFilter= +SelectType=select/cons_res +SelectTypeParameters=CR_CPU_Memory +FastSchedule=1 +#PriorityType=priority/multifactor +#PriorityDecayHalfLife=14-0 +#PriorityUsageResetPeriod=14-0 +#PriorityWeightFairshare=100000 +#PriorityWeightAge=1000 +#PriorityWeightPartition=10000 +#PriorityWeightJobSize=1000 +#PriorityMaxAge=1-0 +# +# LOGGING +SlurmctldDebug=3 +SlurmctldLogFile=/var/log/slurm/slurmctld.log +SlurmdDebug=3 +SlurmdLogFile=/var/log/slurm/slurmd.log +JobCompType=jobcomp/filetxt +JobCompLoc=/var/log/slurm/jobcomp.log +# +# ACCOUNTING +JobAcctGatherType=jobacct_gather/linux +JobAcctGatherFrequency=30 +# +AccountingStorageType=accounting_storage/slurmdbd +AccountingStorageHost=slurmdbd +AccountingStoragePort=6819 +AccountingStorageLoc=slurm_acct_db +#AccountingStoragePass= +#AccountingStorageUser= +# +# COMPUTE NODES +NodeName=c[1-2] RealMemory=4096 CPUs=2 State=UNKNOWN +# +# PARTITIONS +PartitionName=normal Default=yes Nodes=c[1-2] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=2 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP diff --git a/CI/slurm/slurm.sh b/CI/slurm/slurm.sh new file mode 100644 index 000000000..7386f6fa8 --- /dev/null +++ b/CI/slurm/slurm.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +function jobqueue_before_install { + docker version + docker-compose version + + # start slurm cluster + cd ./CI/slurm + docker-compose pull + ./start-slurm.sh + cd - + + #Set shared space permissions + docker exec slurmctld /bin/bash -c "chmod -R 777 /shared_space" + + docker ps -a + docker images + show_network_interfaces +} + +function show_network_interfaces { + for c in slurmctld c1 c2; do + echo '------------------------------------------------------------' + echo docker container: $c + docker exec $c python -c 'import psutil; print(psutil.net_if_addrs().keys())' + echo '------------------------------------------------------------' + done +} + +function jobqueue_install { + docker exec slurmctld /bin/bash -c "cd xorbits/python/; pip install -e ." +} + +function jobqueue_script { + docker exec c1 /bin/bash -c "pip install xorbits" + docker exec c2 /bin/bash -c "pip install xorbits" + docker exec slurmctld /bin/bash -c \ + "pytest --ignore xorbits/_mars/ --timeout=1500 \ + -W ignore::PendingDeprecationWarning \ + --cov-config=setup.cfg --cov-report=xml --cov=xorbits xorbits/deploy/slurm" +} + +function jobqueue_after_script { + docker exec slurmctld bash -c 'sinfo' + docker exec slurmctld bash -c 'squeue' + docker exec slurmctld bash -c 'sacct -l' +} \ No newline at end of file diff --git a/CI/slurm/start-slurm.sh b/CI/slurm/start-slurm.sh new file mode 100755 index 000000000..f1936b58a --- /dev/null +++ b/CI/slurm/start-slurm.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +docker-compose up -d --no-build + +while [ `./register_cluster.sh 2>&1 | grep "sacctmgr: error" | wc -l` -ne 0 ] + do + echo "Waiting for SLURM cluster to become ready"; + sleep 2 + done +echo "SLURM properly configured" + +# On some clusters the login node does not have the same interface as the +# compute nodes. The next three lines allow to test this edge case by adding +# separate interfaces on the worker and on the scheduler nodes. +docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler +docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker +docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker diff --git a/doc/source/user_guide/deployment_slurm.rst b/doc/source/user_guide/deployment_slurm.rst index 47aa97b41..491c0e22c 100644 --- a/doc/source/user_guide/deployment_slurm.rst +++ b/doc/source/user_guide/deployment_slurm.rst @@ -1,8 +1,9 @@ .. _deployment_slurm: -================== + SLURM deployment -================== +============= + If you have access to a SLURM cluster, you can refer to the following guide to run an Xorbits job. Other HPC job schedulers like Torque or LSF are similar. You are recommended to read the :ref:`cluster deployment ` first to know some basic knowledge of a Xorbits cluster. @@ -33,6 +34,10 @@ The below walkthrough will do the following: 6. After the underlying Xorbits cluster is ready, submit the user-specified task. 
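+
+The SLURM script shown later hands the cluster endpoint to a user workload script
+(``test.py`` in this guide) via ``--endpoint``. What that workload does is entirely
+up to you; a minimal sketch of such a script, assuming it receives the endpoint as
+an ``--endpoint`` argument, could be:
+
+.. code-block:: python
+
+    import argparse
+
+    import xorbits
+    import xorbits.pandas as pd
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--endpoint", help="address of the Xorbits supervisor web UI")
+    args = parser.parse_args()
+
+    # Connect to the already running Xorbits cluster instead of starting a local one.
+    xorbits.init(args.endpoint)
+    print(pd.Series([1, 2, 3]).sum())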
+ +Script Method +-------------- + SLURM script file ~~~~~~~~~~~~~~~~~ @@ -168,7 +173,7 @@ Name this SLURM script file as ``xorbits_slurm.sh``. Submit the job via: Put all together ----------------- +~~~~~~~~~~~~~~~~~~~~~~ The SLURM script looks like this: @@ -219,3 +224,113 @@ The SLURM script looks like this: python -u test.py --endpoint "${address}" + +Code Method +----------- + + +Initialization +~~~~~~~~~~~~~~ + +To create an instance of the `SLURMCluster` class, you can use the following parameters: + + - `job_name` (str, optional): Name of the Slurm job. + - `num_nodes` (int, optional): Number of nodes in the Slurm cluster. + - `partition_option` (str, optional): Request a specific partition for resource allocation. + - `load_env` (str, optional): Conda Environment to load. + - `output_path` (str, optional): Path for log output. + - `error_path` (str, optional): Path for log errors. + - `work_dir` (str, optional): Slurm's working directory, the default location for logs and results. + - `time` (str, optional): Minimum time limit for job allocation. + - `processes` (int, optional): Number of processes. + - `cores` (int, optional): Number of cores. + - `memory` (str, optional): Specify the real memory required per node. Default units are megabytes. + - `account` (str, optional): Charge resources used by this job to the specified account. + - `webport` (int, optional): Xorbits' web port. + - `**kwargs`: Additional parameters that can be added using the Slurm interface. + + +.. code-block:: python + + from xorbits.deploy.slurm import SLURMCluster + cluster = SLURMCluster( + job_name="my_job", + num_nodes=4, + partition_option="compute", + load_env="my_env", + output_path="logs/output.log", + error_path="logs/error.log", + work_dir="/path/to/work_dir", + time="1:00:00", + processes=8, + cores=2, + memory="8G", + account="my_account", + webport=16379, + custom_param1="value1", + custom_param2="value2" + ) + + +.. note:: + Modify the parameters as needed for your specific use case. + +Running the Job +~~~~~~~~~~~~~~~ + +To submit the job to SLURM, use the `run()` method. It will return the job's address. + +.. code-block:: python + + address = cluster.run() + +Getting Job Information +~~~~~~~~~~~~~~~~~~~~~~~~ + + +- `get_job_id()`: This method extracts the job ID from the output of the `sbatch` command. + +.. code-block:: python + + job_id = cluster.get_job_id() + +- `cancel_job()`: This method cancels the job using the `scancel` command. A hook is designed so that while canceling the program, the Slurm task will also be canceled. + +.. code-block:: python + + cluster.cancel_job(job_id) + +- `update_head_node()`: This method retrieves the head node information from the SLURM job. + +.. code-block:: python + + cluster.update_head_node() + +- `get_job_address(retry_attempts=10, sleep_interval=30)`: This method retrieves the job address after deployment. It retries several times to get the job data. + +.. code-block:: python + + job_address = cluster.get_job_address(retry_attempts=10, sleep_interval=30) + + +Example +~~~~~~~ + + +Here's an example of how to use the `SLURMCluster` class + +.. 
code-block:: python
+
+    import xorbits
+    import xorbits.pandas as pd
+    from xorbits.deploy.slurm import SLURMCluster
+
+    test_cluster = SLURMCluster(
+        job_name="xorbits",
+        num_nodes=2,
+        output_path="/shared_space/output.out",
+        time="00:30:00",
+    )
+    address = test_cluster.run()
+    xorbits.init(address)
+    assert repr(pd.Series([1, 2, 3]).sum()) == "6"
+
diff --git a/python/setup.cfg b/python/setup.cfg
index ec6576c65..fb7755738 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -113,6 +113,7 @@ omit =
     xorbits/_version.py
     *.pxd
     */tests/*
+    xorbits/deploy/slurm/slurm.py
     xorbits/deploy/kubernetes/core.py
     xorbits/deploy/kubernetes/supervisor.py
     xorbits/deploy/kubernetes/worker.py
diff --git a/python/xorbits/deploy/slurm/__init__.py b/python/xorbits/deploy/slurm/__init__.py
new file mode 100644
index 000000000..13bda3f82
--- /dev/null
+++ b/python/xorbits/deploy/slurm/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2022-2023 XProbe Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .slurm import SLURMCluster
diff --git a/python/xorbits/deploy/slurm/slurm.py b/python/xorbits/deploy/slurm/slurm.py
new file mode 100644
index 000000000..26c185d0c
--- /dev/null
+++ b/python/xorbits/deploy/slurm/slurm.py
@@ -0,0 +1,241 @@
+# Copyright 2022-2023 XProbe Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import atexit
+import logging
+import os
+import re
+import subprocess
+import time
+
+# Configure logging
+logger = logging.getLogger(__name__)
+
+
+class SLURMCluster:
+    def __init__(
+        self,
+        job_name=None,
+        num_nodes=None,
+        partition_option=None,
+        load_env=None,
+        output_path=None,
+        error_path=None,
+        work_dir=None,
+        time=None,
+        processes=None,
+        cores=None,
+        memory=None,
+        account=None,
+        webport=16379,
+        **kwargs,
+    ):
+        """
+        The entry point for deploying an Xorbits cluster on SLURM.
+
+        Parameters
+        ----------
+        job_name : str, optional
+            Name of the Slurm job, by default None
+        num_nodes : int, optional
+            Number of nodes in the Slurm cluster, by default None
+        partition_option : str, optional
+            Request a specific partition for the resource allocation, by default None
+        load_env : str, optional
+            Conda environment to load, by default None
+        output_path : str, optional
+            Path for log output, by default None
+        error_path : str, optional
+            Path for the error log, by default None
+        work_dir : str, optional
+            Slurm's working directory, the default location for logs and results, by default None
+        time : str, optional
+            Minimum time limit on the job allocation, by default None
+        processes : int, optional
+            Number of processes, by default None
+        cores : int, optional
+            Number of cores, by default None
+        memory : str, optional
+            Specify the real memory required per node. Default units are megabytes, by default None
+        account : str, optional
+            Charge resources used by this job to the specified account, by default None
+        webport : int, optional
+            Xorbits' web port, by default 16379
+        **kwargs
+            Any additional sbatch option; each extra keyword argument is appended
+            to the generated script as an ``#SBATCH`` directive.
+        """
+        commands = ["#!/bin/bash"]
+
+        self.job_name = job_name
+        self.num_nodes = num_nodes
+        self.partition_option = partition_option
+        self.output_path = output_path
+        self.work_dir = work_dir
+        self.time = time
+        self.processes = processes
+        self.cores = cores
+        self.memory = memory
+        self.load_env = load_env
+        self.error_path = error_path
+        self.account = account
+        slurm_params = {
+            "J": self.job_name,
+            "nodes": self.num_nodes,
+            "partition": self.partition_option,
+            "error": self.error_path,
+            "output": self.output_path,
+            "chdir": self.work_dir,
+            "time": self.time,
+            "ntasks": self.processes,
+            "cpus-per-task": self.cores,
+            "mem": self.memory,
+            "A": self.account,
+            **kwargs,
+        }
+        self.commands = None
+        self.web_port = webport
+        for param, value in slurm_params.items():
+            if value is not None:
+                # sbatch accepts long options (e.g. --time) and single-letter options
+                # (e.g. -A); distinguish them by the length of the option name
+                if len(str(param)) > 1:
+                    commands.append(f"#SBATCH --{param}={value}")
+                else:
+                    commands.append(f"#SBATCH -{param} {value}")
+
+        if self.load_env:
+            commands.append(f"source activate {self.load_env}")
+
+        commands += [
+            "set -x",
+            'nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")',
+            "nodes_array=($nodes)",
+            "head_node=${nodes_array[0]}",
+            "port=16380",
+            f"web_port={self.web_port}",
+            'echo "Starting SUPERVISOR at ${head_node}"',
+            'srun --nodes=1 --ntasks=1 -w "${head_node}" \\',
+            '    xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}"&',
+            "sleep 30",
+            "worker_num=$((SLURM_JOB_NUM_NODES - 1))",
+            "for ((i = 1; i <= worker_num; i++)); do",
+            "    node_i=${nodes_array[$i]}",
+            "    port_i=$((port + i))",
+            '    echo "Starting WORKER $i at ${node_i}"',
+            '    srun --nodes=1 --ntasks=1 -w "${node_i}" \\',
+            '        xorbits-worker -H "${node_i}" -p "${port_i}" -s "${head_node}":"${port}"&',
+            "done",
+            "sleep 300",
+            'address=http://"${head_node}":"${web_port}"',
+        ]
+        # The generous sleeps give the supervisor time to come up before the workers
+        # try to connect; otherwise the workers may fail to find the supervisor node.
+        self.commands = "\n".join(commands)
+        self.sbatch_out = ""
+
+    def run(self):
+        shell_commands = self.commands
+        with open("slurm.sh", "w") as f:
+            f.write(shell_commands)
+
+        os.chmod("slurm.sh", 0o770)
+
+        result = subprocess.run(["sbatch", "slurm.sh"], capture_output=True,
text=True) + + if result.returncode == 0: + logger.info("Job submitted successfully.") + self.sbatch_out = result.stdout + self.job_id = self.get_job_id() + if self.job_id: + logger.info(f"Job ID is {self.job_id}.") + atexit.register(self.cancel_job) + else: + logger.error("Could not get job ID. Cleanup upon exit is not possible.") + return self.get_job_address() + else: + logger.error("Job submission failed.") + logger.error("Output: {}".format(result.stdout)) + logger.error("Errors: {}".format(result.stderr)) + + return None + + def get_job_id(self): + sbatch_output = self.sbatch_out + job_id = None + for line in sbatch_output.split("\n"): + if "Submitted batch job" in line: + job_id = line.split(" ")[-1] + return job_id + + def get_sbatch_out(self): + logging.info(f"getting batch_out:{self.sbatch_out}") + return self.sbatch_out + + def cancel_job(self): + if self.job_id: + logger.info(f"Cancelling job {self.job_id}") + subprocess.run(["scancel", self.job_id]) + + def update_head_node(self): + try: + if self.job_id: + time.sleep(5) + command = ["scontrol", "show", "job", self.job_id] + result = subprocess.run(command, capture_output=True, text=True) + node_list = None + if result.returncode == 0: + job_info = result.stdout + node_list_pattern = r"NodeList=(c\[\d+-\d+\]|c\d)" + matches = re.search(node_list_pattern, job_info) + + if matches: + node_list = matches.group(1) + logger.info(f"NodeList:{node_list}") + if node_list is None: + raise ValueError( + f"Job {self.job_id} not found or NodeList information not available." + ) + # get_head_node from nodelist + if "[" in node_list: + head_node = node_list.split("-")[0].replace("[", "") + else: + # when only one node + head_node = node_list + + self.head_node = head_node + return head_node + else: + logger.warning("NodeList not found in the string.") + + except subprocess.CalledProcessError as e: + logger.error(f"Error executing scontrol: {e}") + except ValueError as e: + logger.error(f"Error: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + + self.head_node = None + logger.warning("Failed to retrieve head node.") + + def get_job_address(self, retry_attempts=10, sleep_interval=30): + # We retry several times to get job data + for attempt in range(retry_attempts): + try: + self.update_head_node() + if self.head_node is not None: + address = f"http://{self.head_node}:{self.web_port}" + return address + else: + logger.warning( + f"Attempt {attempt + 1} failed, retrying after {sleep_interval}s..." + ) + time.sleep(sleep_interval) + except Exception as e: + logger.error(str(e)) diff --git a/python/xorbits/deploy/slurm/tests/__init__.py b/python/xorbits/deploy/slurm/tests/__init__.py new file mode 100644 index 000000000..37f6558d9 --- /dev/null +++ b/python/xorbits/deploy/slurm/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
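
Note on the generated job script: SLURMCluster.run() above writes the assembled
commands to slurm.sh before calling sbatch. With the arguments used in the test
below (job_name="xorbits", num_nodes=2, output_path="/shared_space/output.out",
time="00:30:00") and the default ports, the generated file would look roughly
like this sketch (derived from the command-building logic in __init__):

    #!/bin/bash
    #SBATCH -J xorbits
    #SBATCH --nodes=2
    #SBATCH --output=/shared_space/output.out
    #SBATCH --time=00:30:00
    set -x
    nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
    nodes_array=($nodes)
    head_node=${nodes_array[0]}
    port=16380
    web_port=16379
    echo "Starting SUPERVISOR at ${head_node}"
    srun --nodes=1 --ntasks=1 -w "${head_node}" \
        xorbits-supervisor -H "${head_node}" -p "${port}" -w "${web_port}"&
    sleep 30
    worker_num=$((SLURM_JOB_NUM_NODES - 1))
    for ((i = 1; i <= worker_num; i++)); do
        node_i=${nodes_array[$i]}
        port_i=$((port + i))
        echo "Starting WORKER $i at ${node_i}"
        srun --nodes=1 --ntasks=1 -w "${node_i}" \
            xorbits-worker -H "${node_i}" -p "${port_i}" -s "${head_node}":"${port}"&
    done
    sleep 300
    address=http://"${head_node}":"${web_port}"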
diff --git a/python/xorbits/deploy/slurm/tests/test_slurm.py b/python/xorbits/deploy/slurm/tests/test_slurm.py new file mode 100644 index 000000000..51eb791b9 --- /dev/null +++ b/python/xorbits/deploy/slurm/tests/test_slurm.py @@ -0,0 +1,80 @@ +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from distutils.spawn import find_executable + +import pytest + +from .... import init +from .... import pandas as pd +from .. import SLURMCluster + +slurm_available = find_executable("sbatch") is not None + + +def test_header_core_process_memory(): + cluster = SLURMCluster(time="00:02:00", processes=4, cores=8, memory="28G") + assert "#SBATCH" in cluster.commands + assert "#SBATCH --cpus-per-task=8" in cluster.commands + assert "#SBATCH --mem=28G" in cluster.commands + assert "#SBATCH --time=00:02:00" in cluster.commands + assert "#SBATCH -A" not in cluster.commands + + +def test_header_partition_account(): + cluster = SLURMCluster( + partition_option="regular", + account="XorbitsOnSlurm", + processes=4, + cores=8, + memory="28G", + ) + assert "#SBATCH --cpus-per-task=8" in cluster.commands + assert "#SBATCH --mem=28G" in cluster.commands + assert "#SBATCH -A XorbitsOnSlurm" in cluster.commands + assert "#SBATCH --partition=regular" in cluster.commands + + +def test_header_work_outputdir_web(): + # Test additional parameters + cluster = SLURMCluster( + job_name="my_job", + num_nodes=10, + output_path="/path/to/output", + work_dir="/path/to/work", + error_path="/path/to/error", + webport=8080, + load_env="xorbits", + ) + assert "#SBATCH -J my_job" in cluster.commands + assert "#SBATCH --nodes=10" in cluster.commands + assert "#SBATCH --output=/path/to/output" in cluster.commands + assert "#SBATCH --chdir=/path/to/work" in cluster.commands + assert "#SBATCH --error=/path/to/error" in cluster.commands + assert "web_port=8080" in cluster.commands + assert "source activate xorbits" in cluster.commands + + +# Construct slurm in a docker environment, so this test could only be exec when there is sbatch command supported +@pytest.mark.skipif(not slurm_available, reason="Cannot run without slurm cluster") +def test_jobscript(): + exp = SLURMCluster( + job_name="xorbits", + num_nodes=2, + output_path="/shared_space/output.out", + time="00:30:00", + ) + address = exp.run() + assert address == "http://c1:16379" + init(address) + assert repr(pd.Series([1, 2, 3]).sum()) == "6"
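
Note on extra sbatch options: because SLURMCluster merges **kwargs into the
#SBATCH parameter dictionary, options not covered by a named argument can be
forwarded without code changes, and names longer than one character become
long-form directives. A sketch in the style of the header tests above (the
gres/qos values are illustrative, not options the existing tests exercise):

    from xorbits.deploy.slurm import SLURMCluster

    def test_header_extra_sbatch_options():
        cluster = SLURMCluster(
            num_nodes=2,
            memory="8G",
            gres="gpu:1",  # forwarded verbatim via **kwargs
            qos="normal",
        )
        assert "#SBATCH --nodes=2" in cluster.commands
        assert "#SBATCH --mem=8G" in cluster.commands
        assert "#SBATCH --gres=gpu:1" in cluster.commands
        assert "#SBATCH --qos=normal" in cluster.commands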