Skip to content

Commit

Permalink
Asynchronous job submission and removal (dask#610)
Browse files Browse the repository at this point in the history
* Asynchronous job submission and removal

This solution is using asyncio.create_subprocess_exec

* Code style for black
  • Loading branch information
jrueb authored Aug 30, 2023
1 parent 4bbd0a0 commit ef45c0e
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 66 deletions.
30 changes: 14 additions & 16 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import os
import re
import shlex
import subprocess
import asyncio
import sys
import weakref
import abc
import tempfile
import copy
Expand Down Expand Up @@ -403,8 +402,7 @@ def job_file(self):
yield fn

async def _submit_job(self, script_filename):
# Should we make this async friendly?
return self._call(shlex.split(self.submit_command) + [script_filename])
return await self._call(shlex.split(self.submit_command) + [script_filename])

@property
def worker_process_threads(self):
Expand All @@ -424,8 +422,6 @@ async def start(self):
out = await self._submit_job(fn)
self.job_id = self._job_id_from_submit_output(out)

weakref.finalize(self, self._close_job, self.job_id, self.cancel_command)

logger.debug("Starting job: %s", self.job_id)
await super().start()

Expand All @@ -452,18 +448,18 @@ def _job_id_from_submit_output(self, out):

async def close(self):
logger.debug("Stopping worker: %s job: %s", self.name, self.job_id)
self._close_job(self.job_id, self.cancel_command)
await self._close_job(self.job_id, self.cancel_command)

@classmethod
def _close_job(cls, job_id, cancel_command):
async def _close_job(cls, job_id, cancel_command):
if job_id:
with suppress(RuntimeError): # deleting job when job already gone
cls._call(shlex.split(cancel_command) + [job_id])
await cls._call(shlex.split(cancel_command) + [job_id])
logger.debug("Closed job %s", job_id)

@staticmethod
def _call(cmd, **kwargs):
"""Call a command using subprocess.Popen.
async def _call(cmd, **kwargs):
"""Call a command using asyncio.create_subprocess_exec.
This centralizes calls out to the command line, providing consistent
outputs, logging, and an opportunity to go asynchronous in the future.
Expand All @@ -472,7 +468,7 @@ def _call(cmd, **kwargs):
----------
cmd: List(str))
A command, each of which is a list of strings to hand to
subprocess.Popen
asyncio.create_subprocess_exec
Examples
--------
Expand All @@ -491,11 +487,14 @@ def _call(cmd, **kwargs):
"Executing the following command to command line\n{}".format(cmd_str)
)

proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs
)

out, err = proc.communicate()
out, err = await proc.communicate()
out, err = out.decode(), err.decode()

if proc.returncode != 0:
Expand Down Expand Up @@ -758,7 +757,6 @@ def _get_worker_security(self, security):
for key, value in worker_security_dict.items():
# dump worker in-memory keys for use in job_script
if value is not None and "\n" in value:

try:
f = tempfile.NamedTemporaryFile(
mode="wt",
Expand Down
2 changes: 1 addition & 1 deletion dask_jobqueue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def _submit_job(self, script_filename):
return str(self.process.pid)

@classmethod
def _close_job(self, job_id, cancel_command):
async def _close_job(self, job_id, cancel_command):
os.kill(int(job_id), 9)
# from distributed.utils_test import terminate_process
# terminate_process(self.process)
Expand Down
1 change: 0 additions & 1 deletion dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@


class OARJob(Job):

# Override class variables
submit_command = "oarsub"
cancel_command = "oardel"
Expand Down
1 change: 0 additions & 1 deletion dask_jobqueue/tests/test_htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def test_extra_args_broken_cancel(loop):
cancel_command_extra=["-name", "wrong.docker"],
) as cluster:
with Client(cluster) as client:

cluster.scale(2)

client.wait_for_workers(2, timeout=QUEUE_WAIT)
Expand Down
42 changes: 21 additions & 21 deletions dask_jobqueue/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,25 +432,25 @@ def test_deprecation_job_extra(Cluster):
assert "old_param" in job_script


def test_jobqueue_job_call(tmpdir, Cluster):
cluster = Cluster(cores=1, memory="1GB")

path = tmpdir.join("test.py")
path.write('print("this is the stdout")')

out = cluster.job_cls._call([sys.executable, path.strpath])
assert out == "this is the stdout\n"

path_with_error = tmpdir.join("non-zero-exit-code.py")
path_with_error.write('print("this is the stdout")\n1/0')

