Running workflows in parallel with Slurm
In this section, we extend the previous example on executing SciLuigi workflows with Slurm by showing how to run multiple instances of the same pipeline in parallel.
To send out multiple instances of our pipeline, we need to set up a 'meta' workflow which builds all the individual workflows. This meta workflow has a parameter n_tasks which lets us specify from the command line how many workflows to generate. Each workflow is constructed with a unique integer parameter task_n obtained by iterating over range(n_tasks).
import luigi
import sciluigi
import os

class MyMetaWorkflow(sciluigi.WorkflowTask):

    runmode = luigi.Parameter()
    n_tasks = luigi.IntParameter()

    def workflow(self):
        if self.runmode == 'local':
            runmode = sciluigi.RUNMODE_LOCAL
        elif self.runmode == 'hpc':
            runmode = sciluigi.RUNMODE_HPC
        elif self.runmode == 'mpi':
            runmode = sciluigi.RUNMODE_MPI
        else:
            raise Exception('Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        # here we create each workflow with a parameter task_n
        # and append all workflows to a list
        tasks = []
        for t in range(self.n_tasks):
            wf = self.new_task('wf', MyWorkflow, task_n=t,
                               runmode=runmode)
            tasks.append(wf)
        return tasks
Next, we can define the 'actual' pipeline similarly to the previous example. The main difference is that our tasks take an extra task_n parameter, passed by the workflow class. As an example, suppose task_n=3; the pipeline will then execute the following steps:
- The task MyFooWriter will write a file called foo3.txt which contains the string foo.
- The task MyFooReplacer will replace the string with the host name of the cluster node where the job is running, and save it to a file called host3.txt.
We will send out several instances of this pipeline to run (potentially) on different nodes, and afterwards we will be able to read in the host*.txt files to see where each instance was executed.
class MyWorkflow(sciluigi.WorkflowTask):

    runmode = luigi.Parameter()
    task_n = luigi.IntParameter()  # added the task_n parameter

    def workflow(self):
        foowriter = self.new_task('foowriter', MyFooWriter,
                                  task_n=self.task_n,
                                  slurminfo=sciluigi.SlurmInfo(
                                      runmode=self.runmode,
                                      project='myname',
                                      partition='mypartition',
                                      cores='1',
                                      time='1:00:00',
                                      jobname='foowriter',
                                      threads='1'))
        fooreplacer = self.new_task('fooreplacer', MyFooReplacer,
                                    task_n=self.task_n,
                                    slurminfo=sciluigi.SlurmInfo(
                                        runmode=self.runmode,
                                        project='myname',
                                        partition='mypartition',
                                        cores='1',
                                        time='1:00:00',
                                        jobname='fooreplacer',
                                        threads='1'))
        fooreplacer.in_foo = foowriter.out_foo
        return fooreplacer
class MyFooWriter(sciluigi.SlurmTask):

    task_n = luigi.IntParameter()

    def out_foo(self):
        return sciluigi.TargetInfo(self, 'foo{}.txt'.format(str(self.task_n)))

    def run(self):
        self.ex('touch {out_file}; echo foo > {out_file}; sleep 30'.format(
            out_file=self.out_foo().path))

class MyFooReplacer(sciluigi.SlurmTask):

    task_n = luigi.IntParameter()
    in_foo = None

    def out_replaced(self):
        out_file = os.path.join(os.path.dirname(self.in_foo().path),
                                'host{}.txt'.format(str(self.task_n)))
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        self.ex('./replace_with_hostname.sh {in_file} {out_file}; sleep 30'.format(
            in_file=self.in_foo().path,
            out_file=self.out_replaced().path))
Here, the run method calls the script replace_with_hostname.sh introduced in the previous tutorial.
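For reference, a minimal sketch of what such a script could look like is shown below (the exact script from the previous tutorial may differ); it takes the input and output file paths as its two arguments, matching the ex() call above:

#!/bin/bash
# Sketch only: replace the string 'foo' in the input file ($1) with the
# host name of the machine running the job, and write the result to $2.
sed "s/foo/$(hostname)/" "$1" > "$2"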
Note that we also changed the way we write the foo* files. Instead of using Python code, we make a call to ex(), which ensures that the task is sent to Slurm and executed as a batch job. The ex() method is in fact what is actually sensitive to the runmode: if called with runmode set to hpc, it will call salloc to allocate resources and srun to run the command. Thus, only calls to ex() will be sent out as batch jobs to the Slurm queue. In contrast, any Python code present in the run method of the task will be executed locally.
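To make this concrete, for the foowriter task configured above and runmode set to hpc, the command issued by ex() is roughly of the following form (this is only an illustration of how salloc and srun are combined; the exact command line is assembled by SciLuigi from the SlurmInfo fields and may differ):

salloc -A myname -p mypartition -n 1 -t 1:00:00 -J foowriter srun -n 1 <command>

where <command> stands for the shell command string passed to ex().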
Lastly, we added sleep 30 to the commands in both tasks, which tells each job to wait for 30 seconds. This just slows down execution a bit so that we can see the jobs appearing in the queue and in what order they are executed.
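While the pipelines are running, you can watch the corresponding jobs appear in (and leave) the queue with, for example:

squeue --user <username>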
At the end of the script, we have to add
if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyMetaWorkflow)
Finally, we can run our pipeline. Importantly, we have to set the number of workers to a number larger than one (Luigi's default is one) to allow our tasks to run concurrently. Supposing the code is in a script called sciluigi_slurm_example_parallel.py, we can type
python sciluigi_slurm_example_parallel.py --runmode hpc --n-tasks 10 --workers 30
This command will execute 10 pipelines which can run in parallel (depending on the availability of resources) on different nodes.
You can check that the host names written in the host*.txt files agree with Slurm by calling sacct and passing the NodeList field to the --format option:
sacct --user <username> --format=JobID,JobName,Partition,Account,AllocCPUS,State,NodeList
The last column of the output of sacct will then show the nodes where the tasks were executed.
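If you are only interested in a particular job, you can also restrict the output of sacct to that job, for example:

sacct -j <jobid> --format=JobID,JobName,NodeList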
Of course, you may want to limit the number of batch jobs that can be executed in parallel at any given time. If we were to set --workers 10, then SciLuigi would send out all 10 foowriter tasks as batch jobs, but then, since it only has 10 workers available, it would wait until workers are free before sending out the fooreplacer tasks.
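For example, to try this out you could run:

python sciluigi_slurm_example_parallel.py --runmode hpc --n-tasks 10 --workers 10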