Make leader wait for expected updates to be visible in the job store, or fail the job (#4811)

* Implement expecting version bumps and fail src/toil/test/batchSystems/batchSystemTest.py::MaxCoresSingleMachineBatchSystemTest::testServices

* Actually turn on debug logging for service test

* Refer to jobs for space usage accounting by stringified job description and not body file

* Use exponential backoff when polling for job updates

* Fix comparison direction

* Plug the new CLI option

* Include version writers in warnings

* Make return type annotation correct

* Don't wait for new versions of failed jobs because then we're too slow to pass the badWorker tests

* Scale down stats tutorial test to fit on small CI runners

* Work out that command overrides aren't being removed

* Stop having an overloaded command field on JobDescriptions

* Fix typos and update architecture to lean less on command

* Fix calling the checkpoint restore

* Handle None vs. empty successors in tests

* Handle places that didn't expect nextSuccessors() to ever be None

* Remove extra the

* Fix handling jobs that had no bodies, and consolidate warning logic

* Always actually do a reset even if no new version is ready.

* Use has_body accessor more

* Rename loadJob variables

* Rename _body_spec and use more has_body()

* Use a NamedTuple instead of a command-style string to point to the body

* Improve JobDescription docstring and fix typoed argument name

* Remove worker command from JobDescription

* Eliminate references to get_worker_command/set_worker_command

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
adamnovak and github-actions[bot] authored Apr 25, 2024
1 parent 068717e commit 8438f73
Showing 28 changed files with 453 additions and 247 deletions.
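The headline change makes the leader wait for a job's expected version bump to appear in the job store before acting on the job, and declare the job failed (the new PARTITION exit reason) if the update never becomes visible within --jobStoreTimeout. A minimal sketch of that wait-with-backoff pattern, assuming a hypothetical `load_job`/`version` interface rather than Toil's actual internals:

```python
import time

def wait_for_job_update(job_store, job_id: str, expected_version: int,
                        timeout: float = 30.0) -> bool:
    """Poll until the stored job reaches expected_version.

    Returns True once the update is visible, or False if the timeout
    elapses and the job should be declared failed. Illustrative only.
    """
    delay = 0.1
    deadline = time.time() + timeout
    while time.time() < deadline:
        if job_store.load_job(job_id).version >= expected_version:
            return True
        time.sleep(delay)  # wait before polling again
        delay = min(delay * 2, 10.0)  # exponential backoff, capped
    return False
```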
4 changes: 2 additions & 2 deletions docs/appendices/architecture.rst
@@ -36,7 +36,7 @@ These components are described below:
If the job defines successor jobs the worker may choose to immediately run them
(see `Job Chaining`_ below).
* the batch-system:
- Responsible for scheduling the jobs given to it by the leader, creating
+ Responsible for scheduling the jobs given to it by the leader, running
a worker command for each job. The batch-system is defined by the
:class:`~toil.batchSystems.abstractBatchSystem.AbstractBatchSystem` class.
Toil uses multiple existing batch systems to schedule jobs, including
@@ -63,7 +63,7 @@ and un-pickled by the worker when they are scheduled to run.
During scheduling, Toil does not work with the actual Job objects. Instead,
:class:`~toil.job.JobDescription` objects are used to store all the information
that the Toil Leader ever needs to know about the Job. This includes requirements
- information, dependency information, commands to issue, etc.
+ information, dependency information, body object to run, worker command to issue, etc.

Internally, the JobDescription object is referenced by its jobStoreID, which is
often not human readable. However, the Job and JobDescription objects contain
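Several commit bullets above describe replacing the overloaded command string with a NamedTuple that points at the job's body. A hedged illustration of that shape (class and field names here are invented for clarity, not Toil's actual definitions):

```python
from typing import NamedTuple, Optional

class BodyReference(NamedTuple):
    """Where a job's pickled body lives (illustrative, not Toil's type)."""
    file_store_id: str  # job store file holding the pickled Job
    module_string: str  # module to import so the body can be unpickled

def has_body(ref: Optional[BodyReference]) -> bool:
    # A JobDescription-like object can answer has_body() by checking for
    # such a reference instead of parsing a command-style string.
    return ref is not None
```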
9 changes: 6 additions & 3 deletions docs/running/cliOptions.rst
@@ -431,7 +431,7 @@ from the batch system.
The options for jobs that either run too long/fail or get lost (some batch
systems have issues!).

- --retryCount RETRYCOUNT
+ --retryCount INT
Number of times to retry a failing job before giving
up and labeling job failed. default=1
--enableUnlimitedPreemptibleRetries
@@ -443,14 +443,17 @@ systems have issues!).
doubled and they will be retried. The remaining
retry count will be reduced by 1. Currently only
supported by LSF. default=False.
- --maxJobDuration MAXJOBDURATION
+ --maxJobDuration INT
Maximum runtime of a job (in seconds) before we kill
it (this is a lower bound, and the actual time before
killing the job may be longer).
- --rescueJobsFrequency RESCUEJOBSFREQUENCY
+ --rescueJobsFrequency INT
Period of time to wait (in seconds) between checking
for missing/overlong jobs, that is jobs which get lost
by the batch system. Expert parameter.
+ --jobStoreTimeout FLOAT
+ Maximum time (in seconds) to wait for a job's update to
+ the job store before declaring it failed.

**Log Management Options**

35 changes: 22 additions & 13 deletions src/toil/batchSystems/abstractBatchSystem.py
@@ -58,6 +58,13 @@ class BatchJobExitReason(enum.IntEnum):
"""Internal error."""
MEMLIMIT: int = 6
"""Job hit batch system imposed memory limit."""
+ MISSING: int = 7
+ """Job disappeared from the scheduler without actually stopping, so Toil killed it."""
+ MAXJOBDURATION: int = 8
+ """Job ran longer than --maxJobDuration, so Toil killed it."""
+ PARTITION: int = 9
+ """Job was not able to talk to the leader via the job store, so Toil declared it failed."""


@classmethod
def to_string(cls, value: int) -> str:
@@ -156,17 +163,19 @@ def set_message_bus(self, message_bus: MessageBus) -> None:
"""

@abstractmethod
- def issueBatchJob(self, jobDesc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
+ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
"""
Issues a job with the specified command to the batch system and returns
- a unique jobID.
+ a unique job ID number.
- :param jobDesc: a toil.job.JobDescription
+ :param command: the command to execute somewhere to run the Toil
+ worker process
+ :param job_desc: the JobDescription for the job being run
:param job_environment: a collection of job-specific environment
- variables to be set on the worker.
+ variables to be set on the worker.
- :return: a unique jobID that can be used to reference the newly issued
- job
+ :return: a unique job ID number that can be used to reference the newly
+ issued job
"""
raise NotImplementedError()

@@ -188,20 +197,20 @@ def getIssuedBatchJobIDs(self) -> List[int]:
"""
Gets all currently issued jobs
- :return: A list of jobs (as jobIDs) currently issued (may be running, or may be
- waiting to be run). Despite the result being a list, the ordering should not
- be depended upon.
+ :return: A list of jobs (as job ID numbers) currently issued (may be
+ running, or may be waiting to be run). Despite the result being a
+ list, the ordering should not be depended upon.
"""
raise NotImplementedError()

@abstractmethod
def getRunningBatchJobIDs(self) -> Dict[int, float]:
"""
- Gets a map of jobs as jobIDs that are currently running (not just waiting)
- and how long they have been running, in seconds.
+ Gets a map of jobs as job ID numbers that are currently running (not
+ just waiting) and how long they have been running, in seconds.
- :return: dictionary with currently running jobID keys and how many seconds they have
- been running as the value
+ :return: dictionary with currently running job ID number keys and how
+ many seconds they have been running as the value
"""
raise NotImplementedError()

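With the revised interface, concrete batch systems receive the worker command as its own argument instead of reading it off the JobDescription. A minimal conforming sketch (a real implementation would subclass AbstractBatchSystem and hand the command to an actual scheduler):

```python
from typing import Dict, Optional

class ToyBatchSystem:
    """Illustrative stand-in for an AbstractBatchSystem subclass."""

    def __init__(self) -> None:
        self._next_id = 0
        self._queued: Dict[int, str] = {}

    def issueBatchJob(self, command: str, job_desc,
                      job_environment: Optional[Dict[str, str]] = None) -> int:
        # Hand out a unique job ID number and remember the command;
        # job_desc still supplies the cores/memory/disk requirements.
        job_id = self._next_id
        self._next_id += 1
        self._queued[job_id] = command
        return job_id
```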
10 changes: 5 additions & 5 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -394,9 +394,9 @@ def supportsWorkerCleanup(cls):
def supportsAutoDeployment(cls):
return False

- def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
+ def issueBatchJob(self, command: str, jobDesc, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
- localID = self.handleLocalJob(jobDesc)
+ localID = self.handleLocalJob(command, jobDesc)
if localID is not None:
return localID
else:
@@ -410,10 +410,10 @@ def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = Non
gpus = accelerator['count']
else:
gpus = jobDesc.accelerators

- self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, get_job_kind(jobDesc.get_names()),
+ self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()),
job_environment, gpus))
- logger.debug("Issued the job command: %s with job id: %s and job name %s", jobDesc.command, str(jobID),
+ logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID),
get_job_kind(jobDesc.get_names()))
return jobID

6 changes: 3 additions & 3 deletions src/toil/batchSystems/awsBatch.py
@@ -156,9 +156,9 @@ def _check_accelerator_request(self, requirer: Requirer) -> None:
'AWS Batch can only provide nvidia gpu accelerators.'
])

- def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
+ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
# Try the job as local
- local_id = self.handleLocalJob(job_desc)
+ local_id = self.handleLocalJob(command, job_desc)
if local_id is not None:
# It is a local job
return local_id
@@ -184,7 +184,7 @@ def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict
environment.update(job_environment)

# Make a command to run it in the executor
- command_list = pack_job(job_desc, self.user_script)
+ command_list = pack_job(command, self.user_script)

# Compose a job spec to submit
job_spec = {
9 changes: 4 additions & 5 deletions src/toil/batchSystems/contained_executor.py
@@ -25,18 +25,17 @@
from typing import Any, Dict, List, Optional

from toil.batchSystems.abstractBatchSystem import EXIT_STATUS_UNAVAILABLE_VALUE
- from toil.job import JobDescription
from toil.resource import Resource
from toil.statsAndLogging import configure_root_logger, set_log_level

logger = logging.getLogger(__name__)


- def pack_job(job_desc: JobDescription, user_script: Optional[Resource] = None, environment: Optional[Dict[str, str]] = None) -> List[str]:
+ def pack_job(command: str, user_script: Optional[Resource] = None, environment: Optional[Dict[str, str]] = None) -> List[str]:
"""
- Create a command that, when run, will execute the given job.
+ Create a command that runs the given command in an environment.
- :param job_desc: Job description for the job to run.
+ :param command: Worker command to run to run the job.
:param user_script: User script that will be loaded before the job is run.
:param environment: Environment variable dict that will be applied before
the job is run.
@@ -46,7 +45,7 @@ def pack_job(job_desc: JobDescription, user_script: Optional[Resource] = None, e
"""
# Make a job dict to send to the executor.
# TODO: Factor out executor setup from here and Kubernetes and TES
- job: Dict[str, Any] = {"command": job_desc.command}
+ job: Dict[str, Any] = {"command": command}
if user_script is not None:
# If there's a user script resource be sure to send it along
job['userScript'] = user_script
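Callers of pack_job now pass the pre-built worker command string instead of a JobDescription. A hedged usage sketch (the command string and environment values here are made up):

```python
from toil.batchSystems.contained_executor import pack_job

# Hypothetical worker command; real ones are built by the batch system.
worker_command = "_toil_worker ExampleJob file:/tmp/jobstore example-job-id"
command_list = pack_job(worker_command, user_script=None,
                        environment={"EXAMPLE_VAR": "1"})
# command_list is the argv that launches the contained executor.
```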
8 changes: 4 additions & 4 deletions src/toil/batchSystems/htcondor.py
@@ -387,9 +387,9 @@ def getEnvString(self, overrides: Dict[str, str]) -> str:
return '"' + ' '.join(env_items) + '"'

# Override the issueBatchJob method so HTCondor can be given the disk request
- def issueBatchJob(self, jobNode, job_environment: Optional[Dict[str, str]] = None):
+ def issueBatchJob(self, command: str, jobNode, job_environment: Optional[Dict[str, str]] = None):
# Avoid submitting internal jobs to the batch queue, handle locally
- localID = self.handleLocalJob(jobNode)
+ localID = self.handleLocalJob(command, jobNode)
if localID is not None:
return localID
else:
@@ -398,7 +398,7 @@ def issueBatchJob(self, jobNode, job_environment: Optional[Dict[str, str]] = Non
self.currentJobs.add(jobID)

# Construct our style of job tuple
- self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command,
+ self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, command,
job_environment or {}, jobNode.accelerators))
logger.debug("Issued the job command: %s with job id: %s ", jobNode.command, str(jobID))
logger.debug("Issued the job command: %s with job id: %s ", command, str(jobID))
return jobID
9 changes: 5 additions & 4 deletions src/toil/batchSystems/kubernetes.py
@@ -758,6 +758,7 @@ def _check_accelerator_request(self, requirer: Requirer) -> None:

def _create_pod_spec(
self,
+ command: str,
job_desc: JobDescription,
job_environment: Optional[Dict[str, str]] = None
) -> V1PodSpec:
@@ -770,7 +771,7 @@ def _create_pod_spec(
environment.update(job_environment)

# Make a command to run it in the executor
- command_list = pack_job(job_desc, self.user_script, environment=environment)
+ command_list = pack_job(command, self.user_script, environment=environment)

# The Kubernetes API makes sense only in terms of the YAML format. Objects
# represent sections of the YAML files. Except from our point of view, all
@@ -1005,9 +1006,9 @@ def _delete_job(
self._release_acquired_resources(resources, notify=resource_notify)
del self._acquired_resources[job_name]

- def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
+ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
# Try the job as local
- localID = self.handleLocalJob(job_desc)
+ localID = self.handleLocalJob(command, job_desc)
if localID is not None:
# It is a local job
return localID
Expand All @@ -1018,7 +1019,7 @@ def issueBatchJob(self, job_desc: JobDescription, job_environment: Optional[Dict
self.check_resource_request(job_desc)

# Make a pod that describes running the job
- pod_spec = self._create_pod_spec(job_desc, job_environment=job_environment)
+ pod_spec = self._create_pod_spec(command, job_desc, job_environment=job_environment)

# Make a batch system scope job ID
job_id = self.getNextJobID()
6 changes: 3 additions & 3 deletions src/toil/batchSystems/local_support.py
@@ -34,9 +34,9 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
config, maxCores, maxMemory, maxDisk, max_jobs=max_local_jobs
)

- def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]:
+ def handleLocalJob(self, command: str, jobDesc: JobDescription) -> Optional[int]:
"""
- To be called by issueBatchJobs.
+ To be called by issueBatchJob.
Returns the jobID if the jobDesc has been submitted to the local queue,
otherwise returns None
@@ -50,7 +50,7 @@ def handleLocalJob(self, jobDesc: JobDescription) -> Optional[int]:
# somehow doesn't error whereas just returning the value complains
# we're returning an Any. TODO: When singleMachine.py typechecks,
# remove all these extra variables.
- local_id: int = self.localBatch.issueBatchJob(jobDesc)
+ local_id: int = self.localBatch.issueBatchJob(command, jobDesc)
return local_id
else:
return None
8 changes: 4 additions & 4 deletions src/toil/batchSystems/mesos/batchSystem.py
@@ -174,13 +174,13 @@ def ignoreNode(self, nodeAddress):
def unignoreNode(self, nodeAddress):
self.ignoredNodes.remove(nodeAddress)

- def issueBatchJob(self, jobNode: JobDescription, job_environment: Optional[Dict[str, str]] = None):
+ def issueBatchJob(self, command: str, jobNode: JobDescription, job_environment: Optional[Dict[str, str]] = None):
"""
Issues the following command returning a unique jobID. Command is the string to run, memory
is an int giving the number of bytes the job needs to run in and cores is the number of cpus
needed for the job and error-file is the path of the file to place any std-err/std-out in.
"""
- localID = self.handleLocalJob(jobNode)
+ localID = self.handleLocalJob(command, jobNode)
if localID is not None:
return localID

@@ -200,12 +200,12 @@ def issueBatchJob(self, jobNode: JobDescription, job_environment: Optional[Dict[
job = ToilJob(jobID=jobID,
name=str(jobNode),
resources=MesosShape(wallTime=0, **mesos_resources),
- command=jobNode.command,
+ command=command,
userScript=self.userScript,
environment=environment,
workerCleanupInfo=self.workerCleanupInfo)
jobType = job.resources
log.debug("Queueing the job command: %s with job id: %s ...", jobNode.command, str(jobID))
log.debug("Queueing the job %s with job id: %s ...", jobNode, str(jobID))

# TODO: round all elements of resources

5 changes: 3 additions & 2 deletions src/toil/batchSystems/mesos/executor.py
@@ -196,12 +196,13 @@ def runJob(job):
"""
if job.userScript:
job.userScript.register()
log.debug("Invoking command: '%s'", job.command)
command = job.command
log.debug("Invoking command: '%s'", command)
# Construct the job's environment
jobEnv = dict(os.environ, **job.environment)
log.debug('Using environment variables: %s', jobEnv.keys())
with self.popenLock:
- return subprocess.Popen(job.command,
+ return subprocess.Popen(command,
preexec_fn=lambda: os.setpgrp(),
shell=True, env=jobEnv)

12 changes: 6 additions & 6 deletions src/toil/batchSystems/singleMachine.py
@@ -743,24 +743,24 @@ def _handleChild(self, pid: int) -> None:

logger.debug('Child %d for job %s succeeded', pid, jobID)

- def issueBatchJob(self, jobDesc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
+ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
"""Adds the command and resources to a queue to be run."""

self._checkOnDaddy()

# Apply scale in cores
- scaled_desc = jobDesc.scale('cores', self.scale)
+ scaled_desc = job_desc.scale('cores', self.scale)
# Round cores up to multiples of minCores
scaled_desc.cores = max(math.ceil(scaled_desc.cores / self.minCores) * self.minCores, self.minCores)

# Don't do our own assertions about job size vs. our configured size.
# The abstract batch system can handle it.
self.check_resource_request(scaled_desc)
logger.debug(f"Issuing the command: {jobDesc.command} with {scaled_desc.requirements_string()}")
logger.debug(f"Issuing the command: {command} with {scaled_desc.requirements_string()}")
with self.jobIndexLock:
jobID = self.jobIndex
self.jobIndex += 1
- self.jobs[jobID] = jobDesc.command
+ self.jobs[jobID] = command

environment = self.environment.copy()
if job_environment:
@@ -769,10 +769,10 @@ def issueBatchJob(self, jobDesc: JobDescription, job_environment: Optional[Dict[
if self.debugWorker:
# Run immediately, blocking for return.
# Ignore resource requirements; we run one job at a time
- self._runDebugJob(jobDesc.command, jobID, environment)
+ self._runDebugJob(command, jobID, environment)
else:
# Queue the job for later
- self.inputQueue.put((jobDesc.command, jobID, scaled_desc.cores, scaled_desc.memory,
+ self.inputQueue.put((command, jobID, scaled_desc.cores, scaled_desc.memory,
scaled_desc.disk, scaled_desc.accelerators, environment))

return jobID
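The core-rounding expression in this hunk snaps fractional requests up to the nearest multiple of minCores. A quick worked illustration of the same arithmetic:

```python
import math

def round_cores(cores: float, min_cores: float) -> float:
    # Same expression used for scaled_desc.cores above.
    return max(math.ceil(cores / min_cores) * min_cores, min_cores)

assert round_cores(0.7, 0.5) == 1.0  # rounds up to a multiple of 0.5
assert round_cores(0.1, 0.5) == 0.5  # never drops below minCores
assert round_cores(2.0, 1.0) == 2.0  # exact multiples pass through
```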
2 changes: 2 additions & 0 deletions src/toil/common.py
@@ -208,6 +208,7 @@ class Config:
doubleMem: bool
maxJobDuration: int
rescueJobsFrequency: int
+ job_store_timeout: float

# Log management
maxLogFileSize: int
@@ -373,6 +374,7 @@ def set_option(option_name: str,
set_option("doubleMem")
set_option("maxJobDuration")
set_option("rescueJobsFrequency")
set_option("job_store_timeout")

# Log management
set_option("maxLogFileSize")
4 changes: 1 addition & 3 deletions src/toil/fileStores/abstractFileStore.py
@@ -113,9 +113,7 @@ def __init__(
assert self.jobStore.config.workflowID is not None
self.workflow_dir: str = Toil.getLocalWorkflowDir(self.jobStore.config.workflowID, self.jobStore.config.workDir)
self.coordination_dir: str =Toil.get_local_workflow_coordination_dir(self.jobStore.config.workflowID, self.jobStore.config.workDir, self.jobStore.config.coordination_dir)
- self.jobName: str = (
- self.jobDesc.command.split()[1] if self.jobDesc.command else ""
- )
+ self.jobName: str = str(self.jobDesc)
self.waitForPreviousCommit = waitForPreviousCommit
self.logging_messages: List[Dict[str, Union[int, str]]] = []
self.logging_user_streams: List[dict[str, str]] = []
2 changes: 1 addition & 1 deletion src/toil/fileStores/cachingFileStore.py
@@ -1859,7 +1859,7 @@ def startCommit(self, jobState=False):
logger.debug('Starting commit of %s forked from %s', state_to_commit, self.jobDesc)
# Make sure the deep copy isn't summoning ghosts of old job
# versions. It must be as new or newer at this point.
- self.jobDesc.check_new_version(state_to_commit)
+ self.jobDesc.assert_is_not_newer_than(state_to_commit)

# Bump the original's version since saving will do that too and we
# don't want duplicate versions.
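The rename from check_new_version to assert_is_not_newer_than makes the comparison direction explicit (see the "Fix comparison direction" bullet above). A hedged sketch of the intended semantics, assuming a version counter attribute that this diff does not show:

```python
def assert_is_not_newer_than(self, other) -> None:
    """Raise if self is a newer version of the job than other."""
    if self._job_version > other._job_version:  # attribute name assumed
        raise RuntimeError(
            f"{self} is newer than {other}; saving the older copy could "
            "resurrect a stale job version."
        )
```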