match = (
"Command exited with non-zero exit code.+"
"Exit code: 1.+"
"stdout:\nthis is the stdout.+"
"stderr:.+ZeroDivisionError"
)
@pytest.mark.asyncio
async def test_jobqueue_job_call(tmpdir, Cluster):
async with Cluster(cores=1, memory="1GB", asynchronous=True) as cluster:
path = tmpdir.join("test.py")
path.write('print("this is the stdout")')

out = await cluster.job_cls._call([sys.executable, path.strpath])
assert out == "this is the stdout\n"

path_with_error = tmpdir.join("non-zero-exit-code.py")
path_with_error.write('print("this is the stdout")\n1/0')

match = (
"Command exited with non-zero exit code.+"
"Exit code: 1.+"
"stdout:\nthis is the stdout.+"
"stderr:.+ZeroDivisionError"
)

match = re.compile(match, re.DOTALL)
with pytest.raises(RuntimeError, match=match):
cluster.job_cls._call([sys.executable, path_with_error.strpath])
match = re.compile(match, re.DOTALL)
with pytest.raises(RuntimeError, match=match):
await cluster.job_cls._call([sys.executable, path_with_error.strpath])
2 changes: 0 additions & 2 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ def get_interface_and_port(index=0):


def test_scheduler_options(Cluster):

interface, port = get_interface_and_port()

with Cluster(
Expand Down Expand Up @@ -324,7 +323,6 @@ def test_import_scheduler_options_from_config(Cluster):
with dask.config.set(
{"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
):

with Cluster(cores=2, memory="2GB") as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options.get("interface") == config_scheduler_interface
Expand Down
8 changes: 0 additions & 8 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

def test_header():
with LSFCluster(walltime="00:02", processes=4, cores=8, memory="8GB") as cluster:

assert "#BSUB" in cluster.job_header
assert "#BSUB -J dask-worker" in cluster.job_header
assert "#BSUB -n 8" in cluster.job_header
Expand All @@ -35,7 +34,6 @@ def test_header():
ncpus=24,
mem=100000000000,
) as cluster:

assert "#BSUB -q general" in cluster.job_header
assert "#BSUB -J dask-worker" in cluster.job_header
assert "#BSUB -n 24" in cluster.job_header
Expand All @@ -54,7 +52,6 @@ def test_header():
ncpus=24,
mem=100000000000,
) as cluster:

assert "#BSUB -q general" in cluster.job_header
assert "#BSUB -J dask-worker" in cluster.job_header
assert "#BSUB -n 24" in cluster.job_header
Expand All @@ -65,7 +62,6 @@ def test_header():
assert '#BSUB -P "Dask On LSF"' in cluster.job_header

with LSFCluster(cores=4, memory="8GB") as cluster:

assert "#BSUB -n" in cluster.job_header
assert "#BSUB -W" in cluster.job_header
assert "#BSUB -M" in cluster.job_header
Expand All @@ -75,7 +71,6 @@ def test_header():
with LSFCluster(
cores=4, memory="8GB", job_extra_directives=["-u [email protected]"]
) as cluster:

assert "#BSUB -u [email protected]" in cluster.job_header
assert "#BSUB -n" in cluster.job_header
assert "#BSUB -W" in cluster.job_header
Expand All @@ -86,7 +81,6 @@ def test_header():

def test_job_script():
with LSFCluster(walltime="00:02", processes=4, cores=8, memory="28GB") as cluster:

job_script = cluster.job_script()
assert "#BSUB" in job_script
assert "#BSUB -J dask-worker" in job_script
Expand Down Expand Up @@ -114,7 +108,6 @@ def test_job_script():
ncpus=24,
mem=100000000000,
) as cluster:

job_script = cluster.job_script()
assert "#BSUB -q general" in cluster.job_header
assert "#BSUB -J dask-worker" in cluster.job_header
Expand All @@ -141,7 +134,6 @@ def test_job_script():
project="Dask On LSF",
job_extra_directives=["-R rusage[mem=16GB]"],
) as cluster:

job_script = cluster.job_script()

assert "#BSUB -J dask-worker" in cluster.job_header
Expand Down
10 changes: 0 additions & 10 deletions dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def test_header(Cluster):
with Cluster(
walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker"
) as cluster:

assert "#PBS" in cluster.job_header
assert "#PBS -N dask-worker" in cluster.job_header
assert "#PBS -l select=1:ncpus=8:mem=27GB" in cluster.job_header
Expand All @@ -34,7 +33,6 @@ def test_header(Cluster):
resource_spec="select=1:ncpus=24:mem=100GB",
memory="28GB",
) as cluster:

