Allow fetching job inputs for debugging (#4848)

* Reformat worker

* Actually change kwarg name

* Enable stopping WDL (and probably CWL) jobs after files are downloaded

* Make sure WDL commands get logged before we stop

* Add type hints

* Add debug flag accessor

* Make debug-job default to debug logging

* Build fake container environments for CWL and WDL jobs when debugging them

* Add an example of dumping job files to the docs

* Add tests for the file retrieval and container faking

* Add missing imports

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

adamnovak and github-actions[bot] authored Apr 23, 2024
1 parent bb76807 commit 068717e
Showing 18 changed files with 513 additions and 107 deletions.
45 changes: 44 additions & 1 deletion docs/running/debugging.rst
@@ -41,7 +41,7 @@ For example, say you have this WDL workflow in ``test.wdl``. This workflow **can

You could try to run it with::

toil-wdl-runner --jobStore ./store test.wdl
toil-wdl-runner --jobStore ./store test.wdl --retryCount 0

But it will fail.

@@ -68,6 +68,49 @@ And if we know there's only one failed WDL task, we can just tell Toil to rerun
toil debug-job ./store WDLTaskJob

Any of these will run the job (including any containers) on the local machine, where its execution can be observed live or monitored with a debugger.

Fetching Job Inputs
~~~~~~~~~~~~~~~~~~~

The ``--retrieveTaskDirectory`` option to ``toil debug-job`` dumps the input files for a job into a directory of your choice and then stops running the job. It works for CWL and WDL jobs, and for Python workflows that call :meth:`toil.job.Job.files_downloaded_hook` after downloading their files. The worker runs inside the specified directory, so the job's temporary directory will be at ``worker/job`` within it. For WDL and CWL jobs that mount files into containers, there will also be an ``inside`` directory populated with symlinks to the files as they would be visible from the root of the container's filesystem.
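
For a Python workflow, a job can opt in by calling the hook itself once its inputs are on disk. The sketch below is illustrative only: the job class, file ID, and paths are invented, and it assumes the hook accepts a list of ``(host path, in-task path)`` pairs, which is what the CWL integration in this commit passes::

    from toil.job import Job

    class InspectableJob(Job):
        """Hypothetical job that exposes its downloaded inputs to toil debug-job."""

        def __init__(self, input_file_id):
            super().__init__(cores=1, memory="1G", disk="1G")
            self.input_file_id = input_file_id

        def run(self, file_store):
            # Localize the input into this job's temporary directory.
            local_path = file_store.readGlobalFile(self.input_file_id)
            # Report where the inputs landed. Under --retrieveTaskDirectory,
            # toil debug-job stops the job at this call so the files can be
            # inspected.
            self.files_downloaded_hook([(local_path, local_path)])
            # ... the real work would continue here ...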

For example, say you have a **broken WDL workflow** named ``example_alwaysfail_with_files.wdl``, like this:

.. literalinclude:: ../../src/toil/test/docs/scripts/example_alwaysfail_with_files.wdl

You can try and fail to run it like this::

toil-wdl-runner --jobStore ./store example_alwaysfail_with_files.wdl --retryCount 0

If you then dump the files from the failing job::

toil debug-job ./store WDLTaskJob --retrieveTaskDirectory dumpdir

You will end up with a directory tree that looks, according to ``tree``, something like this::

dumpdir
├── inside
│   └── mnt
│       └── miniwdl_task_container
│           └── work
│               └── _miniwdl_inputs
│                   ├── 0
│                   │   └── test.txt -> ../../../../../../worker/job/2c6b3dc4-1d21-4abf-9937-db475e6a6bc2/test.txt
│                   └── 1
│                       └── test.txt -> ../../../../../../worker/job/e3d724e1-e6cc-4165-97f1-6f62ab0fb1ef/test.txt
└── worker
    └── job
        ├── 2c6b3dc4-1d21-4abf-9937-db475e6a6bc2
        │   └── test.txt
        ├── e3d724e1-e6cc-4165-97f1-6f62ab0fb1ef
        │   └── test.txt
        ├── tmpr2j5yaic
        ├── tmpxqr9__y4
        └── work

15 directories, 4 files

You can see where Toil downloaded the input files for the job to the worker's temporary directory, and how they would be mounted into the container.


Introspecting the Job Store
1 change: 1 addition & 0 deletions setup.cfg
@@ -38,6 +38,7 @@ markers =
wes_server
cwl_small_log_dir
cwl_small
wdl

[flake8]
# for compatibility with the "black" Python code formatter
2 changes: 1 addition & 1 deletion src/toil/batchSystems/singleMachine.py
@@ -476,7 +476,7 @@ def _runDebugJob(self, jobCommand, jobID, environment):
jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
jobStore = Toil.resumeJobStore(jobStoreLocator)
statusCode = toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID,
redirectOutputToLogFile=not self.debugWorker) # Call the worker
redirect_output_to_log_file=not self.debugWorker) # Call the worker
else:
# Run synchronously. If starting or running the command fails, let the exception stop us.
statusCode = subprocess.check_call(jobCommand,
7 changes: 5 additions & 2 deletions src/toil/common.py
@@ -600,7 +600,10 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:


def parser_with_common_options(
provisioner_options: bool = False, jobstore_option: bool = True, prog: Optional[str] = None
provisioner_options: bool = False,
jobstore_option: bool = True,
prog: Optional[str] = None,
default_log_level: Optional[int] = None
) -> ArgParser:
parser = ArgParser(prog=prog or "Toil", formatter_class=ArgumentDefaultsHelpFormatter)

@@ -611,7 +614,7 @@ def parser_with_common_options(
parser.add_argument('jobStore', type=str, help=JOBSTORE_HELP)

# always add these
add_logging_options(parser)
add_logging_options(parser, default_log_level)
parser.add_argument("--version", action='version', version=version)
parser.add_argument("--tempDirRoot", dest="tempDirRoot", type=str, default=tempfile.gettempdir(),
help="Path to where temporary directory containing all temp files are created, "
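
A hedged sketch of how the new ``default_log_level`` parameter might be used by the ``toil debug-job`` entry point, based on the "Make debug-job default to debug logging" commit message (the real call site is not shown in this diff):

    import logging

    from toil.common import parser_with_common_options

    # Build the debug-job option parser so that it logs at DEBUG by default,
    # while still letting the user override the level on the command line.
    parser = parser_with_common_options(
        jobstore_option=True,
        prog="toil debug-job",
        default_log_level=logging.DEBUG,
    )
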
52 changes: 48 additions & 4 deletions src/toil/cwl/cwltoil.py
@@ -1011,6 +1011,24 @@ def run_jobs(
class ToilTool:
"""Mixin to hook Toil into a cwltool tool type."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Init hook to set up member variables.
"""
super().__init__(*args, **kwargs)
# Reserve a spot for the Toil job that ends up executing this tool.
self._toil_job: Optional[Job] = None
# Remember path mappers we have used so we can interrogate them later to find out what the job mapped.
self._path_mappers: List[cwltool.pathmapper.PathMapper] = []

def connect_toil_job(self, job: Job) -> None:
"""
Attach the Toil tool to the Toil job that is executing it. This allows
it to use the Toil job to stop at certain points if debugging flags are
set.
"""
self._toil_job = job

def make_path_mapper(
self,
reffiles: List[Any],
@@ -1021,12 +1039,12 @@ def make_path_mapper(
"""Create the appropriate PathMapper for the situation."""
if getattr(runtimeContext, "bypass_file_store", False):
# We only need to understand cwltool's supported URIs
return PathMapper(
mapper = PathMapper(
reffiles, runtimeContext.basedir, stagedir, separateDirs=separateDirs
)
else:
# We need to be able to read from Toil-provided URIs
return ToilPathMapper(
mapper = ToilPathMapper(
reffiles,
runtimeContext.basedir,
stagedir,
@@ -1035,6 +1053,10 @@ def make_path_mapper(
streaming_allowed=runtimeContext.streaming_allowed,
)

# Remember the path mappers
self._path_mappers.append(mapper)
return mapper

def __str__(self) -> str:
"""Return string representation of this tool type."""
return f'{self.__class__.__name__}({repr(getattr(self, "tool", {}).get("id", "???"))})'
@@ -1051,17 +1073,34 @@ def _initialworkdir(
name conflicts at the top level of the work directory.
"""

# Set up the initial work dir with all its files
super()._initialworkdir(j, builder)

# The initial work dir listing is now in j.generatefiles["listing"]
# Also j.generatrfiles is a CWL Directory.
# Also j.generatefiles is a CWL Directory.
# So check the initial working directory.
logger.info("Initial work dir: %s", j.generatefiles)
logger.debug("Initial work dir: %s", j.generatefiles)
ensure_no_collisions(
j.generatefiles,
"the job's working directory as specified by the InitialWorkDirRequirement",
)

if self._toil_job is not None:
# Make a table of all the places we mapped files to when downloading the inputs.

# We want to hint which host paths and container (if any) paths correspond
host_and_job_paths: List[Tuple[str, str]] = []

for pm in self._path_mappers:
for _, mapper_entry in pm.items_exclude_children():
# We know that mapper_entry.target as seen by the task is
# mapper_entry.resolved on the host.
host_and_job_paths.append((mapper_entry.resolved, mapper_entry.target))

# Notice that we have downloaded our inputs. Explain which files
# those are here and what the task will expect to call them.
self._toil_job.files_downloaded_hook(host_and_job_paths)


class ToilExpressionTool(ToilTool, cwltool.command_line_tool.ExpressionTool):
"""Subclass the cwltool expression tool to provide the custom ToilPathMapper."""
@@ -2634,6 +2673,11 @@ def run(self, file_store: AbstractFileStore) -> Any:

runtime_context.name = self.description.unitName

if isinstance(self.cwltool, ToilTool):
# Connect the CWL tool to us so it can call into the Toil job when
# it reaches points where we might need to debug it.
self.cwltool.connect_toil_job(self)

status = "did_not_run"
try:
output, status = ToilSingleJobExecutor().execute(
30 changes: 16 additions & 14 deletions src/toil/fileStores/abstractFileStore.py
@@ -39,7 +39,7 @@

from toil.common import Toil, cacheDirName, getDirSizeRecursively
from toil.fileStores import FileID
from toil.job import Job, JobDescription
from toil.job import Job, JobDescription, DebugStoppingPointReached
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.compatibility import deprecated
from toil.lib.conversions import bytes2human
@@ -191,17 +191,17 @@ def open(self, job: Job) -> Generator[None, None, None]:
:param job: The job instance of the toil job to run.
"""
failed = True
job_requested_disk = job.disk
try:
yield
failed = False
finally:
# Do a finally instead of an except/raise because we don't want
# to appear as "another exception occurred" in the stack trace.
if failed:
except BaseException as e:
if isinstance(e, DebugStoppingPointReached):
self._dumpAccessLogs(job_type="Debugged", log_level=logging.INFO)
else:
self._dumpAccessLogs()

raise
finally:
# See how much disk space is used at the end of the job.
# Not a real peak disk usage, but close enough to be useful for warning the user.
self._job_disk_used = getDirSizeRecursively(self.localTempDir)
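
A rough, hypothetical illustration of how ``DebugStoppingPointReached`` can reach the handler above: something on the job side raises it after inputs are downloaded, so ``open()`` logs the "Debugged" access report instead of the "Failed" one. The flag name below is invented for the sketch; the real debug flag accessor added by this commit is not shown in this hunk:

    from toil.job import DebugStoppingPointReached

    def stop_for_debugging(stop_after_download: bool) -> None:
        # 'stop_after_download' stands in for the job's debug flag. Raising
        # this inside the file store's open() context manager lands in the
        # BaseException handler above, which logs the access report at INFO.
        if stop_after_download:
            raise DebugStoppingPointReached()
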
@@ -363,14 +363,16 @@ def handle(numBytes: int) -> None:

yield wrappedStream, fileID

def _dumpAccessLogs(self) -> None:
def _dumpAccessLogs(self, job_type: str = "Failed", log_level: int = logging.WARNING) -> None:
"""
When something goes wrong, log a report.
Log a report of the files accessed.
Includes the files that were accessed while the file store was open.
:param job_type: Adjective to describe the job in the report.
"""
if len(self._accessLog) > 0:
logger.warning('Failed job accessed files:')
logger.log(log_level, '%s job accessed files:', job_type)

for item in self._accessLog:
# For each access record
@@ -379,14 +381,14 @@ def _dumpAccessLogs(self) -> None:
file_id, dest_path = item
if os.path.exists(dest_path):
if os.path.islink(dest_path):
logger.warning('Symlinked file \'%s\' to path \'%s\'', file_id, dest_path)
logger.log(log_level, 'Symlinked file \'%s\' to path \'%s\'', file_id, dest_path)
else:
logger.warning('Downloaded file \'%s\' to path \'%s\'', file_id, dest_path)
logger.log(log_level, 'Downloaded file \'%s\' to path \'%s\'', file_id, dest_path)
else:
logger.warning('Downloaded file \'%s\' to path \'%s\' (gone!)', file_id, dest_path)
logger.log(log_level, 'Downloaded file \'%s\' to path \'%s\' (gone!)', file_id, dest_path)
else:
# Otherwise dump without the name
logger.warning('Streamed file \'%s\'', *item)
logger.log(log_level, 'Streamed file \'%s\'', *item)

def logAccess(
self, fileStoreID: Union[FileID, str], destination: Union[str, None] = None
2 changes: 1 addition & 1 deletion src/toil/fileStores/cachingFileStore.py
@@ -1036,7 +1036,7 @@ def open(self, job: Job) -> Generator[None, None, None]:
# Create a working directory for the job
startingDir = os.getcwd()
# Move self.localTempDir from the worker directory set up in __init__ to a per-job directory.
self.localTempDir = make_public_dir(in_directory=self.localTempDir)
self.localTempDir = make_public_dir(self.localTempDir, suggested_name="job")
# Check the status of all jobs on this node. If there are jobs that started and died before
# cleaning up their presence from the database, clean them up ourselves.
self._removeDeadJobs(self.coordination_dir, self.con)
2 changes: 1 addition & 1 deletion src/toil/fileStores/nonCachingFileStore.py
@@ -102,7 +102,7 @@ def check_for_state_corruption(self) -> None:
@contextmanager
def open(self, job: Job) -> Generator[None, None, None]:
startingDir = os.getcwd()
self.localTempDir: str = make_public_dir(in_directory=self.localTempDir)
self.localTempDir: str = make_public_dir(self.localTempDir, suggested_name="job")
self._removeDeadJobs(self.coordination_dir)
self.jobStateFile = self._createJobStateFile()
self.check_for_state_corruption()