From d5f3db3227e9e55bb7b9ee2fc1f13c940dfcad51 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Thu, 12 Apr 2018 22:54:42 +0200 Subject: [PATCH] cleanup imports/flake8/setup.py --- README.rst | 2 +- dask_jobqueue/core.py | 36 ++++++++++++++++++------------- dask_jobqueue/pbs.py | 35 ++++++++++++++++++------------ dask_jobqueue/slurm.py | 31 +++++++++++++++----------- dask_jobqueue/tests/test_pbs.py | 11 +++++----- dask_jobqueue/tests/test_slurm.py | 8 +++---- setup.cfg | 5 +++++ setup.py | 17 ++++++++++++--- 8 files changed, 90 insertions(+), 55 deletions(-) diff --git a/README.rst b/README.rst index c3d4f91e..08787143 100644 --- a/README.rst +++ b/README.rst @@ -19,7 +19,7 @@ Example Adaptivity ---------- -This can also adapt the cluster size dynamicaly based on current load. +This can also adapt the cluster size dynamically based on current load. This helps to scale up the cluster when necessary but scale it down and save resources when not actively computing. diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 7806c937..cb5400d6 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,14 +1,14 @@ -from contextlib import contextmanager import logging -import subprocess -import socket import os +import socket +import subprocess import sys -import docrep +from contextlib import contextmanager -from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes +import docrep from distributed import LocalCluster from distributed.deploy import Cluster +from distributed.utils import get_ip_interface, ignoring, parse_bytes, tmpfile dirname = os.path.dirname(sys.executable) @@ -50,9 +50,11 @@ class JobQueueCluster(Cluster): Attributes ---------- submit_command: str - Abstract attribute for job scheduler submit command, should be overriden + Abstract attribute for job scheduler submit command, + should be overridden cancel_command: str - Abstract attribute for job scheduler cancel command, should be overriden + Abstract attribute for 
job scheduler cancel command, + should be overridden See Also -------- @@ -87,10 +89,12 @@ def __init__(self, **kwargs ): """ - This initializer should be considered as Abstract, and never used directly. + This initializer should be considered as Abstract, and never used + directly. """ if not self.cancel_command or not self.submit_command: - raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.') + raise NotImplementedError('JobQueueCluster is an abstract class ' + 'that should not be instanciated.') #This attribute should be overriden self.job_header = None @@ -103,7 +107,8 @@ def __init__(self, self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - #Keep information on process, threads and memory, for use in subclasses + # Keep information on process, threads and memory, for use in + # subclasses self.worker_memory = parse_bytes(memory) self.worker_processes = processes self.worker_threads = threads @@ -115,8 +120,9 @@ def __init__(self, self._env_header = '\n'.join(env_extra) - #dask-worker command line build - self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address) + # dask-worker command line build + self._command_template = os.path.join( + dirname, 'dask-worker %s' % self.scheduler.address) if threads is not None: self._command_template += " --nthreads %d" % threads if processes is not None: @@ -125,7 +131,7 @@ def __init__(self, self._command_template += " --memory-limit %s" % memory if name is not None: self._command_template += " --name %s" % name - self._command_template += "-%(n)d" #Keep %(n) to be replaced later. 
+ self._command_template += "-%(n)d" # Keep %(n) to be replaced later if death_timeout is not None: self._command_template += " --death-timeout %s" % death_timeout if local_directory is not None: @@ -135,10 +141,10 @@ def __init__(self, def job_script(self): self.n += 1 + template = self._command_template % {'n': self.n} return self._script_template % {'job_header': self.job_header, 'env_header': self._env_header, - 'worker_command': self._command_template % {'n': self.n} - } + 'worker_command': template} @contextmanager def job_file(self): diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index dc39ccf0..f4940a71 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -1,6 +1,6 @@ import logging -import os import math +import os from .core import JobQueueCluster, docstrings @@ -24,7 +24,8 @@ class PBSCluster(JobQueueCluster): walltime : str Walltime for each worker job. job_extra : list - List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. + List of other PBS options, for example -j oe. Each option will be + prepended with the #PBS prefix. 
%(JobQueueCluster.parameters)s Examples @@ -41,13 +42,17 @@ class PBSCluster(JobQueueCluster): >>> cluster.adapt() - It is a good practice to define local_directory to your PBS system scratch directory, - and you should specify resource_spec according to the processes and threads asked: - >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \ - threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB') + It is a good practice to define local_directory to your PBS system scratch + directory, + and you should specify resource_spec according to the processes and + threads asked: + >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS', + local_directory=os.getenv('TMPDIR', '/tmp'), + threads=4, processes=6, memory='16GB', + resource_spec='select=1:ncpus=24:mem=100GB') """ - #Override class variables + # Override class variables submit_command = 'qsub' cancel_command = 'qdel' @@ -59,14 +64,14 @@ def __init__(self, job_extra=[], **kwargs): - #Instantiate args and parameters from parent abstract class + # Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(**kwargs) - #Try to find a project name from environment variable + # Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') header_lines = [] - #PBS header build + # PBS header build if self.name is not None: header_lines.append('#PBS -N %s' % self.name) if queue is not None: @@ -74,17 +79,19 @@ def __init__(self, if project is not None: header_lines.append('#PBS -A %s' % project) if resource_spec is None: - #Compute default resources specifications + # Compute default resources specifications ncpus = self.worker_processes * self.worker_threads - memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes) + memory = self.worker_memory * self.worker_processes + memory_string = pbs_format_bytes_ceil(memory) resource_spec = 
"select=1:ncpus=%d:mem=%s" % (ncpus, memory_string) - logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec) + logger.info("Resource specification for PBS not set, " + "initializing it to %s" % resource_spec) header_lines.append('#PBS -l %s' % resource_spec) if walltime is not None: header_lines.append('#PBS -l walltime=%s' % walltime) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) - #Declare class attribute that shall be overriden + # Declare class attribute that shall be overridden self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index bdd96f4b..29da1d52 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,7 +1,7 @@ import logging +import math import os import sys -import math from .core import JobQueueCluster, docstrings @@ -24,19 +24,23 @@ class SLURMCluster(JobQueueCluster): walltime : str Walltime for each worker job. job_cpu : int - Number of cpu to book in SLURM, if None, defaults to worker threads * processes + Number of cpu to book in SLURM, if None, defaults to worker + threads * processes job_mem : str - Amount of memory to request in SLURM. If None, defaults to worker processes * memory + Amount of memory to request in SLURM. If None, defaults to worker + processes * memory job_extra : list - List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. + List of other Slurm options, for example -j oe. Each option will be + prepended with the #SBATCH prefix. 
%(JobQueueCluster.parameters)s Examples -------- >>> from dask_jobqueue import SLURMCluster - >>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", \ -env_extra=['export LANG="en_US.utf8"', \ -'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"']) + >>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", + env_extra=['export LANG="en_US.utf8"', + 'export LANGUAGE="en_US.utf8"', + 'export LC_ALL="en_US.utf8"']) >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -63,9 +67,9 @@ def __init__(self, super(SLURMCluster, self).__init__(**kwargs) - #Always ask for only one task + # Always ask for only one task header_lines = [] - #SLURM header build + # SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) header_lines.append('#SBATCH -e %s.err' % self.name) @@ -75,17 +79,18 @@ def __init__(self, if project is not None: header_lines.append('#SBATCH -A %s' % project) - #Init resources, always 1 task, + # Init resources, always 1 task, # and then number of cpu is processes * threads if not set header_lines.append('#SBATCH -n 1') ncpus = job_cpu if ncpus is None: ncpus = self.worker_processes * self.worker_threads header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus) - #Memory + # Memory total_memory = job_mem if job_mem is None and self.worker_memory is not None: - total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory) + memory = self.worker_processes * self.worker_memory + total_memory = slurm_format_bytes_ceil(memory) if total_memory is not None: header_lines.append('#SBATCH --mem=%s' % total_memory) @@ -93,7 +98,7 @@ def __init__(self, header_lines.append('#SBATCH -t %s' % walltime) header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) - #Declare class attribute that shall be overriden + # Declare class attribute that shall be overridden self.job_header = '\n'.join(header_lines) logger.debug("Job script: 
\n %s" % self.job_script()) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index a72ccad5..0d21d50e 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -1,11 +1,10 @@ import os -from time import time, sleep +from time import sleep, time import pytest - from dask.distributed import Client -from distributed.utils_test import loop # noqa: F401 from dask_jobqueue import PBSCluster +from distributed.utils_test import loop # noqa: F401 pytestmark = pytest.mark.env("pbs") @@ -44,7 +43,8 @@ def test_adaptive(loop): # noqa: F811 assert cluster.jobs start = time() - while len(client.scheduler_info()['workers']) != cluster.config['processes']: + processes = cluster.config['processes'] + while len(client.scheduler_info()['workers']) != processes: sleep(0.1) assert time() < start + 10 @@ -61,7 +61,8 @@ def test_adaptive(loop): # noqa: F811 assert time() < start + 10 -@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, reason='PBS_ACCOUNT defined') # noqa: F811 +@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, # noqa: F811 + reason='PBS_ACCOUNT defined') def test_errors(loop): with pytest.raises(ValueError) as info: PBSCluster() diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index ba107b77..cad80945 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,11 +1,10 @@ import os -from time import time, sleep +from time import sleep, time import pytest - from dask.distributed import Client -from distributed.utils_test import loop # noqa: F401 from dask_jobqueue import SLURMCluster +from distributed.utils_test import loop # noqa: F401 pytestmark = pytest.mark.env("pbs") @@ -62,7 +61,8 @@ def test_adaptive(loop): # noqa: F811 assert time() < start + 10 -@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, reason='PBS_ACCOUNT defined') # noqa: F811 +@pytest.mark.skipif('PBS_ACCOUNT' in os.environ, # noqa: F811 + reason='PBS_ACCOUNT defined') def 
test_errors(loop): with pytest.raises(ValueError) as info: SLURMCluster() diff --git a/setup.cfg b/setup.cfg index c3513747..45fee8a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,3 +35,8 @@ parentdir_prefix = dask- [aliases] test = pytest + +[isort] +default_section=THIRDPARTY +known_first_party=xarray +multi_line_output=4 diff --git a/setup.py b/setup.py index 8cbfe7d8..bc32dfaa 100755 --- a/setup.py +++ b/setup.py @@ -1,14 +1,25 @@ #!/usr/bin/env python from os.path import exists + from setuptools import setup +with open('requirements.txt') as f: + install_requires = f.read().strip().split('\n') + +if exists('README.rst'): + with open('README.rst') as f: + long_description = f.read() +else: + long_description = '' + setup(name='dask-jobqueue', version='0.1.0', - description='Deploy Dask on job queuing systems like PBS and SLURM', + description='Deploy Dask on job queuing systems like PBS or SLURM', url='https://github.com/dask/dask-jobqueue', license='BSD 3-Clause', packages=['dask_jobqueue'], - install_requires=open('requirements.txt').read().strip().split('\n'), - long_description=(open('README.rst').read() if exists('README.rst') else ''), + install_requires=install_requires, + tests_require=['pytest >= 2.7.1'], + long_description=long_description, zip_safe=False)