Skip to content

Commit

Permalink
🛠️ refactor to use cpus option for taskset
Browse files Browse the repository at this point in the history
  • Loading branch information
ddomenico committed Feb 16, 2024
1 parent 69e877b commit 145942e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
35 changes: 32 additions & 3 deletions isabl_cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from os.path import join
import abc
import os
import re
import sys
import traceback

Expand Down Expand Up @@ -705,12 +706,20 @@ def as_cli_command(cls):
is_flag=True,
)

cpus = click.option(
"--cpus",
help="When running local, mask cpus with taskset. e.g. 0, 0-32",
type=click.STRING,
default=None,
)

cli_options = [ # pylint: disable=unused-variable
(quiet, True),
(commit, True),
(force, cls.cli_allow_force),
(restart, cls.cli_allow_restart),
(local, cls.cli_allow_local),
(cpus, None)
]

def print_url(ctx, _, value):
Expand Down Expand Up @@ -739,6 +748,7 @@ def command(commit, quiet, **cli_options):
force = cli_options.pop("force", False)
restart = cli_options.pop("restart", False)
local = cli_options.pop("local", False)
cpus = cli_options.pop("cpus", None)
tuples = []

if commit and force:
Expand All @@ -764,6 +774,7 @@ def command(commit, quiet, **cli_options):
verbose=not quiet,
restart=restart,
local=local,
cpus=cpus,
run_args=cli_options,
)

Expand Down Expand Up @@ -835,6 +846,7 @@ def run(
verbose=True,
run_args=None,
local=False,
cpus=None,
):
"""
Run a list of targets, references tuples.
Expand All @@ -843,6 +855,7 @@ def run(
restart (bool): set settings.restart = True.
force (bool): if true, analyses are wiped before being submitted.
local (bool): if true, analyses will be run locally one by one.
cpus (str): if set, mask cpus with taskset.
commit (bool): if true, analyses are started (`force` overwrites).
verbose (bool): whether or not verbose output should be printed.
tuples (list): list of (targets, references) tuples.
Expand Down Expand Up @@ -875,6 +888,12 @@ def run(
# run extra settings validation
self.validate_settings(self.settings)

# validate cpus input if local and set
if local and cpus and not self.validate_taskset_input(cpus):
raise exceptions.ValidationError(
f"CPU input not supported by taskset: {cpus}"
)

# create analyses
analyses, invalid_tuples = self.get_or_create_analyses(tuples)

Expand All @@ -885,7 +904,7 @@ def run(

# run analyses
run_tuples, skipped_tuples, invalid_run_tuples = self.run_analyses(
analyses=analyses, commit=commit, force=force, restart=restart, local=local
analyses=analyses, commit=commit, force=force, restart=restart, local=local, cpus=cpus,
)

invalid_tuples.extend(invalid_run_tuples)
Expand Down Expand Up @@ -930,7 +949,7 @@ def run(

return run_tuples, skipped_tuples, invalid_tuples

def run_analyses(self, analyses, commit, force, restart, local):
def run_analyses(self, analyses, commit, force, restart, local, cpus=None):
"""
Run a list of analyses.
Expand Down Expand Up @@ -1003,7 +1022,10 @@ def run_analyses(self, analyses, commit, force, restart, local):

if commit:
click.echo(f"Running analyses with {submit_analyses.__name__}...")
run_tuples = submit_analyses(self, command_tuples)
if local:
run_tuples = submit_analyses(self, command_tuples, cpus)
else:
run_tuples = submit_analyses(self, command_tuples)
else:
run_tuples = [(i, self._staged_message) for i, _ in command_tuples]

Expand Down Expand Up @@ -1878,6 +1900,13 @@ def validate_source(self, experiments, source):
i["sample"]["source"] == source
), f"Sample source for {i['sample']['system_id']} does not match {source}."

def validate_taskset_input(self, cpus):
"""Validate cpus option is in proper taskset format."""
regex = r'^\s*([0-9]+(-[0-9]+)?\s*)(,\s*[0-9]+(-[0-9]+)?\s*)*$'
if re.match(regex, cpus):
return True
return False

# -------------------------
# NOTIFICATION UTILS
# -------------------------
Expand Down
16 changes: 4 additions & 12 deletions isabl_cli/batch_systems/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,12 @@
from isabl_cli.settings import perform_import


def submit_local(app, command_tuples):
def submit_local(app, command_tuples, specific_cpus=None):
"""Submit analyses locally and serially."""
# make sure analyses are in submitted status to avoid
# merging project level analyses in every success
ret = []

get_requirements = system_settings.SUBMIT_CONFIGURATION.get("get_requirements")

if get_requirements:
name = "SUBMIT_CONFIGURATION.get_requirements"
get_requirements = perform_import(val=get_requirements, setting_name=name)
method = command_tuples[0][0]["targets"][0]["technique"]["method"]
requirements = get_requirements(app, method)

api.patch_analyses_status([i for i, _ in command_tuples], "SUBMITTED")
label = f"Running {len(command_tuples)} analyses..."

Expand All @@ -34,9 +26,9 @@ def submit_local(app, command_tuples):
oldmask = os.umask(0o22)
status = app._get_after_completion_status(i)

# add requirements if specified
if requirements:
j = f"{requirements} {j}"
# add cpus if specified
if specific_cpus:
j = f"taskset -c {specific_cpus} {j}"

with open(log, "w") as stdout, open(err, "w") as stderr:
try:
Expand Down

0 comments on commit 145942e

Please sign in to comment.