Skip to content

Commit

Permalink
Merge pull request dask#10 from guillaumeeb/7_pbs_cluster_init
Browse files Browse the repository at this point in the history
[WIP] Issue 7 : pbs cluster init, class inheritance refactoring
  • Loading branch information
Joe Hamman authored Mar 27, 2018
2 parents 9dbdf4b + b0cae6d commit 3ea2dad
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 150 deletions.
118 changes: 111 additions & 7 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,134 @@
from contextlib import contextmanager
import logging
import subprocess
import toolz
import socket
import os
import sys
import docrep

from distributed.utils import tmpfile, ignoring
from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes
from distributed import LocalCluster

dirname = os.path.dirname(sys.executable)

logger = logging.getLogger(__name__)
docstrings = docrep.DocstringProcessor()


@docstrings.get_sectionsf('JobQueueCluster')
class JobQueueCluster(object):
""" Base class to launch Dask Clusters for Job queues
This class should not be used directly; use the inherited class appropriate
for your queueing system (e.g. PBSCluster or SLURMCluster).
Parameters
----------
name : str
Name of Dask workers.
threads : int
Number of threads per process.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpreted both by PBS and Dask.
interface : str
Network interface like 'eth0' or 'ib0'.
death_timeout : float
Seconds to wait for a scheduler before closing workers
local_directory : str
Dask worker local directory for file spilling.
extra : str
Additional arguments to pass to `dask-worker`
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`
Attributes
----------
submit_command: str
Abstract attribute for job scheduler submit command, should be overridden
cancel_command: str
Abstract attribute for job scheduler cancel command, should be overridden
See Also
--------
PBSCluster
SLURMCluster
"""
def __init__(self):
raise NotImplemented

_script_template = """
#!/bin/bash
%(job_header)s
%(worker_command)s
""".lstrip()

# The following class attributes should be overridden by subclasses.
submit_command = None
cancel_command = None

def __init__(self,
             name='dask-worker',
             threads=4,
             processes=6,
             memory='16GB',
             interface=None,
             death_timeout=60,
             local_directory=None,
             extra='',
             **kwargs
             ):
    """ Start the local scheduler and build the ``dask-worker`` command line.

    This initializer should be considered as abstract, and never used
    directly: it only succeeds on subclasses that define both
    ``submit_command`` and ``cancel_command``.

    Parameters
    ----------
    name : str
        Base name of the workers; a ``-<n>`` suffix is appended per job.
    threads : int
        Value passed to ``--nthreads``.
    processes : int
        Value passed to ``--nprocs``.
    memory : str
        Value passed to ``--memory-limit``; also parsed with ``parse_bytes``
        and stored as ``worker_memory``.
    interface : str
        Network interface used to pick the scheduler host and forwarded to
        the workers through ``--interface``.
    death_timeout : float
        Value passed to ``--death-timeout``.
    local_directory : str
        Value passed to ``--local-directory``.
    extra : str
        Additional arguments appended to the ``dask-worker`` command.
    kwargs : dict
        Forwarded to ``LocalCluster``.

    Raises
    ------
    NotImplementedError
        If ``submit_command`` or ``cancel_command`` is not set.
    """
    if not self.cancel_command or not self.submit_command:
        raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.')

    # Subclasses are expected to replace this with their job directives.
    self.job_header = None

    # Accept extra=None, as the ``is not None`` guards on the other options
    # suggest callers may disable an option that way.
    extra = extra or ''
    if interface:
        host = get_ip_interface(interface)
        extra += ' --interface %s ' % interface
    else:
        host = socket.gethostname()

    self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)

    # Keep information on name, processes, threads and memory, for use in
    # subclasses (e.g. to derive a default resource specification).
    self.worker_memory = parse_bytes(memory) if memory is not None else None
    self.worker_processes = processes
    self.worker_threads = threads
    self.name = name

    self.jobs = dict()
    self.n = 0
    self._adaptive = None

    # dask-worker command line build
    command = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address)
    if threads is not None:
        command += " --nthreads %d" % threads
    if processes is not None:
        command += " --nprocs %d" % processes
    if memory is not None:
        command += " --memory-limit %s" % memory
    if name is not None:
        # Keep %(n)d in the template; job_script() substitutes it later.
        command += " --name %s" % name + "-%(n)d"
    if death_timeout is not None:
        command += " --death-timeout %s" % death_timeout
    if local_directory is not None:
        command += " --local-directory %s" % local_directory
    extra = extra.strip()
    if extra:
        # Always separate user-supplied arguments from the previous option.
        command += " " + extra
    self._command_template = command

def job_script(self):
    """ Render the submission script for the next worker job.

    Increments the job counter ``self.n``, substitutes it into the worker
    command template, and fills ``_script_template`` with the job header
    and the resulting worker command.

    Returns
    -------
    str
        The text of the job script.
    """
    self.n += 1
    # The command template still carries the %(n)d placeholder for the
    # worker name; resolve it before embedding it in the script.
    worker_command = self._command_template % {'n': self.n}
    return self._script_template % {'job_header': self.job_header,
                                    'worker_command': worker_command}

@contextmanager
def job_file(self):
Expand All @@ -39,7 +143,7 @@ def start_workers(self, n=1):
workers = []
for _ in range(n):
with self.job_file() as fn:
out = self._call([self._submitcmd, fn])
out = self._call([self.submit_command, fn])
job = out.decode().split('.')[0]
self.jobs[self.n] = job
workers.append(self.n)
Expand Down Expand Up @@ -99,7 +203,7 @@ def stop_workers(self, workers):
return
workers = list(map(int, workers))
jobs = [self.jobs[w] for w in workers]
self._call([self._cancelcmd] + list(jobs))
self._call([self.cancel_command] + list(jobs))
for w in workers:
with ignoring(KeyError):
del self.jobs[w]
Expand Down
153 changes: 72 additions & 81 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
import logging
import os
import socket
import sys
import math

