
Commit

fixes
johanneskoester committed Sep 8, 2023
1 parent 0906b25 commit b004122
Showing 2 changed files with 62 additions and 34 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -9,6 +9,7 @@ readme = "README.md"
 python = "^3.9"
 snakemake-interface-common = "^1.3.2"
 snakemake-interface-executor-plugins = "^2.0.0"
+throttler = "^1.2.2"
 
 [tool.poetry.group.dev.dependencies]
 black = "^23.7.0"
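Context for the new dependency: throttler's Throttler is an asyncio rate limiter, used below to pace sacct status queries. A minimal usage sketch (rate and period here are illustrative, not the plugin's defaults):

import asyncio

from throttler import Throttler

# Allow at most 2 calls per 10-second window; `async with` waits for a free slot.
rate_limiter = Throttler(rate_limit=2, period=10.0)


async def query_status(jobid: int) -> None:
    async with rate_limiter:
        print(f"querying job {jobid}")  # stand-in for a real sacct call


async def main() -> None:
    await asyncio.gather(*(query_status(j) for j in range(5)))


asyncio.run(main())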
95 changes: 61 additions & 34 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -1,6 +1,5 @@
 import asyncio
-import csv
-from dataclasses import dataclass, field
+from fractions import Fraction
 from io import StringIO
 import os
 import shlex
@@ -10,13 +9,14 @@
 import uuid
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
 from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
-from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings
+from snakemake_interface_executor_plugins import CommonSettings
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
 from snakemake_interface_executor_plugins.jobs import (
     ExecutorJobInterface,
 )
 from snakemake_interface_common.exceptions import WorkflowError
+from throttler import Throttler
 
 
 # Required:
@@ -96,7 +96,8 @@ def run_job(self, job: ExecutorJobInterface):
             call += f" -t {job.resources.runtime}"
         else:
             self.logger.warning(
-                "No wall time information given. This might or might not work on your cluster. "
+                "No wall time information given. This might or might not "
+                "work on your cluster. "
                 "If not, specify the resource runtime in your rule or as a reasonable "
                 "default via --default-resources."
             )
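As the warning suggests, the runtime resource (wall time in minutes) can be set per rule or for all rules at once; a minimal sketch with a made-up rule:

rule example:
    output:
        "results/out.txt"
    resources:
        runtime=60  # wall time in minutes
    shell:
        "touch {output}"

or globally, e.g. snakemake --default-resources 'runtime=60'.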
@@ -156,10 +157,13 @@ def run_job(self, job: ExecutorJobInterface):
         slurm_jobid = out.split(" ")[-1]
         slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
         self.logger.info(
-            f"Job {jobid} has been submitted with SLURM jobid {slurm_jobid} (log: {slurm_logfile})."
+            f"Job {jobid} has been submitted with SLURM jobid {slurm_jobid} "
+            f"(log: {slurm_logfile})."
         )
         self.report_job_submission(
-            SubmittedJobInfo(job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile})
+            SubmittedJobInfo(
+                job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
+            )
         )
 
     async def check_active_jobs(
@@ -190,8 +194,6 @@ async def check_active_jobs(
"TIMEOUT",
"ERROR",
)
# intialize time to sleep in seconds
min_sleep_time = 20
# Cap sleeping time between querying the status of all active jobs:
# If `AccountingStorageType`` for `sacct` is set to `accounting_storage/none`,
# sacct will query `slurmctld` (instead of `slurmdbd`) and this in turn can
@@ -200,26 +202,24 @@
         # removed from `slurmctld` within 6 minutes of finishing. So we're conservative
         # here, with half that time
         max_sleep_time = 180
-        # Initialize all query durations to specified
-        # 5 times the status_rate_limiter, to hit exactly
-        # the status_rate_limiter for the first async below.
-        # It is dynamically updated afterwards.
-        sacct_query_duration = (
-            self.status_rate_limiter._period / self.status_rate_limiter._rate_limit
-        ) * 5
+        sacct_query_durations = []
 
         status_attempts = 5
 
         active_jobs_ids = {job_info.external_jobid for job_info in active_jobs}
         active_jobs_seen_by_sacct = set()
 
         # this code is inspired by the snakemake profile:
-        # https://github.com/Snakemake-Profiles/slurm/blob/a0e559e1eca607d0bd26c15f94d609e6905f8a8e/%7B%7Bcookiecutter.profile_name%7D%7D/slurm-status.py#L27
+        # https://github.com/Snakemake-Profiles/slurm
         for i in range(status_attempts):
             async with self.status_rate_limiter:
                 (status_of_jobs, sacct_query_duration) = await self.job_stati(
                     # -X: only show main job, no substeps
-                    f"sacct -X --parsable2 --noheader --format=JobIdRaw,State --name {self.run_uuid}"
+                    f"sacct -X --parsable2 --noheader --format=JobIdRaw,State "
+                    f"--name {self.run_uuid}"
                 )
+                sacct_query_durations.append(sacct_query_duration)
                 self.logger.debug(f"status_of_jobs after sacct is: {status_of_jobs}")
                 # only take jobs that are still active
                 active_jobs_ids_with_current_sacct_status = (
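With --parsable2 --noheader, the sacct call above emits one pipe-separated JobIdRaw|State pair per line. A minimal parsing sketch (not necessarily job_stati's exact implementation; the sample output is made up):

def parse_sacct_output(raw: str) -> dict:
    # --parsable2 separates fields with "|" and omits a trailing delimiter,
    # so each line looks like "4242|RUNNING".
    stati = {}
    for line in raw.strip().splitlines():
        jobid, state = line.split("|", maxsplit=1)
        stati[jobid] = state
    return stati


print(parse_sacct_output("4242|RUNNING\n4243|COMPLETED"))
# {'4242': 'RUNNING', '4243': 'COMPLETED'}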
@@ -240,17 +240,22 @@ async def check_active_jobs(
                     break
         if i >= status_attempts - 1:
             self.logger.warning(
-                f"Unable to get the status of all active_jobs that should be in slurmdbd, even after {status_attempts} attempts.\n"
-                f"The jobs with the following slurm job ids were previously seen by sacct, but sacct doesn't report them any more:\n"
+                f"Unable to get the status of all active_jobs that should be "
+                f"in slurmdbd, even after {status_attempts} attempts.\n"
+                f"The jobs with the following slurm job ids were previously seen "
+                "by sacct, but sacct doesn't report them any more:\n"
                 f"{missing_sacct_status}\n"
-                f"Please double-check with your slurm cluster administrator, that slurmdbd job accounting is properly set up.\n"
+                f"Please double-check with your slurm cluster administrator, that "
+                "slurmdbd job accounting is properly set up.\n"
             )
 
+        self.update_status_rate_limiter(sacct_query_durations)
+
         any_finished = False
         for j in active_jobs:
             # the job probably didn't make it into slurmdbd yet, so
             # `sacct` doesn't return it
-            if not j.external_jobid in status_of_jobs:
+            if j.external_jobid not in status_of_jobs:
                 # but the job should still be queueing or running and
                 # appear in slurmdbd (and thus `sacct` output) later
                 yield j
@@ -269,23 +274,38 @@ async def check_active_jobs(
                 elif status in fail_stati:
                     self.print_job_error(
                         j,
-                        msg=f"SLURM-job '{j.external_jobid}' failed, SLURM status is: '{status}'",
+                        msg=f"SLURM-job '{j.external_jobid}' failed, SLURM status is: "
+                        f"'{status}'",
                         aux_logs=[j.slurm_logfile],
                     )
                     self.report_job_error(j.job)
                     active_jobs_seen_by_sacct.remove(j.external_jobid)
                 else:  # still running?
                     yield j
 
         if not any_finished:
             self.next_seconds_between_status_checks = min(
-                self.next_seconds_between_status_checks + 10,
-                max_sleep_time
+                self.next_seconds_between_status_checks + 10, max_sleep_time
             )
         else:
             self.next_seconds_between_status_checks = None
 
+    def update_status_rate_limiter(self, sacct_query_durations):
+        # Update self.status_rate_limiter to avoid too many API calls in retries.
+        mean_sacct_query_duration = sum(sacct_query_durations) / len(
+            sacct_query_durations
+        )
+        rate_limit = Fraction(
+            min(
+                self.status_rate_limiter._rate_limit / self.status_rate_limiter._period,
+                # if slurmdbd (sacct) is strained and slow, reduce the query frequency
+                (1 / mean_sacct_query_duration) / 5,
+            )
+        ).limit_denominator()
+        self.status_rate_limiter = Throttler(
+            rate_limit=rate_limit.numerator,
+            period=rate_limit.denominator,
+        )
 
     def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
         # Cancel all active jobs.
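To illustrate the arithmetic in update_status_rate_limiter: with a configured limit of 10 queries per 60 s and sacct calls averaging 2 s, min(10/60, (1/2)/5) = 0.1 queries per second, and Fraction(0.1).limit_denominator() reduces to 1/10, i.e. a new Throttler allowing 1 query per 10 s. A standalone sketch with illustrative numbers:

from fractions import Fraction

from throttler import Throttler

configured_rate = 10 / 60  # 10 sacct queries per 60 seconds (made up)
mean_query_duration = 2.0  # measured seconds per sacct call (made up)

# If sacct is slow, cap the query rate at a fifth of 1/duration.
rate = Fraction(
    min(configured_rate, (1 / mean_query_duration) / 5)
).limit_denominator()
print(rate)  # 1/10

limiter = Throttler(rate_limit=rate.numerator, period=rate.denominator)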
@@ -399,7 +419,8 @@ def get_account(self):
             return sacct_out.strip()
         except subprocess.CalledProcessError as e:
             self.logger.warning(
-                f"No account was given, not able to get a SLURM account via sacct: {e.stderr}"
+                f"No account was given, not able to get a SLURM account via sacct: "
+                f"{e.stderr}"
             )
             return None
 
@@ -414,19 +435,22 @@ def test_account(self, account):
             )
         except subprocess.CalledProcessError as e:
             raise WorkflowError(
-                f"Unable to test the validity of the given or guessed SLURM account '{account}' with sacctmgr: {e.stderr}"
+                f"Unable to test the validity of the given or guessed SLURM account "
+                f"'{account}' with sacctmgr: {e.stderr}"
             )
 
         accounts = accounts.split()
 
         if account not in accounts:
             raise WorkflowError(
-                f"The given account {account} appears to be invalid. Available accounts:\n{', '.join(accounts)}"
+                f"The given account {account} appears to be invalid. Available "
+                f"accounts:\n{', '.join(accounts)}"
             )
 
     def get_default_partition(self, job):
         """
-        if no partition is given, checks whether a fallback onto a default partition is possible
+        if no partition is given, checks whether a fallback onto a default
+        partition is possible
         """
         try:
             out = subprocess.check_output(
@@ -437,13 +461,16 @@ def get_default_partition(self, job):
f"Failed to run sinfo for retrieval of cluster partitions: {e.stderr}"
)
for partition in out.split():
# a default partition is marked with an asterisk, but this is not part of the name
# A default partition is marked with an asterisk, but this is not part of
# the name.
if "*" in partition:
# the decode-call is necessary, because the output of sinfo is bytes
return partition.replace("*", "")
self.logger.warning(
f"No partition was given for rule '{job}', and unable to find a default partition."
f"No partition was given for rule '{job}', and unable to find "
"a default partition."
" Trying to submit without partition information."
" You may want to invoke snakemake with --default-resources 'slurm_partition=<your default partition>'."
" You may want to invoke snakemake with --default-resources "
"'slurm_partition=<your default partition>'."
)
return ""
return ""

