Implement resource hints for the SlurmProvider
Partially addresses #942.

This commit adds the `cores_per_node` and `mem_per_node` keyword args to
the SlurmProvider. Both default to None, in which case behavior is
unchanged. Setting either has three effects. First, it modifies the
Slurm submit script to request the appropriate cores and/or memory.
Second, it sets the environment variables `PARSL_MEMORY_GB` and
`PARSL_CORES` on the node. Third, a `workers_per_node` attribute is
added to the `HighThroughputExecutor`, calculated from the resource
hints when they are available. The scaling strategy reads this
attribute, enabling a more accurate calculation when scaling resources
up and down. An example configuration, tested on Midway, is provided
below. It requests 4 one-core workers, each with 3 GB of memory.

```python
from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.addresses import address_by_hostname
from parsl.executors import HighThroughputExecutor

config = Config(
    executors=[
        HighThroughputExecutor(
            cores_per_worker=1,
            mem_per_worker=3,
            address=address_by_hostname(),
            provider=SlurmProvider(
                'broadwl',
                nodes_per_block=1,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                mem_per_node=12,
                cores_per_node=4,
                exclusive=False
            ),
        )
    ],
)
```
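
For concreteness, here is a minimal sketch of the slot arithmetic this configuration triggers, mirroring the `HighThroughputExecutor` logic added below (the standalone variable names are illustrative, and we assume the executor's default `max_workers` of `float('inf')` since it is unset above):

```python
import math

# Provider hints from the SlurmProvider above.
mem_per_node = 12       # GB
cores_per_node = 4

# Executor settings from the HighThroughputExecutor above.
mem_per_worker = 3      # GB
cores_per_worker = 1
max_workers = float('inf')  # assumed executor default when unset

mem_slots = math.floor(mem_per_node / mem_per_worker)      # 12 / 3 -> 4
cpu_slots = math.floor(cores_per_node / cores_per_worker)  # 4 / 1  -> 4

workers_per_node = min(max_workers, mem_slots, cpu_slots)
print(workers_per_node)  # 4 one-core workers, each with 3 GB
```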
annawoodard committed Aug 23, 2019
1 parent df71e4c commit bb00a12
Showing 3 changed files with 43 additions and 11 deletions.
6 changes: 1 addition & 5 deletions parsl/dataflow/strategy.py
```diff
@@ -181,12 +181,8 @@ def _strategy_simple(self, tasks, *args, kind=None, **kwargs):
             # FIXME probably more of this logic should be moved to the provider
             min_blocks = executor.provider.min_blocks
             max_blocks = executor.provider.max_blocks
-            if isinstance(executor, IPyParallelExecutor):
+            if isinstance(executor, IPyParallelExecutor) or isinstance(executor, HighThroughputExecutor):
                 tasks_per_node = executor.workers_per_node
-            elif isinstance(executor, HighThroughputExecutor):
-                # This is probably wrong calculation, we need this to come from the executor
-                # since we can't know slots ahead of time.
-                tasks_per_node = 1
             elif isinstance(executor, ExtremeScaleExecutor):
                 tasks_per_node = executor.ranks_per_node
 
```
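
The payoff is in the strategy's capacity estimate. As a hedged sketch (the helper below is illustrative, not the actual code in strategy.py), an accurate `tasks_per_node` lets the strategy estimate how many task slots the active blocks already provide before requesting more:

```python
def estimated_capacity(active_blocks, nodes_per_block, tasks_per_node):
    # Illustrative: total task slots the running blocks can serve.
    return active_blocks * nodes_per_block * tasks_per_node

# With the example config (workers_per_node == 4), one active block of one
# node is known to serve 4 tasks, instead of the old hard-coded guess of 1.
print(estimated_capacity(1, 1, 4))  # 4
```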
16 changes: 16 additions & 0 deletions parsl/executors/high_throughput/executor.py
```diff
@@ -9,6 +9,7 @@
 import pickle
 from multiprocessing import Process, Queue
 from typing import Dict, List, Optional, Tuple, Union
+import math
 
 from ipyparallel.serialize import pack_apply_message  # ,unpack_apply_message
 from ipyparallel.serialize import deserialize_object  # ,serialize_object
@@ -184,6 +185,21 @@ def __init__(self,
         self.max_workers = max_workers
         self.prefetch_capacity = prefetch_capacity
 
+        mem_slots = max_workers
+        cpu_slots = max_workers
+        if hasattr(self.provider, 'mem_per_node') and \
+                self.provider.mem_per_node is not None and \
+                mem_per_worker is not None and \
+                mem_per_worker > 0:
+            mem_slots = math.floor(self.provider.mem_per_node / mem_per_worker)
+        if hasattr(self.provider, 'cores_per_node') and \
+                self.provider.cores_per_node is not None:
+            cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)
+
+        self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
+        if self.workers_per_node == float('inf'):
+            self.workers_per_node = 1  # our best guess-- we do not have any provider hints
+
         self._task_counter = 0
         self.address = address
         self.hub_address = None  # set to the correct hub address in dfk
```
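
One edge case worth noting: when `max_workers` is left at its `float('inf')` default and the provider supplies no hints, every slot count stays infinite, so the final guard falls back to one worker per node. A minimal sketch of that path:

```python
# No provider hints, and max_workers left at the default float('inf').
max_workers = float('inf')
mem_slots = max_workers
cpu_slots = max_workers

workers_per_node = min(max_workers, mem_slots, cpu_slots)  # inf
if workers_per_node == float('inf'):
    workers_per_node = 1  # best guess without hints
print(workers_per_node)  # 1
```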
32 changes: 26 additions & 6 deletions parsl/providers/slurm/slurm.py
```diff
@@ -1,4 +1,5 @@
 import os
+import math
 import time
 import logging
 
@@ -43,6 +44,12 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
         :class:`~parsl.channels.SSHInteractiveLoginChannel`.
     nodes_per_block : int
         Nodes to provision per block.
+    cores_per_node : int
+        Specify the number of cores to provision per node. If set to None, executors
+        will assume all cores on the node are available for computation. Default is None.
+    mem_per_node : float
+        Specify the real memory to provision per node in GB. If set to None, no
+        explicit request to the scheduler will be made. Default is None.
     min_blocks : int
         Minimum number of blocks to maintain.
     max_blocks : int
@@ -71,6 +78,8 @@ def __init__(self,
                  partition,
                  channel=LocalChannel(),
                  nodes_per_block=1,
+                 cores_per_node=None,
+                 mem_per_node=None,
                  init_blocks=1,
                  min_blocks=0,
                  max_blocks=10,
@@ -95,13 +104,14 @@
                          launcher=launcher)
 
         self.partition = partition
+        self.cores_per_node = cores_per_node
+        self.mem_per_node = mem_per_node
         self.exclusive = exclusive
         self.move_files = move_files
+        self.scheduler_options = scheduler_options + '\n'
         if exclusive:
-            self.scheduler_options = "#SBATCH --exclusive\n" + scheduler_options
-        else:
-            self.scheduler_options = scheduler_options
-        self.worker_init = worker_init
+            self.scheduler_options += "#SBATCH --exclusive\n"
+        self.worker_init = worker_init + '\n'
 
     def _status(self):
         ''' Internal: Do not call. Returns the status list for a list of job_ids
@@ -157,6 +167,16 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
             logger.warn("Slurm provider '{}' is at capacity (no more blocks will be added)".format(self.label))
             return None
 
+        scheduler_options = self.scheduler_options
+        worker_init = self.worker_init
+        if self.mem_per_node is not None:
+            scheduler_options += '#SBATCH --mem={}g\n'.format(self.mem_per_node)
+            worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_node)
+        if self.cores_per_node is not None:
+            cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
+            scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
+            worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)
+
         job_name = "{0}.{1}".format(job_name, time.time())
 
         script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
@@ -169,8 +189,8 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
         job_config["nodes"] = self.nodes_per_block
         job_config["tasks_per_node"] = tasks_per_node
         job_config["walltime"] = wtime_to_minutes(self.walltime)
-        job_config["scheduler_options"] = self.scheduler_options
-        job_config["worker_init"] = self.worker_init
+        job_config["scheduler_options"] = scheduler_options
+        job_config["worker_init"] = worker_init
         job_config["partition"] = self.partition
         job_config["user_script"] = command
```
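
Tracing `submit` with the example configuration shows what gets appended to the submit script and the worker environment. A sketch, assuming the strategy passes the executor's `workers_per_node` (4) as `tasks_per_node`, with the expected strings in comments:

```python
import math

mem_per_node = 12   # GB, from the SlurmProvider above
cores_per_node = 4
tasks_per_node = 4  # assumed: the executor's workers_per_node

scheduler_options = '#SBATCH --mem={}g\n'.format(mem_per_node)
worker_init = 'export PARSL_MEMORY_GB={}\n'.format(mem_per_node)

cpus_per_task = math.floor(cores_per_node / tasks_per_node)  # 4 / 4 -> 1
scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)

print(scheduler_options)  # '#SBATCH --mem=12g\n#SBATCH --cpus-per-task=1'
print(worker_init)        # 'export PARSL_MEMORY_GB=12\nexport PARSL_CORES=1\n'
```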