assert "#PBS -q regular" in cluster.job_header
assert "#PBS -N dask-worker" in cluster.job_header
assert "#PBS -l select=1:ncpus=24:mem=100GB" in cluster.job_header
Expand All @@ -43,15 +41,13 @@ def test_header(Cluster):
assert "#PBS -A DaskOnPBS" in cluster.job_header

with Cluster(cores=4, memory="8GB") as cluster:

assert "#PBS -j oe" not in cluster.job_header
assert "#PBS -N" in cluster.job_header
assert "#PBS -l walltime=" in cluster.job_header
assert "#PBS -A" not in cluster.job_header
assert "#PBS -q" not in cluster.job_header

with Cluster(cores=4, memory="8GB", job_extra_directives=["-j oe"]) as cluster:

assert "#PBS -j oe" in cluster.job_header
assert "#PBS -N" in cluster.job_header
assert "#PBS -l walltime=" in cluster.job_header
Expand All @@ -62,7 +58,6 @@ def test_header(Cluster):
@pytest.mark.parametrize("Cluster", [PBSCluster, MoabCluster])
def test_job_script(Cluster):
with Cluster(walltime="00:02:00", processes=4, cores=8, memory="28GB") as cluster:

job_script = cluster.job_script()
assert "#PBS" in job_script
assert "#PBS -N dask-worker" in job_script
Expand All @@ -88,7 +83,6 @@ def test_job_script(Cluster):
resource_spec="select=1:ncpus=24:mem=100GB",
memory="28GB",
) as cluster:

job_script = cluster.job_script()
assert "#PBS -q regular" in job_script
assert "#PBS -N dask-worker" in job_script
Expand Down Expand Up @@ -119,7 +113,6 @@ def test_basic(loop):
loop=loop,
) as cluster:
with Client(cluster) as client:

cluster.scale(2)
client.wait_for_workers(2, timeout=QUEUE_WAIT)

Expand Down Expand Up @@ -154,7 +147,6 @@ def test_scale_cores_memory(loop):
loop=loop,
) as cluster:
with Client(cluster) as client:

cluster.scale(cores=2)
client.wait_for_workers(1, timeout=QUEUE_WAIT)

Expand Down Expand Up @@ -188,7 +180,6 @@ def test_basic_scale_edge_cases(loop):
job_extra_directives=["-V"],
loop=loop,
) as cluster:

cluster.scale(2)
cluster.scale(0)

Expand Down Expand Up @@ -299,7 +290,6 @@ def test_scale_grouped(loop):
loop=loop,
) as cluster:
with Client(cluster) as client:

cluster.scale(4) # Start 2 jobs

start = time()
Expand Down
1 change: 0 additions & 1 deletion dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def test_basic(loop):
walltime="00:02:00", cores=8, processes=4, memory="2GiB", loop=loop
) as cluster:
with Client(cluster, loop=loop) as client:

cluster.scale(2)

start = time()
Expand Down
5 changes: 0 additions & 5 deletions dask_jobqueue/tests/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def test_header():
with SLURMCluster(
walltime="00:02:00", processes=4, cores=8, memory="28GB"
) as cluster:

assert "#SBATCH" in cluster.job_header
assert "#SBATCH -J dask-worker" in cluster.job_header
assert "#SBATCH -n 1" in cluster.job_header
Expand All @@ -35,7 +34,6 @@ def test_header():
job_cpu=16,
job_mem="100G",
) as cluster:

assert "#SBATCH --cpus-per-task=16" in cluster.job_header
assert "#SBATCH --cpus-per-task=8" not in cluster.job_header
assert "#SBATCH --mem=100G" in cluster.job_header
Expand All @@ -44,7 +42,6 @@ def test_header():
assert "#SBATCH -p regular" in cluster.job_header

with SLURMCluster(cores=4, memory="8GB") as cluster:

assert "#SBATCH" in cluster.job_header
assert "#SBATCH -J " in cluster.job_header
assert "#SBATCH -n 1" in cluster.job_header
Expand All @@ -57,7 +54,6 @@ def test_job_script():
with SLURMCluster(
walltime="00:02:00", processes=4, cores=8, memory="28GB"
) as cluster:

job_script = cluster.job_script()
assert "#SBATCH" in job_script
assert "#SBATCH -J dask-worker" in job_script
Expand Down Expand Up @@ -127,7 +123,6 @@ def test_basic(loop):
loop=loop,
) as cluster:
with Client(cluster) as client:

cluster.scale(2)

start = time()
Expand Down

0 comments on commit ef45c0e

Please sign in to comment.