Commit d5f3db3
cleanup imports/flake8/setup.py
Joseph Hamman committed Apr 12, 2018
1 parent 2e406ae commit d5f3db3
Showing 8 changed files with 90 additions and 55 deletions.
README.rst: 2 changes (1 addition, 1 deletion)
@@ -19,7 +19,7 @@ Example
Adaptivity
----------

This can also adapt the cluster size dynamicaly based on current load.
This can also adapt the cluster size dynamically based on current load.
This helps to scale up the cluster when necessary but scale it down and save
resources when not actively computing.

dask_jobqueue/core.py: 36 changes (21 additions, 15 deletions)
@@ -1,14 +1,14 @@
from contextlib import contextmanager
import logging
import subprocess
import socket
import os
import socket
import subprocess
import sys
import docrep
from contextlib import contextmanager

from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes
import docrep
from distributed import LocalCluster
from distributed.deploy import Cluster
from distributed.utils import get_ip_interface, ignoring, parse_bytes, tmpfile

dirname = os.path.dirname(sys.executable)

@@ -50,9 +50,11 @@ class JobQueueCluster(Cluster):
Attributes
----------
submit_command: str
Abstract attribute for job scheduler submit command, should be overriden
Abstract attribute for job scheduler submit command,
should be overriden
cancel_command: str
Abstract attribute for job scheduler cancel command, should be overriden
Abstract attribute for job scheduler cancel command,
should be overriden
See Also
--------
Expand Down Expand Up @@ -87,10 +89,12 @@ def __init__(self,
**kwargs
):
"""
This initializer should be considered as Abstract, and never used directly.
This initializer should be considered as Abstract, and never used
directly.
"""
if not self.cancel_command or not self.submit_command:
raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.')
raise NotImplementedError('JobQueueCluster is an abstract class '
'that should not be instanciated.')

#This attribute should be overriden
self.job_header = None
@@ -103,7 +107,8 @@ def __init__(self,

self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)

#Keep information on process, threads and memory, for use in subclasses
# Keep information on process, threads and memory, for use in
# subclasses
self.worker_memory = parse_bytes(memory)
self.worker_processes = processes
self.worker_threads = threads
@@ -115,8 +120,9 @@

self._env_header = '\n'.join(env_extra)

#dask-worker command line build
self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address)
# dask-worker command line build
self._command_template = os.path.join(
dirname, 'dask-worker %s' % self.scheduler.address)
if threads is not None:
self._command_template += " --nthreads %d" % threads
if processes is not None:
@@ -125,7 +131,7 @@
self._command_template += " --memory-limit %s" % memory
if name is not None:
self._command_template += " --name %s" % name
self._command_template += "-%(n)d" #Keep %(n) to be replaced later.
self._command_template += "-%(n)d" # Keep %(n) to be replaced later
if death_timeout is not None:
self._command_template += " --death-timeout %s" % death_timeout
if local_directory is not None:
@@ -135,10 +141,10 @@

def job_script(self):
self.n += 1
template = self._command_template % {'n': self.n}
return self._script_template % {'job_header': self.job_header,
'env_header': self._env_header,
'worker_command': self._command_template % {'n': self.n}
}
'worker_command': template}

@contextmanager
def job_file(self):
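To make the refactored `job_script()` easier to follow: the `-%(n)d` suffix kept on `_command_template` is an old-style format placeholder, and each call to `job_script()` bumps `self.n` and substitutes it so every worker gets a unique name. A minimal sketch of that pattern, with a made-up scheduler address and worker name (illustrative only, not part of this commit):

```python
# Sketch of the %-substitution used by job_script() above; the address,
# flags and name below are placeholder values for illustration.
command_template = ("dask-worker tcp://127.0.0.1:8786"
                    " --nthreads 4 --nprocs 6 --memory-limit 16GB"
                    " --name example-worker-%(n)d")

n = 0
for _ in range(2):
    n += 1                              # job_script() increments self.n
    print(command_template % {'n': n})  # ...--name example-worker-1, then -2
```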
dask_jobqueue/pbs.py: 35 changes (21 additions, 14 deletions)
@@ -1,6 +1,6 @@
import logging
import os
import math
import os

from .core import JobQueueCluster, docstrings

@@ -24,7 +24,8 @@ class PBSCluster(JobQueueCluster):
walltime : str
Walltime for each worker job.
job_extra : list
List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.
List of other PBS options, for example -j oe. Each option will be
prepended with the #PBS prefix.
%(JobQueueCluster.parameters)s
Examples
@@ -41,13 +42,17 @@ class PBSCluster(JobQueueCluster):
>>> cluster.adapt()
It is a good practice to define local_directory to your PBS system scratch directory,
and you should specify resource_spec according to the processes and threads asked:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \
threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB')
It is a good practice to define local_directory to your PBS system scratch
directory,
and you should specify resource_spec according to the processes and
threads asked:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',
local_directory=os.getenv('TMPDIR', '/tmp'),
threads=4, processes=6, memory='16GB',
resource_spec='select=1:ncpus=24:mem=100GB')
"""

#Override class variables
# Override class variables
submit_command = 'qsub'
cancel_command = 'qdel'

@@ -59,32 +64,34 @@ def __init__(self,
job_extra=[],
**kwargs):

#Instantiate args and parameters from parent abstract class
# Instantiate args and parameters from parent abstract class
super(PBSCluster, self).__init__(**kwargs)

#Try to find a project name from environment variable
# Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')

header_lines = []
#PBS header build
# PBS header build
if self.name is not None:
header_lines.append('#PBS -N %s' % self.name)
if queue is not None:
header_lines.append('#PBS -q %s' % queue)
if project is not None:
header_lines.append('#PBS -A %s' % project)
if resource_spec is None:
#Compute default resources specifications
# Compute default resources specifications
ncpus = self.worker_processes * self.worker_threads
memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes)
memory = self.worker_memory * self.worker_processes
memory_string = pbs_format_bytes_ceil(memory)
resource_spec = "select=1:ncpus=%d:mem=%s" % (ncpus, memory_string)
logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec)
logger.info("Resource specification for PBS not set, "
"initializing it to %s" % resource_spec)
header_lines.append('#PBS -l %s' % resource_spec)
if walltime is not None:
header_lines.append('#PBS -l walltime=%s' % walltime)
header_lines.extend(['#PBS %s' % arg for arg in job_extra])

#Declare class attribute that shall be overriden
# Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())
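Reading the header-building code above together with the docstring example, here is a hedged sketch of the preamble `job_script()` could produce when no `resource_spec` is given; the values follow ncpus = processes * threads and mem = processes * memory, but the exact byte formatting and defaults may differ:

```python
# Rough illustration only, using the docstring's example parameters;
# job_script() only formats text, it does not submit anything.
from dask_jobqueue import PBSCluster

cluster = PBSCluster(name='dask-worker', queue='regular', project='DaskOnPBS',
                     processes=6, threads=4, memory='16GB', walltime='01:00:00')
print(cluster.job_script())
# Expected to start with header lines along these lines:
#   #PBS -N dask-worker
#   #PBS -q regular
#   #PBS -A DaskOnPBS
#   #PBS -l select=1:ncpus=24:mem=96GB   (6 processes * 4 threads, 6 * 16GB)
#   #PBS -l walltime=01:00:00
```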
dask_jobqueue/slurm.py: 31 changes (18 additions, 13 deletions)
@@ -1,7 +1,7 @@
import logging
import math
import os
import sys
import math

from .core import JobQueueCluster, docstrings

@@ -24,19 +24,23 @@ class SLURMCluster(JobQueueCluster):
walltime : str
Walltime for each worker job.
job_cpu : int
Number of cpu to book in SLURM, if None, defaults to worker threads * processes
Number of cpu to book in SLURM, if None, defaults to worker
threads * processes
job_mem : str
Amount of memory to request in SLURM. If None, defaults to worker processes * memory
Amount of memory to request in SLURM. If None, defaults to worker
processes * memory
job_extra : list
List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix.
List of other Slurm options, for example -j oe. Each option will be
prepended with the #SBATCH prefix.
%(JobQueueCluster.parameters)s
Examples
--------
>>> from dask_jobqueue import SLURMCluster
>>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", \
env_extra=['export LANG="en_US.utf8"', \
'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"'])
>>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB",
env_extra=['export LANG="en_US.utf8"',
'export LANGUAGE="en_US.utf8"',
'export LC_ALL="en_US.utf8"'])
>>> cluster.start_workers(10) # this may take a few seconds to launch
>>> from dask.distributed import Client
@@ -63,9 +67,9 @@ def __init__(self,

super(SLURMCluster, self).__init__(**kwargs)

#Always ask for only one task
# Always ask for only one task
header_lines = []
#SLURM header build
# SLURM header build
if self.name is not None:
header_lines.append('#SBATCH -J %s' % self.name)
header_lines.append('#SBATCH -e %s.err' % self.name)
@@ -75,25 +79,26 @@ def __init__(self,
if project is not None:
header_lines.append('#SBATCH -A %s' % project)

#Init resources, always 1 task,
# Init resources, always 1 task,
# and then number of cpu is processes * threads if not set
header_lines.append('#SBATCH -n 1')
ncpus = job_cpu
if ncpus is None:
ncpus = self.worker_processes * self.worker_threads
header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus)
#Memory
# Memory
total_memory = job_mem
if job_mem is None and self.worker_memory is not None:
total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory)
memory = self.worker_processes * self.worker_memory
total_memory = slurm_format_bytes_ceil(memory)
if total_memory is not None:
header_lines.append('#SBATCH --mem=%s' % total_memory)

if walltime is not None:
header_lines.append('#SBATCH -t %s' % walltime)
header_lines.extend(['#SBATCH %s' % arg for arg in job_extra])

#Declare class attribute that shall be overriden
# Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())
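The SLURM header follows the same pattern; a hedged sketch of what it might generate for the docstring's example parameters (again, actual defaults and the output of slurm_format_bytes_ceil may differ):

```python
# Rough illustration only: --cpus-per-task defaults to processes * threads
# and --mem to processes * memory, per the logic above.
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(name='dask-worker', processes=6, threads=4,
                       memory='16GB', walltime='01:00:00')
print(cluster.job_script())
# Expected to include header lines roughly like:
#   #SBATCH -J dask-worker
#   #SBATCH -e dask-worker.err
#   #SBATCH -n 1
#   #SBATCH --cpus-per-task=24
#   #SBATCH --mem=96G
#   #SBATCH -t 01:00:00
```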
dask_jobqueue/tests/test_pbs.py: 11 changes (6 additions, 5 deletions)
@@ -1,11 +1,10 @@
import os
from time import time, sleep
from time import sleep, time

import pytest

from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
from dask_jobqueue import PBSCluster
from distributed.utils_test import loop # noqa: F401

pytestmark = pytest.mark.env("pbs")

@@ -44,7 +43,8 @@ def test_adaptive(loop): # noqa: F811
assert cluster.jobs

start = time()
while len(client.scheduler_info()['workers']) != cluster.config['processes']:
processes = cluster.config['processes']
while len(client.scheduler_info()['workers']) != processes:
sleep(0.1)
assert time() < start + 10

@@ -61,7 +61,8 @@ def test_adaptive(loop): # noqa: F811
assert time() < start + 10


@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, reason='PBS_ACCOUNT defined') # noqa: F811
@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, # noqa: F811
reason='PBS_ACCOUNT defined')
def test_errors(loop):
with pytest.raises(ValueError) as info:
PBSCluster()
dask_jobqueue/tests/test_slurm.py: 8 changes (4 additions, 4 deletions)
@@ -1,11 +1,10 @@
import os
from time import time, sleep
from time import sleep, time

import pytest

from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
from dask_jobqueue import SLURMCluster
from distributed.utils_test import loop # noqa: F401

pytestmark = pytest.mark.env("pbs")

@@ -62,7 +61,8 @@ def test_adaptive(loop): # noqa: F811
assert time() < start + 10


@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, reason='PBS_ACCOUNT defined') # noqa: F811
@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, # noqa: F811
reason='PBS_ACCOUNT defined')
def test_errors(loop):
with pytest.raises(ValueError) as info:
SLURMCluster()
setup.cfg: 5 changes (5 additions, 0 deletions)
@@ -35,3 +35,8 @@ parentdir_prefix = dask-

[aliases]
test = pytest

[isort]
default_section=THIRDPARTY
known_first_party=xarray
multi_line_output=4
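This new `[isort]` block encodes the import ordering applied throughout the commit: one alphabetized standard-library block, then a separate block of third-party imports. A small sketch of the layout it targets, using modules that already appear in this repository:

```python
# Import layout the [isort] settings above are meant to enforce:
# stdlib block first (alphabetical), then a third-party block.
import logging
import os
import socket

import docrep
from distributed import LocalCluster
from distributed.deploy import Cluster
```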
setup.py: 17 changes (14 additions, 3 deletions)
@@ -1,14 +1,25 @@
#!/usr/bin/env python

from os.path import exists

from setuptools import setup

with open('requirements.txt') as f:
install_requires = f.read().strip().split('\n')

if exists('README.rst'):
with open('README.rst') as f:
long_description = f.read()
else:
long_description = ''

setup(name='dask-jobqueue',
version='0.1.0',
description='Deploy Dask on job queuing systems like PBS and SLURM',
description='Deploy Dask on job queuing systems like PBS or SLURM',
url='https://github.com/dask/dask-jobqueue',
license='BSD 3-Clause',
packages=['dask_jobqueue'],
install_requires=open('requirements.txt').read().strip().split('\n'),
long_description=(open('README.rst').read() if exists('README.rst') else ''),
install_requires=install_requires,
tests_require=['pytest >= 2.7.1'],
long_description=long_description,
zip_safe=False)
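For completeness, the refactored requirements handling simply splits `requirements.txt` into one entry per line for `install_requires`; a tiny illustration with invented file contents:

```python
# Illustration only: how the read/strip/split pattern above turns a
# requirements file into a list. These package names are placeholders.
text = "dask\ndistributed\ndocrep\n"
install_requires = text.strip().split('\n')
print(install_requires)  # ['dask', 'distributed', 'docrep']
```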
