Add resource hints for Condor provider #1273

Merged (8 commits) on Sep 18, 2019
31 changes: 27 additions & 4 deletions parsl/providers/condor/condor.py
@@ -35,6 +35,12 @@ class CondorProvider(RepresentationMixin, ClusterProvider):
         :class:`~parsl.channels.SSHInteractiveLoginChannel`.
     nodes_per_block : int
         Nodes to provision per block.
+    cores_per_slot : int
+        Specify the number of cores to provision per slot. If set to None, executors
+        will assume all cores on the node are available for computation. Default is None.
+    mem_per_slot : float
+        Specify the real memory to provision per slot in GB. If set to None, no
+        explicit request to the scheduler will be made. Default is None.
     init_blocks : int
         Number of blocks to provision at time of initialization
     min_blocks : int
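
For example (the executor label and resource values here are assumptions, not part of this PR), the new hints would be passed through a Parsl configuration like:

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import CondorProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label='condor_htex',
                provider=CondorProvider(
                    nodes_per_block=1,
                    cores_per_slot=4,   # request 4 cores per Condor slot
                    mem_per_slot=8,     # request 8 GB of memory per slot
                ),
            )
        ]
    )
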
@@ -67,6 +73,8 @@ class CondorProvider(RepresentationMixin, ClusterProvider):
     def __init__(self,
                  channel=LocalChannel(),
                  nodes_per_block=1,
+                 cores_per_slot=None,
+                 mem_per_slot=None,
                  init_blocks=1,
                  min_blocks=0,
                  max_blocks=10,
@@ -93,6 +101,12 @@ def __init__(self,
                          launcher,
                          cmd_timeout=cmd_timeout)
         self.provisioned_blocks = 0
+        self.cores_per_slot = cores_per_slot
+        self.mem_per_slot = mem_per_slot
+
+        # To Parsl, Condor slots should be treated equivalently to nodes
+        self.cores_per_node = cores_per_slot
+        self.mem_per_node = mem_per_slot
 
         self.environment = environment if environment is not None else {}
         for key, value in self.environment.items():
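
Because Parsl executors size their workers from the provider's node-level hints, mirroring the slot values onto cores_per_node and mem_per_node makes each Condor slot look like a node downstream. A minimal sketch of the effect (values assumed):

    provider = CondorProvider(cores_per_slot=4, mem_per_slot=8)
    assert provider.cores_per_node == 4   # slot hints surface as node hints
    assert provider.mem_per_node == 8
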
@@ -104,8 +118,8 @@ def __init__(self,
             pass
 
         self.project = project
-        self.scheduler_options = scheduler_options
-        self.worker_init = worker_init
+        self.scheduler_options = scheduler_options + '\n'
+        self.worker_init = worker_init + '\n'
         self.requirements = requirements
         self.transfer_input_files = transfer_input_files
@@ -187,6 +201,15 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
 
         job_name = "parsl.{0}.{1}".format(job_name, time.time())
 
+        scheduler_options = self.scheduler_options
+        worker_init = self.worker_init
+        if self.mem_per_slot is not None:
+            scheduler_options += 'RequestMemory = {}\n'.format(self.mem_per_slot * 1024)
+            worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_slot)
+        if self.cores_per_slot is not None:
+            scheduler_options += 'RequestCpus = {}\n'.format(self.cores_per_slot)
+            worker_init += 'export PARSL_CORES={}\n'.format(self.cores_per_slot)
+
         script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
         script_path = os.path.abspath(script_path)
         userscript_path = "{0}/{1}.script".format(self.script_dir, job_name)
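
Note that Condor's RequestMemory is expressed in MB while mem_per_slot is in GB, hence the * 1024; the worker environment keeps the GB value. With the assumed values from earlier, the block above would append roughly:

    # Assuming cores_per_slot=4 and mem_per_slot=8:
    scheduler_options += 'RequestMemory = 8192\nRequestCpus = 4\n'   # 8 GB -> 8192 MB
    worker_init += 'export PARSL_MEMORY_GB=8\nexport PARSL_CORES=4\n'
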
@@ -199,8 +222,8 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
         job_config["submit_script_dir"] = self.channel.script_dir
         job_config["project"] = self.project
         job_config["nodes"] = self.nodes_per_block
-        job_config["scheduler_options"] = self.scheduler_options
-        job_config["worker_init"] = self.worker_init
+        job_config["scheduler_options"] = scheduler_options
+        job_config["worker_init"] = worker_init
         job_config["user_script"] = command
         job_config["tasks_per_node"] = tasks_per_node
         job_config["requirements"] = self.requirements