diff --git a/parsl/dataflow/strategy.py b/parsl/dataflow/strategy.py
index 9a26aee1df..3ba01523a1 100644
--- a/parsl/dataflow/strategy.py
+++ b/parsl/dataflow/strategy.py
@@ -181,12 +181,8 @@ def _strategy_simple(self, tasks, *args, kind=None, **kwargs):
             # FIXME probably more of this logic should be moved to the provider
             min_blocks = executor.provider.min_blocks
             max_blocks = executor.provider.max_blocks
-            if isinstance(executor, IPyParallelExecutor):
+            if isinstance(executor, IPyParallelExecutor) or isinstance(executor, HighThroughputExecutor):
                 tasks_per_node = executor.workers_per_node
-            elif isinstance(executor, HighThroughputExecutor):
-                # This is probably wrong calculation, we need this to come from the executor
-                # since we can't know slots ahead of time.
-                tasks_per_node = 1
             elif isinstance(executor, ExtremeScaleExecutor):
                 tasks_per_node = executor.ranks_per_node
 
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 07d98dcfbf..466bb10b94 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -9,6 +9,7 @@
 import pickle
 from multiprocessing import Process, Queue
 from typing import Dict, List, Optional, Tuple, Union
+import math
 
 from ipyparallel.serialize import pack_apply_message  # ,unpack_apply_message
 from ipyparallel.serialize import deserialize_object  # ,serialize_object
@@ -184,6 +185,21 @@ def __init__(self,
         self.max_workers = max_workers
         self.prefetch_capacity = prefetch_capacity
 
+        mem_slots = max_workers
+        cpu_slots = max_workers
+        if hasattr(self.provider, 'mem_per_node') and \
+                self.provider.mem_per_node is not None and \
+                mem_per_worker is not None and \
+                mem_per_worker > 0:
+            mem_slots = math.floor(self.provider.mem_per_node / mem_per_worker)
+        if hasattr(self.provider, 'cores_per_node') and \
+                self.provider.cores_per_node is not None:
+            cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)
+
+        self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
+        if self.workers_per_node == float('inf'):
+            self.workers_per_node = 1  # our best guess-- we do not have any provider hints
+
         self._task_counter = 0
         self.address = address
         self.hub_address = None  # set to the correct hub address in dfk
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 4ed750ae7b..a310302922 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -1,4 +1,5 @@
 import os
+import math
 import time
 import logging
 
@@ -43,6 +44,12 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
         :class:`~parsl.channels.SSHInteractiveLoginChannel`.
     nodes_per_block : int
         Nodes to provision per block.
+    cores_per_node : int
+        Specify the number of cores to provision per node. If set to None, executors
+        will assume all cores on the node are available for computation. Default is None.
+    mem_per_node : float
+        Specify the real memory to provision per node in GB. If set to None, no
+        explicit request to the scheduler will be made. Default is None.
     min_blocks : int
         Minimum number of blocks to maintain.
     max_blocks : int
@@ -71,6 +78,8 @@ def __init__(self,
                  partition,
                  channel=LocalChannel(),
                  nodes_per_block=1,
+                 cores_per_node=None,
+                 mem_per_node=None,
                  init_blocks=1,
                  min_blocks=0,
                  max_blocks=10,
@@ -95,13 +104,14 @@ def __init__(self,
                          launcher=launcher)
 
         self.partition = partition
+        self.cores_per_node = cores_per_node
+        self.mem_per_node = mem_per_node
         self.exclusive = exclusive
         self.move_files = move_files
+        self.scheduler_options = scheduler_options + '\n'
         if exclusive:
-            self.scheduler_options = "#SBATCH --exclusive\n" + scheduler_options
-        else:
-            self.scheduler_options = scheduler_options
-        self.worker_init = worker_init
+            self.scheduler_options += "#SBATCH --exclusive\n"
+        self.worker_init = worker_init + '\n'
 
     def _status(self):
         ''' Internal: Do not call. Returns the status list for a list of job_ids
@@ -157,6 +167,16 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
             logger.warn("Slurm provider '{}' is at capacity (no more blocks will be added)".format(self.label))
             return None
 
+        scheduler_options = self.scheduler_options
+        worker_init = self.worker_init
+        if self.mem_per_node is not None:
+            scheduler_options += '#SBATCH --mem={}g\n'.format(self.mem_per_node)
+            worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_node)
+        if self.cores_per_node is not None:
+            cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
+            scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
+            worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)
+
         job_name = "{0}.{1}".format(job_name, time.time())
 
         script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
@@ -169,8 +189,8 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
         job_config["nodes"] = self.nodes_per_block
         job_config["tasks_per_node"] = tasks_per_node
         job_config["walltime"] = wtime_to_minutes(self.walltime)
-        job_config["scheduler_options"] = self.scheduler_options
-        job_config["worker_init"] = self.worker_init
+        job_config["scheduler_options"] = scheduler_options
+        job_config["worker_init"] = worker_init
         job_config["partition"] = self.partition
         job_config["user_script"] = command
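For reference, below is a minimal configuration sketch of how the new provider hints feed the executor's worker-slot calculation. The partition name, core/memory figures, and per-worker requirements are illustrative only and are not part of this change:

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label='htex_slurm',
            max_workers=64,
            cores_per_worker=2,   # assume each worker needs 2 cores
            mem_per_worker=4,     # assume each worker needs 4 GB
            provider=SlurmProvider(
                'debug',              # hypothetical partition name
                nodes_per_block=2,
                cores_per_node=32,    # new hint: cores available per node
                mem_per_node=128,     # new hint: memory (GB) available per node
            ),
        )
    ]
)

# With these hints the executor derives:
#   cpu_slots = floor(32 / 2)  = 16
#   mem_slots = floor(128 / 4) = 32
#   workers_per_node = min(max_workers, mem_slots, cpu_slots) = 16
# and the scaling strategy reads executor.workers_per_node instead of the
# previous hard-coded tasks_per_node = 1.

On the Slurm side, the same hints would add '#SBATCH --mem=128g' and '#SBATCH --cpus-per-task=2' (floor(32 cores / 16 tasks per node)) to the submit script, and export PARSL_MEMORY_GB and PARSL_CORES in worker_init.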