Skip to content

Commit

Permalink
Refactor formatting and fix `n_gpu_procs` lookup in the MultiProc plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
mauriliogenovese committed Mar 25, 2024
1 parent f1f5d76 commit a642430
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
5 changes: 3 additions & 2 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,9 @@ def update(self, **opts):
self.inputs.update(**opts)

def is_gpu_node(self):
    """Return True if this node requests GPU execution.

    A node counts as a GPU node when its ``inputs`` object exposes a
    truthy ``use_cuda`` or ``use_gpu`` trait; a missing trait counts
    as False (``hasattr`` guard short-circuits the attribute read).
    """
    return (hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) or (
        hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu
    )


class JoinNode(Node):
Expand Down
25 changes: 15 additions & 10 deletions nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,14 @@ def __init__(self, plugin_args=None):
# GPU found on system
self.n_gpus_visible = MultiProcPlugin.gpu_count()
# proc per GPU set by user
self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible)
self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)

# total no. of processes allowed on all gpus
if self.n_gpu_procs > self.n_gpus_visible:
logger.info(
'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' % (
self.n_gpu_procs, self.n_gpus_visible))
'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!'
% (self.n_gpu_procs, self.n_gpus_visible)
)

# Instantiate different thread pools for non-daemon processes
logger.debug(
Expand Down Expand Up @@ -220,9 +221,7 @@ def _prerun_check(self, graph):
if self.raise_insufficient:
raise RuntimeError("Insufficient resources available for job")
if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
logger.warning(
'Nodes demand more GPU than allowed (%d).',
self.n_gpu_procs)
logger.warning('Nodes demand more GPU than allowed (%d).', self.n_gpu_procs)
if self.raise_insufficient:
raise RuntimeError('Insufficient GPU resources available for job')

Expand Down Expand Up @@ -257,7 +256,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
)

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks)
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
self.pending_tasks
)

stats = (
len(self.pending_tasks),
Expand All @@ -267,7 +268,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs
self.n_gpu_procs,
)
if self._stats != stats:
tasks_list_msg = ""
Expand Down Expand Up @@ -338,8 +339,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
is_gpu_node = self.procs[jobid].is_gpu_node()

# If node does not fit, skip at this moment
if (next_job_th > free_processors or next_job_gb > free_memory_gb
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)):
if (
next_job_th > free_processors
or next_job_gb > free_memory_gb
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
):
logger.debug(
"Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
jobid,
Expand Down Expand Up @@ -424,6 +428,7 @@ def gpu_count():
n_gpus = 1
try:
import GPUtil

return len(GPUtil.getGPUs())
except ImportError:
return n_gpus

Check warning on line 434 in nipype/pipeline/plugins/multiproc.py

View check run for this annotation

Codecov / codecov/patch

nipype/pipeline/plugins/multiproc.py#L433-L434

Added lines #L433 - L434 were not covered by tests
8 changes: 6 additions & 2 deletions nipype/pipeline/plugins/tests/test_multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_run_multiproc(tmpdir):
class InputSpecSingleNode(nib.TraitedSpec):
    """Input spec for the single-node test interface used by the
    MultiProc plugin tests."""

    input1 = nib.traits.Int(desc="a random int")
    input2 = nib.traits.Int(desc="a random int")
    # Optional flag the scheduler inspects to treat the node as a GPU node;
    # PEP 8: no spaces around '=' in keyword arguments.
    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")


class OutputSpecSingleNode(nib.TraitedSpec):
Expand Down Expand Up @@ -117,6 +117,7 @@ def test_no_more_threads_than_specified(tmpdir):
with pytest.raises(RuntimeError):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})


def test_no_more_gpu_threads_than_specified(tmpdir):
tmpdir.chdir()

Expand All @@ -129,7 +130,10 @@ def test_no_more_gpu_threads_than_specified(tmpdir):
max_threads = 2
max_gpu = 1
with pytest.raises(RuntimeError):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu})
pipe.run(
plugin="MultiProc",
plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
)


@pytest.mark.skipif(
Expand Down

0 comments on commit a642430

Please sign in to comment.