Skip to content

Commit

Permalink
Allow symlinks to inputs as WDL outputs (#4883)
Browse files Browse the repository at this point in the history
* Detect missing files at the offending step and announce the problem conspicuously

* Log the offending expression

* Resolve symlinks against container mounts during file virtualization

* Try and forward along original virtualized filenames

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
adamnovak and github-actions[bot] authored Apr 30, 2024
1 parent 8b6af29 commit 016f7a6
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 33 deletions.
164 changes: 132 additions & 32 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None]
try:
yield
except (
WDL.Error.EvalError,
WDL.Error.SyntaxError,
WDL.Error.ImportError,
WDL.Error.ValidationError,
Expand All @@ -102,13 +103,17 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None]
JobTooBigError
) as e:
# Don't expose tracebacks to the user for exceptions that may be expected
log("Could not " + task)
log("Could not " + task + " because:")

# These are the errors that MiniWDL's parser can raise and its reporter
# can report. See
# can report (plus some extras). See
# https://github.com/chanzuckerberg/miniwdl/blob/a780b1bf2db61f18de37616068968b2bb4c2d21c/WDL/CLI.py#L91-L97.
#
# We are going to use MiniWDL's pretty printer to print them.
# Make the MiniWDL stuff on stderr loud so people see it
sys.stderr.write("\n" + "🚨" * 3 + "\n")
print_error(e)
sys.stderr.write("🚨" * 3 + "\n\n")
if exit:
# Stop right now
sys.exit(1)
Expand Down Expand Up @@ -534,6 +539,8 @@ class ToilWDLStdLibBase(WDL.StdLib.Base):
def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None):
"""
Set up the standard library.
:param execution_dir: Directory to use as the working directory for workflow code.
"""
# TODO: Just always be the 1.2 standard library.
wdl_version = "1.2"
Expand All @@ -552,19 +559,48 @@ def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] =
# UUID to differentiate which node files are virtualized from
self._parent_dir_to_ids: Dict[str, uuid.UUID] = dict()

# Map forward from virtualized files to absolute devirtualized ones.
self._virtualized_to_devirtualized: Dict[str, str] = {}
# Allow mapping back from absolute devirtualized files to virtualized
# paths, to save re-uploads.
self._devirtualized_to_virtualized: Dict[str, str] = {}

self._execution_dir = execution_dir

def share_files(self, other: "ToilWDLStdLibBase") -> None:
"""
Share caches for devirtualizing and virtualizing files with another instance.
Files devirtualized by one instance can be re-virtualized back to their
original virtualized filenames by the other.
"""

if id(self._virtualized_to_devirtualized) != id(other._virtualized_to_devirtualized):
# Merge the virtualized to devirtualized mappings
self._virtualized_to_devirtualized.update(other._virtualized_to_devirtualized)
other._virtualized_to_devirtualized = self._virtualized_to_devirtualized

if id(self._devirtualized_to_virtualized) != id(other._devirtualized_to_virtualized):
# Merge the devirtualized to virtualized mappings
self._devirtualized_to_virtualized.update(other._devirtualized_to_virtualized)
other._devirtualized_to_virtualized = self._devirtualized_to_virtualized

@memoize
def _devirtualize_filename(self, filename: str) -> str:
"""
'devirtualize' filename passed to a read_* function: return a filename that can be open()ed
on the local host.
"""

return self.devirtualze_to(filename, self._file_store.localTempDir, self._file_store, self._execution_dir)

result = self.devirtualize_to(filename, self._file_store.localTempDir, self._file_store, self._execution_dir)
# Store the back mapping
self._devirtualized_to_virtualized[result] = filename
# And the forward
self._virtualized_to_devirtualized[filename] = result
return result

@staticmethod
def devirtualze_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], execution_dir: Optional[str]) -> str:
def devirtualize_to(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], execution_dir: Optional[str]) -> str:
"""
Download or export a WDL virtualized filename/URL to the given directory.
Expand Down Expand Up @@ -618,8 +654,12 @@ def devirtualze_to(filename: str, dest_dir: str, file_source: Union[AbstractFile
if filename.startswith(TOIL_URI_SCHEME):
# Get a local path to the file
if isinstance(file_source, AbstractFileStore):
# Read from the file store
result = file_source.readGlobalFile(file_id, dest_path)
# Read from the file store.
# File is not allowed to be modified by the task. See
# <https://github.com/openwdl/wdl/issues/495>.
# We try to get away with symlinks and hope the task
# container can mount the destination file.
result = file_source.readGlobalFile(file_id, dest_path, mutable=False, symlink=True)
elif isinstance(file_source, Toil):
# Read from the Toil context
file_source.export_file(file_id, dest_path)
Expand Down Expand Up @@ -649,6 +689,7 @@ def devirtualze_to(filename: str, dest_dir: str, file_source: Union[AbstractFile
raise RuntimeError(f"Virtualized file {filename} looks like a local file but isn't!")
return result

@memoize
def _virtualize_filename(self, filename: str) -> str:
"""
from a local path in write_dir, 'virtualize' into the filename as it should present in a
Expand All @@ -657,21 +698,36 @@ def _virtualize_filename(self, filename: str) -> str:

