From 9b09d62197d5ecf7251b893576921d397555b89b Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:52:37 -0700 Subject: [PATCH] Add String to File functionality into toil-wdl-runner (#4589) * monkeypatch coerce for workflow related nodes * Fix task inputs string coerce * Disable kubernetes * Comment out cwl kubernetes * Maybe markers are wrong and comment out cactus-on-kubernetes * Add docstrings to changed functions + change input list to dict * Deal with nonetype --------- Co-authored-by: Adam Novak --- src/toil/wdl/wdltoil.py | 272 +++++++++++++++++++++++++--------------- 1 file changed, 172 insertions(+), 100 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index b898988691..764b501c5b 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -31,9 +31,10 @@ import tempfile import uuid -from contextlib import ExitStack +from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter -from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator +from typing import cast, Any, Callable, Union, Dict, List, Optional, Set, Sequence, Tuple, Type, TypeVar, Iterator, \ + Generator from urllib.parse import urlsplit, urljoin, quote, unquote import WDL @@ -428,7 +429,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base): Standard library implementation for WDL as run on Toil. """ - def __init__(self, file_store: AbstractFileStore): + def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None): """ Set up the standard library. """ @@ -446,6 +447,8 @@ def __init__(self, file_store: AbstractFileStore): # Keep the file store around so we can access files. self._file_store = file_store + self._execution_dir = execution_dir + def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool: """ Decide if a filename is a known kind of URL @@ -485,7 +488,12 @@ def _devirtualize_filename(self, filename: str) -> str: result = self._file_store.readGlobalFile(imported) else: # This is a local file - result = filename + # To support relative paths, join the execution dir and filename + # if filename is already an abs path, join() will do nothing + if self._execution_dir is not None: + result = os.path.join(self._execution_dir, filename) + else: + result = filename logger.debug('Devirtualized %s as openable file %s', filename, result) if not os.path.exists(result): @@ -501,11 +509,17 @@ def _virtualize_filename(self, filename: str) -> str: if self._is_url(filename): # Already virtual - logger.debug('Virtualized %s as WDL file %s', filename, filename) + logger.debug('Already virtualized %s as WDL file %s', filename, filename) return filename # Otherwise this is a local file and we want to fake it as a Toil file store file - file_id = self._file_store.writeGlobalFile(filename) + + # To support relative paths from execution directory, join the execution dir and filename + # If filename is already an abs path, join() will not do anything + if self._execution_dir is not None: + file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename)) + else: + file_id = self._file_store.writeGlobalFile(filename) result = pack_toil_uri(file_id, os.path.basename(filename)) logger.debug('Virtualized %s as WDL file %s', filename, result) return result @@ -737,15 +751,20 @@ def evaluate_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.Std return evaluate_named_expression(node, node.name, node.type, node.expr, environment, stdlib) -def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDLBindings: +def evaluate_call_inputs(context: Union[WDL.Error.SourceNode, WDL.Error.SourcePosition], expressions: Dict[str, WDL.Expr.Base], environment: WDLBindings, stdlib: WDL.StdLib.Base, inputs_dict: Optional[Dict[str, WDL.Type.Base]] = None) -> WDLBindings: """ - Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. + Evaluate a bunch of expressions with names, and make them into a fresh set of bindings. `inputs_dict` is a mapping of + variable names to their expected type for the input decls in a task. """ - new_bindings: WDLBindings = WDL.Env.Bindings() for k, v in expressions.items(): # Add each binding in turn - new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, None, v, environment, stdlib)) + # If the expected type is optional, then don't type check the lhs and rhs as miniwdl will return a StaticTypeMismatch error, so pass in None + expected_type = None + if not v.type.optional and inputs_dict is not None: + # This is done to enable passing in a string into a task input of file type + expected_type = inputs_dict.get(k, None) + new_bindings = new_bindings.bind(k, evaluate_named_expression(context, k, expected_type, v, environment, stdlib)) return new_bindings def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, stdlib: WDL.StdLib.Base) -> WDL.Value.Base: @@ -756,7 +775,10 @@ def evaluate_defaultable_decl(node: WDL.Tree.Decl, environment: WDLBindings, std try: if node.name in environment and not isinstance(environment[node.name], WDL.Value.Null): logger.debug('Name %s is already defined with a non-null value, not using default', node.name) - return environment[node.name] + if not isinstance(environment[node.name], type(node.type)): + return environment[node.name].coerce(node.type) + else: + return environment[node.name] else: if node.type is not None and not node.type.optional and node.expr is None: # We need a value for this but there isn't one. @@ -965,7 +987,7 @@ class WDLBaseJob(Job): as the job's run method calls postprocess(). """ - def __init__(self, **kwargs: Any) -> None: + def __init__(self, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDL-related job. @@ -992,6 +1014,8 @@ def __init__(self, **kwargs: Any) -> None: # jobs returning other jobs' promised RVs. self._postprocessing_steps: List[Tuple[str, Union[str, Promised[WDLBindings]]]] = [] + self._execution_dir = execution_dir + # TODO: We're not allowed by MyPy to override a method and widen the return # type, so this has to be Any. def run(self, file_store: AbstractFileStore) -> Any: @@ -1481,11 +1505,11 @@ class WDLWorkflowNodeJob(WDLBaseJob): Job that evaluates a WDL workflow node. """ - def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, node: WDL.Tree.WorkflowNode, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a workflow node to completion. """ - super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, **kwargs) + super().__init__(unitName=node.workflow_node_id, displayName=node.workflow_node_id, execution_dir=execution_dir, **kwargs) self._node = node self._prev_node_results = prev_node_results @@ -1504,59 +1528,64 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) - - if isinstance(self._node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', self._node.name, self._node.expr) - value = evaluate_decl(self._node, incoming_bindings, standard_library) - return self.postprocess(incoming_bindings.bind(self._node.name, value)) - elif isinstance(self._node, WDL.Tree.Call): - # This is a call of a task or workflow - - # Fetch all the inputs we are passing and bind them. - # The call is only allowed to use these. - logger.debug("Evaluating step inputs") - input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library) - - # Bindings may also be added in from the enclosing workflow inputs - # TODO: this is letting us also inject them from the workflow body. - # TODO: Can this result in picking up non-namespaced values that - # aren't meant to be inputs, by not changing their names? - passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) - - if isinstance(self._node.callee, WDL.Tree.Workflow): - # This is a call of a workflow - subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + with monkeypatch_coerce(standard_library): + if isinstance(self._node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', self._node.name, self._node.expr) + value = evaluate_decl(self._node, incoming_bindings, standard_library) + return self.postprocess(incoming_bindings.bind(self._node.name, value)) + elif isinstance(self._node, WDL.Tree.Call): + # This is a call of a task or workflow + + # Fetch all the inputs we are passing and bind them. + # The call is only allowed to use these. + logger.debug("Evaluating step inputs") + if self._node.callee is None: + # This should never be None, but mypy gets unhappy and this is better than an assert + inputs_mapping = None + else: + inputs_mapping = {e.name: e.type for e in self._node.callee.inputs or []} + input_bindings = evaluate_call_inputs(self._node, self._node.inputs, incoming_bindings, standard_library, inputs_mapping) + + # Bindings may also be added in from the enclosing workflow inputs + # TODO: this is letting us also inject them from the workflow body. + # TODO: Can this result in picking up non-namespaced values that + # aren't meant to be inputs, by not changing their names? + passed_down_bindings = incoming_bindings.enter_namespace(self._node.name) + + if isinstance(self._node.callee, WDL.Tree.Workflow): + # This is a call of a workflow + subjob: WDLBaseJob = WDLWorkflowJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}', self._execution_dir) + self.addChild(subjob) + elif isinstance(self._node.callee, WDL.Tree.Task): + # This is a call of a task + subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + self.addChild(subjob) + else: + raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) + + # We need to agregate outputs namespaced with our node name, and existing bindings + subjob.then_namespace(self._node.name) + subjob.then_overlay(incoming_bindings) + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Scatter): + subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) - elif isinstance(self._node.callee, WDL.Tree.Task): - # This is a call of a task - subjob = WDLTaskJob(self._node.callee, [input_bindings, passed_down_bindings], self._node.callee_id, f'{self._namespace}.{self._node.name}') + # Scatters don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() + elif isinstance(self._node, WDL.Tree.Conditional): + subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace, self._execution_dir) self.addChild(subjob) + # Conditionals don't really make a namespace, just kind of a scope? + # TODO: Let stuff leave scope! + self.defer_postprocessing(subjob) + return subjob.rv() else: - raise WDL.Error.InvalidType(self._node, "Cannot call a " + str(type(self._node.callee))) - - # We need to agregate outputs namespaced with our node name, and existing bindings - subjob.then_namespace(self._node.name) - subjob.then_overlay(incoming_bindings) - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Scatter): - subjob = WDLScatterJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Scatters don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - elif isinstance(self._node, WDL.Tree.Conditional): - subjob = WDLConditionalJob(self._node, [incoming_bindings], self._namespace) - self.addChild(subjob) - # Conditionals don't really make a namespace, just kind of a scope? - # TODO: Let stuff leave scope! - self.defer_postprocessing(subjob) - return subjob.rv() - else: - raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) + raise WDL.Error.InvalidType(self._node, "Unimplemented WorkflowNode: " + str(type(self._node))) class WDLWorkflowNodeListJob(WDLBaseJob): """ @@ -1565,11 +1594,11 @@ class WDLWorkflowNodeListJob(WDLBaseJob): workflows or tasks or sections. """ - def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, nodes: List[WDL.Tree.WorkflowNode], prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a list of workflow nodes to completion. """ - super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', **kwargs) + super().__init__(unitName=nodes[0].workflow_node_id + '+', displayName=nodes[0].workflow_node_id + '+', execution_dir=execution_dir, **kwargs) self._nodes = nodes self._prev_node_results = prev_node_results @@ -1588,16 +1617,17 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) - - for node in self._nodes: - if isinstance(node, WDL.Tree.Decl): - # This is a variable assignment - logger.info('Setting %s to %s', node.name, node.expr) - value = evaluate_decl(node, current_bindings, standard_library) - current_bindings = current_bindings.bind(node.name, value) - else: - raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) + + with monkeypatch_coerce(standard_library): + for node in self._nodes: + if isinstance(node, WDL.Tree.Decl): + # This is a variable assignment + logger.info('Setting %s to %s', node.name, node.expr) + value = evaluate_decl(node, current_bindings, standard_library) + current_bindings = current_bindings.bind(node.name, value) + else: + raise WDL.Error.InvalidType(node, "Unimplemented WorkflowNode: " + str(type(node))) return self.postprocess(current_bindings) @@ -1766,12 +1796,12 @@ class WDLSectionJob(WDLBaseJob): Job that can create more graph for a section of the wrokflow. """ - def __init__(self, namespace: str, **kwargs: Any) -> None: + def __init__(self, namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Make a WDLSectionJob where the interior runs in the given namespace, starting with the root workflow. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._namespace = namespace @staticmethod @@ -1917,10 +1947,10 @@ def get_job_set_any(wdl_ids: Set[str]) -> List[WDLBaseJob]: if len(node_ids) == 1: # Make a one-node job - job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace) + job: WDLBaseJob = WDLWorkflowNodeJob(section_graph.get(node_ids[0]), rvs, self._namespace, self._execution_dir) else: # Make a multi-node job - job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace) + job = WDLWorkflowNodeListJob([section_graph.get(node_id) for node_id in node_ids], rvs, self._namespace, self._execution_dir) for prev_job in prev_jobs: # Connect up the happens-after relationships to make sure the # return values are available. @@ -2017,11 +2047,11 @@ class WDLScatterJob(WDLSectionJob): instance of the body. If an instance of the body doesn't create a binding, it gets a null value in the corresponding array. """ - def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, scatter: WDL.Tree.Scatter, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL scatter. The scatter itself and the contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=scatter.workflow_node_id, displayName=scatter.workflow_node_id, execution_dir=execution_dir) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2050,7 +2080,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get what to scatter over - scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + scatter_value = evaluate_named_expression(self._scatter, self._scatter.variable, None, self._scatter.expr, bindings, standard_library) if not isinstance(scatter_value, WDL.Value.Array): raise RuntimeError("The returned value from a scatter is not an Array type.") @@ -2153,11 +2184,11 @@ class WDLConditionalJob(WDLSectionJob): """ Job that evaluates a conditional in a WDL workflow. """ - def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, **kwargs: Any) -> None: + def __init__(self, conditional: WDL.Tree.Conditional, prev_node_results: Sequence[Promised[WDLBindings]], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL conditional. The conditional itself and its contents live in the given namespace. """ - super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id) + super().__init__(namespace, **kwargs, unitName=conditional.workflow_node_id, displayName=conditional.workflow_node_id, execution_dir=execution_dir) # Once again we need to ship the whole body template to be instantiated # into Toil jobs only if it will actually run. @@ -2182,7 +2213,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: standard_library = ToilWDLStdLibBase(file_store) # Get the expression value. Fake a name. - expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) + with monkeypatch_coerce(standard_library): + expr_value = evaluate_named_expression(self._conditional, "", WDL.Type.Boolean(), self._conditional.expr, bindings, standard_library) if expr_value.value: # Evaluated to true! @@ -2203,7 +2235,7 @@ class WDLWorkflowJob(WDLSectionJob): Job that evaluates an entire WDL workflow. """ - def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Promised[WDLBindings]], workflow_id: List[str], namespace: str, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree that will run a WDL workflow. The job returns the return value of the workflow. @@ -2211,7 +2243,7 @@ def __init__(self, workflow: WDL.Tree.Workflow, prev_node_results: Sequence[Prom :param namespace: the namespace that the workflow's *contents* will be in. Caller has already added the workflow's own name. """ - super().__init__(namespace, **kwargs) + super().__init__(namespace, execution_dir, **kwargs) # Because we need to return the return value of the workflow, we need # to return a Toil promise for the last/sink job in the workflow's @@ -2240,19 +2272,20 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) if self._workflow.inputs: - for input_decl in self._workflow.inputs: - # Evaluate all the inputs that aren't pre-set - bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) + with monkeypatch_coerce(standard_library): + for input_decl in self._workflow.inputs: + # Evaluate all the inputs that aren't pre-set + bindings = bindings.bind(input_decl.name, evaluate_defaultable_decl(input_decl, bindings, standard_library)) # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) if self._workflow.outputs: # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv()) + outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied self.defer_postprocessing(outputs_job) @@ -2268,11 +2301,11 @@ class WDLOutputsJob(WDLBaseJob): Returns an environment with just the outputs bound, in no namespace. """ - def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], **kwargs: Any): + def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. """ - super().__init__(**kwargs) + super().__init__(execution_dir, **kwargs) self._outputs = outputs self._bindings = bindings @@ -2284,13 +2317,14 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: super().run(file_store) # Evaluate all the outputs in the normal, non-task-outputs library context - standard_library = ToilWDLStdLibBase(file_store) + standard_library = ToilWDLStdLibBase(file_store, self._execution_dir) # Combine the bindings from the previous job output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library) return self.postprocess(output_bindings) + class WDLRootJob(WDLSectionJob): """ Job that evaluates an entire WDL workflow, and returns the workflow outputs @@ -2298,13 +2332,13 @@ class WDLRootJob(WDLSectionJob): the workflow name; both forms are accepted. """ - def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, **kwargs: Any) -> None: + def __init__(self, workflow: WDL.Tree.Workflow, inputs: WDLBindings, execution_dir: Optional[str] = None, **kwargs: Any) -> None: """ Create a subtree to run the workflow and namespace the outputs. """ # The root workflow names the root namespace - super().__init__(workflow.name, **kwargs) + super().__init__(workflow.name, execution_dir, **kwargs) self._workflow = workflow self._inputs = inputs @@ -2317,12 +2351,47 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Run the workflow. We rely in this to handle entering the input # namespace if needed, or handling free-floating inputs. - workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace) + workflow_job = WDLWorkflowJob(self._workflow, [self._inputs], [self._workflow.name], self._namespace, self._execution_dir) workflow_job.then_namespace(self._namespace) self.addChild(workflow_job) self.defer_postprocessing(workflow_job) return workflow_job.rv() +@contextmanager +def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, None, None]: + """ + Monkeypatch miniwdl's WDL.Value.Base.coerce() function to virtualize files when they are represented as Strings. + Calls _virtualize_filename from a given standard library object. + :param standard_library: a standard library object + :return + """ + # We're doing this because while miniwdl recognizes when a string needs to be converted into a file, it's method of + # conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until + # there is an internal entrypoint, monkeypatch it. + def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: + if isinstance(desired_type, WDL.Type.File): + self.value = standard_library._virtualize_filename(self.value) + return self + return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce + def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base: + # Sometimes string coerce is called instead, so monkeypatch this one as well + if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Type.File): + return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr) + return old_str_coerce(self, desired_type) + + old_base_coerce = WDL.Value.Base.coerce + old_str_coerce = WDL.Value.String.coerce + try: + # Mypy does not like monkeypatching: + # https://github.com/python/mypy/issues/2427#issuecomment-1419206807 + WDL.Value.Base.coerce = base_coerce # type: ignore[method-assign] + WDL.Value.String.coerce = string_coerce # type: ignore[method-assign] + yield + finally: + WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign] + WDL.Value.String.coerce = old_str_coerce # type: ignore[method-assign] + + def main() -> None: """ A Toil workflow to interpret WDL input files. @@ -2411,8 +2480,11 @@ def main() -> None: # TODO: Automatically set a good MINIWDL__SINGULARITY__IMAGE_CACHE ? + # Get the execution directory + execution_dir = os.getcwd() + # Run the workflow and get its outputs namespaced with the workflow name. - root_job = WDLRootJob(document.workflow, input_bindings) + root_job = WDLRootJob(document.workflow, input_bindings, execution_dir) output_bindings = toil.start(root_job) if not isinstance(output_bindings, WDL.Env.Bindings): raise RuntimeError("The output of the WDL job is not a binding.")