-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathclusterrun.py
70 lines (58 loc) · 2.78 KB
/
clusterrun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def ClusterRunSlurm(function, parameter_list, max_jobs=64, procs_per_job=1,
                    mem='5GB'):
    '''Run ``function`` in parallel over ``parameter_list`` via a Slurm
    Dask cluster and return the gathered results in input order.

    function: The routine run in parallel, which must contain all necessary
        imports internally.
    parameter_list: An iterable of elements; each element is passed as the
        sole parameter to function for one parallel execution.
    max_jobs: Standard Rhino cluster etiquette is to stay within 100 jobs
        running at a time. Please ask for permission before using more.
    procs_per_job: The number of concurrent processes to reserve per job.
    mem: A string specifying the amount of RAM required per job, formatted
        like '5GB'. Standard Rhino cluster etiquette is to stay within 320GB
        total across all jobs.

    In jupyterlab, the number of engines reported as initially running may
    be smaller than the number actually running. Check usage from an ssh
    terminal using: "squeue" or "squeue -u $USER"
    Undesired running jobs can be killed by reading the JOBID at the left
    of that squeue command, then doing: scancel JOBID
    '''
    import cmldask.CMLDask as da
    from dask.distributed import wait  # as_completed/progress were unused

    # Materialize so generators/iterators work (len() below) as promised
    # by the docstring's "iterable" contract.
    parameter_list = list(parameter_list)
    # Never request more Slurm jobs than there are work items.
    num_jobs = min(len(parameter_list), max_jobs)
    with da.new_dask_client_slurm(function.__name__, mem, max_n_jobs=num_jobs,
                                  processes_per_job=procs_per_job) as client:
        futures = client.map(function, parameter_list)
        # Wait for *all* futures first so every job runs to completion
        # before gather can raise on the first error.
        wait(futures)
        res = client.gather(futures)
    return res
def ClusterChecked(function, parameter_list, *args, **kwargs):
    '''Parallelizes via ClusterRunSlurm and raises a RuntimeError if any
    of the returned results are falsy; extra args/kwargs are forwarded.'''
    results = ClusterRunSlurm(function, parameter_list, *args, **kwargs)
    if all(results):
        print('All', len(results), 'jobs successful.')
        return
    # Collect the indices of every falsy result up front.
    bad_indices = [i for i, r in enumerate(results) if not bool(r)]
    failed = len(bad_indices)
    if failed == len(results):
        raise RuntimeError('All '+str(failed)+' jobs failed!')
    print('Error on job parameters:\n ' +
          '\n '.join(str(parameter_list[i]) for i in bad_indices))
    raise RuntimeError(str(failed)+' of '+str(len(results))+' jobs failed!')
def ClusterCheckedTup(function, parameter_list, *args, **kwargs):
    '''Parallelizes via ClusterRunSlurm and raises a RuntimeError if any
    results have False as the first value of the returned list/tuple;
    extra args/kwargs are forwarded.'''
    res = ClusterRunSlurm(function, parameter_list, *args, **kwargs)
    # Bug fix: the previous all(res) tested the truthiness of each result
    # tuple itself — a non-empty tuple is always truthy, so failures were
    # never detected. Check the first element of each result instead.
    if all(bool(r[0]) for r in res):
        print('All', len(res), 'jobs successful.')
    else:
        failed = sum(not bool(r[0]) for r in res)
        if failed == len(res):
            raise RuntimeError('All '+str(failed)+' jobs failed!')
        else:
            # Bug fix: filter by the first element here too, matching the
            # failure criterion above.
            print('Error on job parameters:\n ' +
                  '\n '.join(str(parameter_list[i]) for i in range(len(res))
                             if not bool(res[i][0])))
            raise RuntimeError(str(failed)+' of '+str(len(res)) +
                               ' jobs failed!')