if is_url(filename):
# Already virtual
logger.debug('Already virtualized %s as WDL file %s', filename, filename)
logger.debug('Already virtual: %s', filename)
return filename

# Otherwise this is a local file and we want to fake it as a Toil file store file

# To support relative paths from execution directory, join the execution dir and filename
# If filename is already an abs path, join() will not do anything
# Make it an absolute path
if self._execution_dir is not None:
file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename))
# To support relative paths from execution directory, join the execution dir and filename
# If filename is already an abs path, join() will not do anything
abs_filename = os.path.join(self._execution_dir, filename)
else:
file_id = self._file_store.writeGlobalFile(filename)
dir = os.path.dirname(os.path.abspath(filename)) # is filename always an abspath?
parent_id = self._parent_dir_to_ids.setdefault(dir, uuid.uuid4())
result = pack_toil_uri(file_id, parent_id, os.path.basename(filename))
abs_filename = os.path.abspath(filename)

if abs_filename in self._devirtualized_to_virtualized:
# This is a previously devirtualized thing so we can just use the
# virtual version we remembered instead of reuploading it.
result = self._devirtualized_to_virtualized[abs_filename]
logger.debug("Re-using virtualized WDL file %s for %s", result, filename)
return result

file_id = self._file_store.writeGlobalFile(abs_filename)

file_dir = os.path.dirname(abs_filename)
parent_id = self._parent_dir_to_ids.setdefault(file_dir, uuid.uuid4())
result = pack_toil_uri(file_id, parent_id, os.path.basename(abs_filename))
logger.debug('Virtualized %s as WDL file %s', filename, result)
# Remember the upload in case we share a cache
self._devirtualized_to_virtualized[abs_filename] = result
# And remember the local path in case we want a redownload
self._virtualized_to_devirtualized[result] = abs_filename
return result

class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase):
Expand Down Expand Up @@ -716,7 +772,7 @@ def _devirtualize_filename(self, filename: str) -> str:
logger.debug('Devirtualized %s as out-of-container file %s', filename, result)
return result


@memoize
def _virtualize_filename(self, filename: str) -> str:
"""
From a local path in write_dir, 'virtualize' into the filename as it should present in a
Expand All @@ -738,10 +794,11 @@ class ToilWDLStdLibTaskOutputs(ToilWDLStdLibBase, WDL.StdLib.TaskOutputs):
functions only allowed in task output sections.
"""

def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: str, current_directory_override: Optional[str] = None):
def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: str, file_to_mountpoint: Dict[str, str], current_directory_override: Optional[str] = None):
"""
Set up the standard library for a task output section. Needs to know
where standard output and error from the task have been stored.
where standard output and error from the task have been stored, and
what local paths to pretend are where for resolving symlinks.
If current_directory_override is set, resolves relative paths and globs
from there instead of from the real current directory.
Expand All @@ -759,6 +816,9 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path:
self._stdout_used = False
self._stderr_used = False

# Reverse and store the file mount dict
self._mountpoint_to_file = {v: k for k, v in file_to_mountpoint.items()}

# Remember current directory
self._current_directory_override = current_directory_override

Expand Down Expand Up @@ -865,6 +925,7 @@ def _devirtualize_filename(self, filename: str) -> str:

return super()._devirtualize_filename(filename)

@memoize
def _virtualize_filename(self, filename: str) -> str:
"""
Go from a local disk filename to a virtualized WDL-side filename.
Expand All @@ -875,11 +936,46 @@ def _virtualize_filename(self, filename: str) -> str:
"""

if not is_url(filename) and not filename.startswith('/'):
# We are getting a bare relative path the supposedly devirtualized side.
# We are getting a bare relative path on the supposedly devirtualized side.
# Find a real path to it relative to the current directory override.
work_dir = '.' if not self._current_directory_override else self._current_directory_override
filename = os.path.join(work_dir, filename)

if filename in self._devirtualized_to_virtualized:
result = self._devirtualized_to_virtualized[filename]
logger.debug("Re-using virtualized filename %s for %s", result, filename)
return result

if os.path.islink(filename):
# Recursively resolve symlinks
here = filename
# Notice if we have a symlink loop
seen = {here}
while os.path.islink(here):
dest = os.readlink(here)
if not dest.startswith('/'):
# Make it absolute
dest = os.path.join(os.path.dirname(here), dest)
here = dest
if here in self._mountpoint_to_file:
# This points to something mounted into the container, so use that path instead.
here = self._mountpoint_to_file[here]
if here in self._devirtualized_to_virtualized:
# Check the virtualized filenames before following symlinks
# all the way back to workflow inputs.
result = self._devirtualized_to_virtualized[here]
logger.debug("Re-using virtualized filename %s for %s linked from %s", result, here, filename)
return result
if here in seen:
raise RuntimeError(f"Symlink {filename} leads to symlink loop at {here}")
seen.add(here)

if os.path.exists(here):
logger.debug("Handling symlink %s ultimately to %s", filename, here)
else:
logger.error("Handling broken symlink %s ultimately to %s", filename, here)
filename = here

return super()._virtualize_filename(filename)

def evaluate_named_expression(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], name: str, expected_type: Optional[WDL.Type.Base], expression: Optional[WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base:
Expand Down Expand Up @@ -1131,8 +1227,10 @@ def drop_if_missing(value_type: WDL.Type.Base, filename: str) -> Optional[str]:
logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type)
return None
else:
# Get the absolute path, not resolving symlinks
effective_path = os.path.abspath(os.path.join(work_dir, filename))
if os.path.exists(effective_path):
if os.path.islink(effective_path) or os.path.exists(effective_path):
# This is a broken symlink or a working symlink or a file.
return filename
else:
logger.warning('File %s with type %s does not actually exist at %s', filename, value_type, effective_path)
Expand Down Expand Up @@ -1399,7 +1497,7 @@ def __init__(self, task: WDL.Tree.Task, prev_node_results: Sequence[Promised[WDL
self._namespace = namespace
self._task_path = task_path

@report_wdl_errors("evaluate task code")
@report_wdl_errors("evaluate task code", exit=True)
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Evaluate inputs and runtime and schedule the task.
Expand Down Expand Up @@ -1697,7 +1795,7 @@ def can_mount_proc(self) -> bool:
"""
return "KUBERNETES_SERVICE_HOST" not in os.environ