from distributed import LocalCluster
from distributed.utils import get_ip_interface

from .core import JobQueueCluster
from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)

dirname = os.path.dirname(sys.executable)


@docstrings.with_indent(4)
class PBSCluster(JobQueueCluster):
""" Launch Dask on a PBS cluster
Parameters
----------
name : str
Name of worker jobs. Passed to `#PBS -N` option.
queue : str
Destination queue for each worker job. Passed to `#PBS -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#PBS -A` option.
threads_per_worker : int
Number of threads per process.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpreted both by PBS and Dask.
resource_spec : str
Request resources and specify job placement. Passed to `#PBS -l`
option.
walltime : str
Walltime for each worker job.
interface : str
Network interface like 'eth0' or 'ib0'.
death_timeout : float
Seconds to wait for a scheduler before closing workers
extra : str
Additional arguments to pass to `dask-worker`
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`
job_extra : list
List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.
local_directory : str
Dask worker local directory for file spilling.
%(JobQueueCluster.parameters)s
Examples
--------
>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(project='...')
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS')
>>> cluster.start_workers(10) # this may take a few seconds to launch
>>> from dask.distributed import Client
Expand All @@ -59,67 +42,75 @@ class PBSCluster(JobQueueCluster):
kill workers based on load.
>>> cluster.adapt()
It is a good practice to define local_directory to your PBS system scratch directory,
and you should specify resource_spec according to the processes and threads asked:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \
threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB')
"""

#Override class variables
submit_command = 'qsub'
cancel_command = 'qdel'

def __init__(self,
name='dask',
queue='regular',
queue=None,
project=None,
resource_spec='select=1:ncpus=36:mem=109GB',
threads_per_worker=4,
processes=9,
memory='7GB',
resource_spec=None,
walltime='00:30:00',
interface=None,
death_timeout=60,
extra='',
job_extra=[],
**kwargs):
self._template = """
#!/bin/bash
#PBS -N %(name)s
#PBS -q %(queue)s
#PBS -A %(project)s
#PBS -l %(resource_spec)s
#PBS -l walltime=%(walltime)s
#PBS -j oe
%(base_path)s/dask-worker %(scheduler)s \
--nthreads %(threads_per_worker)d \
--nprocs %(processes)s \
--memory-limit %(memory)s \
--name %(name)s-%(n)d \
--death-timeout %(death_timeout)s \
%(extra)s
""".lstrip()

if interface:
host = get_ip_interface(interface)
extra += ' --interface %s ' % interface
else:
host = socket.gethostname()

#Instantiate args and parameters from parent abstract class
super(PBSCluster, self).__init__(**kwargs)

#Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')
if not project:
raise ValueError("Must specify a project like `project='UCLB1234' "
"or set PBS_ACCOUNT environment variable")
self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
memory = memory.replace(' ', '')
self.config = {'name': name,
'queue': queue,
'project': project,
'threads_per_worker': threads_per_worker,
'processes': processes,
'walltime': walltime,
'scheduler': self.scheduler.address,
'resource_spec': resource_spec,
'base_path': dirname,
'memory': memory,
'death_timeout': death_timeout,
'extra': extra}
self.jobs = dict()
self.n = 0
self._adaptive = None
self._submitcmd = 'qsub'
self._cancelcmd = 'qdel'

#PBS header build
if self.name is not None:
header_lines = ['#PBS -N %s' % self.name]
if queue is not None:
header_lines.append('#PBS -q %s' % queue)
if project is not None:
header_lines.append('#PBS -A %s' % project)
if resource_spec is None:
#Compute default resources specifications
ncpus = self.worker_processes * self.worker_threads
memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes)
resource_spec = "select=1:ncpus=%d:mem=%s" % (ncpus, memory_string)
logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec)
header_lines.append('#PBS -l %s' % resource_spec)
if walltime is not None:
header_lines.append('#PBS -l walltime=%s' % walltime)
header_lines.extend(['#PBS %s' % arg for arg in job_extra])

#Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())


def pbs_format_bytes_ceil(n):
    """ Format a byte count as a PBS memory string, rounding up.

    PBS expects KiB, MiB or GiB, but names them kB, MB and GB, whereas Dask
    makes the difference between KB and KiB. Values of at least ten units
    are expressed in that unit and rounded up, so the formatted amount is
    never smaller than ``n``.

    Parameters
    ----------
    n : int
        Number of bytes.

    Returns
    -------
    str
        The amount with a ``GB``, ``MB``, ``kB`` or ``B`` suffix.

    >>> pbs_format_bytes_ceil(1)
    '1B'
    >>> pbs_format_bytes_ceil(1234)
    '1234B'
    >>> pbs_format_bytes_ceil(12345678)
    '12MB'
    >>> pbs_format_bytes_ceil(1234567890)
    '1178MB'
    >>> pbs_format_bytes_ceil(15000000000)
    '14GB'
    """
    for unit, suffix in ((1024**3, 'GB'), (1024**2, 'MB'), (1024, 'kB')):
        if n >= 10 * unit:
            # -(-n // unit) is ceiling division; unlike math.ceil(n / unit)
            # it does not depend on true division and stays exact for
            # large integers.
            return '%d%s' % (-(-n // unit), suffix)
    return '%dB' % n
Loading

0 comments on commit 3ea2dad

Please sign in to comment.