@report_wdl_errors("run task command")
@report_wdl_errors("run task command", exit=True)
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
Actually run the task.
Expand Down Expand Up @@ -1995,15 +2093,14 @@ def hacky_dedent(text: str) -> str:
# container-determined strings that are absolute paths to WDL File
# objects, and like MiniWDL we can say we only support
# working-directory-based relative paths for globs.
outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, current_directory_override=workdir_in_container)
outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, task_container.input_path_map, current_directory_override=workdir_in_container)
# Make sure files downloaded as inputs get re-used if we re-upload them.
outputs_library.share_files(standard_library)
output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library)

# Now we know if the standard output and error were sent somewhere by
# the workflow. If not, we should report them to the leader.

# Drop any files from the output which don't actually exist
output_bindings = drop_missing_files(output_bindings, current_directory_override=workdir_in_container)

if not outputs_library.stderr_used() and os.path.exists(host_stderr_txt):
size = os.path.getsize(host_stderr_txt)
logger.info('Unused standard error at %s of %d bytes', host_stderr_txt, size)
Expand All @@ -2021,10 +2118,14 @@ def hacky_dedent(text: str) -> str:
# Collect output messages from any code Toil injected into the task.
self.handle_injection_messages(outputs_library)

# TODO: Check the output bindings against the types of the decls so we
# can tell if we have a null in a value that is supposed to not be
# nullable. We can't just look at the types on the values themselves
# because those are all the non-nullable versions.
# Drop any files from the output which don't actually exist
output_bindings = drop_missing_files(output_bindings, current_directory_override=workdir_in_container)
for decl in self._task.outputs:
if not decl.type.optional and output_bindings[decl.name].value is None:
# We have an unacceptable null value. This can happen if a file
# is missing but not optional. Don't let it out to annoy the
# next task.
raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing")

# Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative.
output_bindings = virtualize_files(output_bindings, outputs_library)
Expand Down Expand Up @@ -3057,7 +3158,6 @@ def main() -> None:
raise RuntimeError("The output of the WDL job is not a binding.")

# Fetch all the output files
# TODO: deduplicate with _devirtualize_filename
def devirtualize_output(filename: str) -> str:
"""
'devirtualize' a file using the "toil" object instead of a filestore.
Expand All @@ -3066,7 +3166,7 @@ def devirtualize_output(filename: str) -> str:
# Make sure the output directory exists if we have output files
# that might need to use it.
os.makedirs(output_directory, exist_ok=True)
return ToilWDLStdLibBase.devirtualze_to(filename, output_directory, toil, execution_dir)
return ToilWDLStdLibBase.devirtualize_to(filename, output_directory, toil, execution_dir)

# Make all the files local files
output_bindings = map_over_files_in_bindings(output_bindings, devirtualize_output)
Expand Down
3 changes: 2 additions & 1 deletion src/toil/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ def blockFn() -> bool:
# Job wants the worker to stop for debugging
raise
except BaseException as e: #Case that something goes wrong in worker, or we are asked to stop
logger.critical("Worker crashed with traceback:\n%s", traceback.format_exc())
if not isinstance(e, SystemExit):
logger.critical("Worker crashed with traceback:\n%s", traceback.format_exc())
logger.error("Exiting the worker because of a failed job on host %s", socket.gethostname())
if isinstance(e, CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION):
# We need to inform the leader that this is a CWL workflow problem
Expand Down

0 comments on commit 016f7a6

Please sign in to comment.