From 097740a91c07245260a7ff02e5f43d1407902558 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 17 Dec 2024 17:20:01 -0800 Subject: [PATCH 1/4] Add leader fallback for files without filesize and import them on the leader --- src/toil/cwl/cwltoil.py | 182 ++++++++++++++++++++++++++++------------ src/toil/job.py | 12 ++- 2 files changed, 137 insertions(+), 57 deletions(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 176aa5e50b..f8f2e6bc78 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -51,7 +51,8 @@ TypeVar, Union, cast, - Literal, Protocol, + Literal, + Protocol, ) from urllib.parse import quote, unquote, urlparse, urlsplit @@ -66,6 +67,7 @@ import cwltool.main import cwltool.resolver import schema_salad.ref_resolver + # This is also in configargparse but MyPy doesn't know it from argparse import RawDescriptionHelpFormatter from configargparse import ArgParser, Namespace @@ -132,6 +134,8 @@ unwrap, ImportsJob, get_file_sizes, + FileMetadata, + WorkerImportJob, ) from toil.jobStores.abstractJobStore import ( AbstractJobStore, @@ -1893,12 +1897,20 @@ def extract_file_uri_once( return rp return None + V = TypeVar("V", covariant=True) + class VisitFunc(Protocol[V]): - def __call__(self, fileindex: dict[str, str], existing: dict[str, str], - file_metadata: CWLObjectType, mark_broken: bool, - skip_remote: bool) -> V: ... + def __call__( + self, + fileindex: dict[str, str], + existing: dict[str, str], + file_metadata: CWLObjectType, + mark_broken: bool, + skip_remote: bool, + ) -> V: ... + def visit_files( func: VisitFunc[V], @@ -2188,7 +2200,9 @@ def extract_and_convert_file_to_toil_uri( Unless skip_remote is set, also run on remote files and sets their locations to toil URIs as well. """ - location = extract_file_uri_once(fileindex, existing, file_metadata, mark_broken, skip_remote) + location = extract_file_uri_once( + fileindex, existing, file_metadata, mark_broken, skip_remote + ) if location is not None: file_metadata["location"] = convert_file_uri_to_toil_uri( convertfunc, fileindex, existing, location @@ -2896,7 +2910,9 @@ def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID: logger.log(log_level, "Loading %s...", url) return writeGlobalFileWrapper(file_store, url) - file_upload_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function) + file_upload_function = functools.partial( + extract_and_convert_file_to_toil_uri, file_import_function + ) # Upload all the Files and set their and the Directories' locations, if # needed. 
@@ -2948,8 +2964,39 @@ def makeRootJob( :return: """ if options.run_imports_on_workers: + filenames = extract_workflow_inputs(options, initialized_job_order, tool) + files_to_data = get_file_sizes( + filenames, toil._jobStore, include_remote_files=options.reference_inputs + ) + + # files with a associated filesize that are valid to be imported on workers + valid_files_to_data = dict() + # files without an associated filesize that should be imported on the leader + leftover_files_to_data = dict() + for filename, file_data in files_to_data.items(): + if file_data.size is None: + leftover_files_to_data[filename] = file_data + else: + valid_files_to_data[filename] = file_data + + # import the files for the leader first + path_to_fileid = WorkerImportJob.import_files( + list(leftover_files_to_data.keys()), toil._jobStore + ) + + # then install the imported files before importing the other files + # this way the control flow can fall from the leader to workers + tool, initialized_job_order = CWLInstallImportsJob.convert_files( + initialized_job_order, + tool, + path_to_fileid, + options.basedir, + options.reference_inputs, + options.bypass_file_store, + ) + import_job = CWLImportWrapper( - initialized_job_order, tool, runtime_context, options + initialized_job_order, tool, runtime_context, valid_files_to_data, options ) return import_job else: @@ -3538,22 +3585,23 @@ def __init__( self.bypass_file_store = bypass_file_store self.import_data = import_data - def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: - """ - Convert the filenames in the workflow inputs into the URIs - :return: Promise of transformed workflow inputs. A tuple of the job order and process - """ - candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data) - - initialized_job_order = unwrap(self.initialized_job_order) - tool = unwrap(self.tool) - + @staticmethod + def convert_files( + initialized_job_order: CWLObjectType, + tool: Process, + candidate_to_fileid: dict[str, FileID], + basedir: str, + skip_remote: bool, + bypass_file_store: bool, + ) -> tuple[Process, CWLObjectType]: def convert_file(filename: str) -> FileID: fileid = candidate_to_fileid[filename] return fileid - file_convert_function = functools.partial(extract_and_convert_file_to_toil_uri, convert_file) - fs_access = ToilFsAccess(self.basedir) + file_convert_function = functools.partial( + extract_and_convert_file_to_toil_uri, convert_file + ) + fs_access = ToilFsAccess(basedir) fileindex: dict[str, str] = {} existing: dict[str, str] = {} visit_files( @@ -3563,8 +3611,8 @@ def convert_file(filename: str) -> FileID: existing, initialized_job_order, mark_broken=True, - skip_remote=self.skip_remote, - bypass_file_store=self.bypass_file_store, + skip_remote=skip_remote, + bypass_file_store=bypass_file_store, ) visitSteps( tool, @@ -3575,8 +3623,8 @@ def convert_file(filename: str) -> FileID: fileindex, existing, mark_broken=True, - skip_remote=self.skip_remote, - bypass_file_store=self.bypass_file_store, + skip_remote=skip_remote, + bypass_file_store=bypass_file_store, ), ) @@ -3588,9 +3636,26 @@ def convert_file(filename: str) -> FileID: # This will properly make them cause an error later if they # were required. rm_unprocessed_secondary_files(param_value) - return tool, initialized_job_order + def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: + """ + Convert the filenames in the workflow inputs into the URIs + :return: Promise of transformed workflow inputs. 
A tuple of the job order and process + """ + candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data) + + initialized_job_order = unwrap(self.initialized_job_order) + tool = unwrap(self.tool) + return CWLInstallImportsJob.convert_files( + initialized_job_order, + tool, + candidate_to_fileid, + self.basedir, + self.skip_remote, + self.bypass_file_store, + ) + class CWLImportWrapper(CWLNamedJob): """ @@ -3605,6 +3670,7 @@ def __init__( initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, + file_to_data: dict[str, FileMetadata], options: Namespace, ): super().__init__(local=False, disk=options.import_workers_threshold) @@ -3612,15 +3678,14 @@ def __init__( self.tool = tool self.options = options self.runtime_context = runtime_context + self.file_to_data = file_to_data def run(self, file_store: AbstractFileStore) -> Any: - filenames = extract_workflow_inputs( - self.options, self.initialized_job_order, self.tool + imports_job = ImportsJob( + self.file_to_data, + self.options.import_workers_threshold, + self.options.import_workers_disk, ) - file_to_data = get_file_sizes( - filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs - ) - imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk) self.addChild(imports_job) install_imports_job = CWLInstallImportsJob( initialized_job_order=self.initialized_job_order, @@ -3634,7 +3699,9 @@ def run(self, file_store: AbstractFileStore) -> Any: imports_job.addFollowOn(install_imports_job) start_job = CWLStartJob( - install_imports_job.rv(0), install_imports_job.rv(1), runtime_context=self.runtime_context + install_imports_job.rv(0), + install_imports_job.rv(1), + runtime_context=self.runtime_context, ) self.addChild(start_job) install_imports_job.addFollowOn(start_job) @@ -3645,7 +3712,7 @@ def run(self, file_store: AbstractFileStore) -> Any: class CWLStartJob(CWLNamedJob): """ Job responsible for starting the CWL workflow. - + Takes in the workflow/tool and inputs after all files are imported and creates jobs to run those workflows. """ @@ -3744,7 +3811,10 @@ def import_workflow_inputs( def file_import_function(url: str) -> FileID: logger.log(log_level, "Loading %s...", url) return jobstore.import_file(url, symlink=True) - import_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function) + + import_function = functools.partial( + extract_and_convert_file_to_toil_uri, file_import_function + ) # Import all the input files, some of which may be missing optional # files. logger.info("Importing input files...") @@ -3763,8 +3833,13 @@ def file_import_function(url: str) -> FileID: # Make another function for importing tool files. This one doesn't allow # symlinking, since the tools might be coming from storage not accessible # to all nodes. - tool_import_function = functools.partial(extract_and_convert_file_to_toil_uri, - cast(Callable[[str], FileID], functools.partial(jobstore.import_file, symlink=False))) + tool_import_function = functools.partial( + extract_and_convert_file_to_toil_uri, + cast( + Callable[[str], FileID], + functools.partial(jobstore.import_file, symlink=False), + ), + ) # Import all the files associated with tools (binaries, etc.). 
# Not sure why you would have an optional secondary file here, but @@ -3795,6 +3870,8 @@ def file_import_function(url: str) -> FileID: T = TypeVar("T") + + def visitSteps( cmdline_tool: Process, op: Callable[[CommentedMap], list[T]], @@ -3818,12 +3895,10 @@ def visitSteps( # if they bothered to run the Process __init__. return op(cmdline_tool.tool) raise RuntimeError( - f"Unsupported type encountered in workflow " - f"traversal: {type(cmdline_tool)}" + f"Unsupported type encountered in workflow " f"traversal: {type(cmdline_tool)}" ) - def rm_unprocessed_secondary_files(job_params: Any) -> None: if isinstance(job_params, list): for j in job_params: @@ -4081,7 +4156,8 @@ def get_options(args: list[str]) -> Namespace: parser = ArgParser( allow_abbrev=False, usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]", - description=textwrap.dedent(""" + description=textwrap.dedent( + """ positional arguments: WORKFLOW CWL file to run. @@ -4096,10 +4172,11 @@ def get_options(args: list[str]) -> Namespace: If an input has the same name as a Toil option, pass '--' before it. - """), + """ + ), formatter_class=RawDescriptionHelpFormatter, ) - + addOptions(parser, jobstore_as_flag=True, cwl=True) options: Namespace options, extra = parser.parse_known_args(args) @@ -4264,14 +4341,12 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: options.tool_help = None options.debug = options.logLevel == "DEBUG" - job_order_object, options.basedir, jobloader = ( - cwltool.main.load_job_order( - options, - sys.stdin, - loading_context.fetcher_constructor, - loading_context.overrides_list, - tool_file_uri, - ) + job_order_object, options.basedir, jobloader = cwltool.main.load_job_order( + options, + sys.stdin, + loading_context.fetcher_constructor, + loading_context.overrides_list, + tool_file_uri, ) if options.overrides: loading_context.overrides_list.extend( @@ -4332,7 +4407,8 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: if err.code == 2: # raised by argparse's parse_args() function print( "\nIf both a CWL file and an input object (YAML/JSON) file were " - "provided, the problem may be the argument order." + usage_message, + "provided, the problem may be the argument order." + + usage_message, file=sys.stderr, ) raise @@ -4345,9 +4421,9 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: shortname(inp["id"]) in initialized_job_order and inp["type"] == "File" ): - cast( - CWLObjectType, initialized_job_order[shortname(inp["id"])] - )["streamable"] = inp.get("streamable", False) + cast(CWLObjectType, initialized_job_order[shortname(inp["id"])])[ + "streamable" + ] = inp.get("streamable", False) # TODO also for nested types that contain streamable Files runtime_context.use_container = not options.no_container diff --git a/src/toil/job.py b/src/toil/job.py index 3f0226899b..27600ddd7e 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -3902,7 +3902,6 @@ def get_filename_size(filename: str) -> FileMetadata: # Now we know this exists, so pass it through # Get filesizes filesize = file_source.get_size(candidate_uri) - except UnimplementedURLException as e: # We can't find anything that can even support this URL scheme. # Report to the user, they are probably missing an extra. @@ -3913,8 +3912,12 @@ def get_filename_size(filename: str) -> FileMetadata: logger.warning( "Checked URL %s but got HTTP status %s", candidate_uri, e.code ) - # Try the next location. 
- continue + if e.code == 405: + # 405 Method not allowed, maybe HEAD requests are not supported + filesize = None + else: + # Try the next location. + continue except FileNotFoundError: # Wasn't found there continue @@ -3992,6 +3995,7 @@ class WorkerImportJob(Job): def __init__( self, filenames: List[str], + local: bool = False, **kwargs: Any ): """ @@ -4000,7 +4004,7 @@ def __init__( :param kwargs: args for the superclass """ self.filenames = filenames - super().__init__(local=False, **kwargs) + super().__init__(local=local, **kwargs) @staticmethod def import_files( From 2410168e8fa9c920817fa76e5ae5f360f1b66df7 Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:50:10 -0800 Subject: [PATCH 2/4] Update src/toil/cwl/cwltoil.py Co-authored-by: Adam Novak --- src/toil/cwl/cwltoil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index f8f2e6bc78..fd982cb8d0 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -2969,7 +2969,7 @@ def makeRootJob( filenames, toil._jobStore, include_remote_files=options.reference_inputs ) - # files with a associated filesize that are valid to be imported on workers + # files with an associated filesize that are valid to be imported on workers valid_files_to_data = dict() # files without an associated filesize that should be imported on the leader leftover_files_to_data = dict() From c6202e4d487404c8a2038f91769f6743757a804b Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 18 Dec 2024 13:55:26 -0800 Subject: [PATCH 3/4] Add docstring + rename functions --- src/toil/cwl/cwltoil.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index fd982cb8d0..20c1922cdc 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -2969,24 +2969,26 @@ def makeRootJob( filenames, toil._jobStore, include_remote_files=options.reference_inputs ) - # files with an associated filesize that are valid to be imported on workers - valid_files_to_data = dict() - # files without an associated filesize that should be imported on the leader - leftover_files_to_data = dict() + # Mapping of files to metadata for files that will be imported on the worker + # This will consist of files that we were able to get a file size for + worker_files_to_data: dict[str, FileMetadata] = dict() + # Mapping of files to metadata for files that will be imported on the leader + # This will consist of files that we were not able to get a file size for + leader_files_to_data = dict() for filename, file_data in files_to_data.items(): if file_data.size is None: - leftover_files_to_data[filename] = file_data + leader_files_to_data[filename] = file_data else: - valid_files_to_data[filename] = file_data + worker_files_to_data[filename] = file_data # import the files for the leader first path_to_fileid = WorkerImportJob.import_files( - list(leftover_files_to_data.keys()), toil._jobStore + list(leader_files_to_data.keys()), toil._jobStore ) # then install the imported files before importing the other files # this way the control flow can fall from the leader to workers - tool, initialized_job_order = CWLInstallImportsJob.convert_files( + tool, initialized_job_order = CWLInstallImportsJob.fill_in_files( initialized_job_order, tool, path_to_fileid, @@ -2996,7 +2998,7 @@ def makeRootJob( ) import_job = CWLImportWrapper( - initialized_job_order, tool, runtime_context, 
valid_files_to_data, options + initialized_job_order, tool, runtime_context, worker_files_to_data, options ) return import_job else: @@ -3586,7 +3588,7 @@ def __init__( self.import_data = import_data @staticmethod - def convert_files( + def fill_in_files( initialized_job_order: CWLObjectType, tool: Process, candidate_to_fileid: dict[str, FileID], @@ -3594,12 +3596,17 @@ def convert_files( skip_remote: bool, bypass_file_store: bool, ) -> tuple[Process, CWLObjectType]: - def convert_file(filename: str) -> FileID: - fileid = candidate_to_fileid[filename] - return fileid + """ + Given a mapping of filenames to Toil file IDs, replace the filename with the file IDs throughout the CWL object. + """ + def fill_in_file(filename: str) -> FileID: + """ + Return the file name's associated Toil file ID + """ + return candidate_to_fileid[filename] file_convert_function = functools.partial( - extract_and_convert_file_to_toil_uri, convert_file + extract_and_convert_file_to_toil_uri, fill_in_file ) fs_access = ToilFsAccess(basedir) fileindex: dict[str, str] = {} @@ -3647,7 +3654,7 @@ def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: initialized_job_order = unwrap(self.initialized_job_order) tool = unwrap(self.tool) - return CWLInstallImportsJob.convert_files( + return CWLInstallImportsJob.fill_in_files( initialized_job_order, tool, candidate_to_fileid, From 74bdec22f2c0838b2f81d15d12026b3a0a40f1ee Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 14 Jan 2025 20:52:01 -0800 Subject: [PATCH 4/4] Change variable names from code review --- src/toil/cwl/cwltoil.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 20c1922cdc..3064641519 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -2965,25 +2965,25 @@ def makeRootJob( """ if options.run_imports_on_workers: filenames = extract_workflow_inputs(options, initialized_job_order, tool) - files_to_data = get_file_sizes( + metadata = get_file_sizes( filenames, toil._jobStore, include_remote_files=options.reference_inputs ) # Mapping of files to metadata for files that will be imported on the worker # This will consist of files that we were able to get a file size for - worker_files_to_data: dict[str, FileMetadata] = dict() + worker_metadata: dict[str, FileMetadata] = dict() # Mapping of files to metadata for files that will be imported on the leader # This will consist of files that we were not able to get a file size for - leader_files_to_data = dict() - for filename, file_data in files_to_data.items(): + leader_metadata = dict() + for filename, file_data in metadata.items(): if file_data.size is None: - leader_files_to_data[filename] = file_data + leader_metadata[filename] = file_data else: - worker_files_to_data[filename] = file_data + worker_metadata[filename] = file_data # import the files for the leader first path_to_fileid = WorkerImportJob.import_files( - list(leader_files_to_data.keys()), toil._jobStore + list(leader_metadata.keys()), toil._jobStore ) # then install the imported files before importing the other files @@ -2998,7 +2998,7 @@ def makeRootJob( ) import_job = CWLImportWrapper( - initialized_job_order, tool, runtime_context, worker_files_to_data, options + initialized_job_order, tool, runtime_context, worker_metadata, options ) return import_job else:
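
The core of this series is the partition in makeRootJob: files whose size could be determined are batched onto workers (the size drives the import job's disk request), while the rest fall back to the leader. A minimal sketch of that step, assuming a simplified stand-in for toil.job.FileMetadata rather than the real class:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class FileMetadata:
        """Stand-in for toil.job.FileMetadata with only the fields the
        fallback logic needs; size is None when none could be determined."""
        source: str
        size: Optional[int]

    def partition_by_known_size(
        files_to_data: dict[str, FileMetadata]
    ) -> tuple[dict[str, FileMetadata], dict[str, FileMetadata]]:
        """Split candidate imports into worker-eligible and leader-fallback
        sets. Worker import jobs are provisioned with disk based on the file
        size, so only files with a known size may be batched onto workers."""
        worker_metadata: dict[str, FileMetadata] = {}
        leader_metadata: dict[str, FileMetadata] = {}
        for filename, file_data in files_to_data.items():
            if file_data.size is None:
                leader_metadata[filename] = file_data
            else:
                worker_metadata[filename] = file_data
        return worker_metadata, leader_metadata

    # Example: only b.dat lacks a size, so it falls back to the leader.
    workers, leader = partition_by_known_size({
        "file:///a.dat": FileMetadata("file:///a.dat", 1024),
        "http://host/b.dat": FileMetadata("http://host/b.dat", None),
    })
    assert list(leader) == ["http://host/b.dat"]

In the patch itself the leader-side set is imported via WorkerImportJob.import_files() before any worker jobs are issued, which is also why WorkerImportJob grows a local flag defaulting to False: the same job class can now be placed on the leader when needed.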
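
Once the leader's imports finish, CWLInstallImportsJob.fill_in_files() substitutes the resulting file IDs back into the tool and job order before control falls to the worker import jobs. A toy analogue of that substitution step, assuming a plain JSON-like object in place of the real visit_files()/visitSteps() traversal:

    from typing import Any

    def fill_in_locations(node: Any, candidate_to_fileid: dict[str, str]) -> None:
        """Walk a JSON-like CWL object and swap known File locations in
        place. This is only an illustration of the substitution; the actual
        code routes it through extract_and_convert_file_to_toil_uri and the
        visit_files()/visitSteps() machinery in cwltoil.py."""
        if isinstance(node, dict):
            loc = node.get("location")
            if node.get("class") == "File" and loc in candidate_to_fileid:
                node["location"] = candidate_to_fileid[loc]
            for value in node.values():
                fill_in_locations(value, candidate_to_fileid)
        elif isinstance(node, list):
            for item in node:
                fill_in_locations(item, candidate_to_fileid)

    # Example: the leader's imports are installed before worker imports run.
    job_order = {"reads": {"class": "File", "location": "http://host/b.dat"}}
    fill_in_locations(job_order, {"http://host/b.dat": "toilfile:1234"})
    assert job_order["reads"]["location"] == "toilfile:1234"

Doing this installation eagerly on the leader is what lets control flow fall cleanly from the leader to the workers: by the time CWLImportWrapper schedules the worker-side ImportsJob, every leader-imported file already resolves to a Toil URI.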
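
The job.py change is what feeds the partition: a 405 from a size probe no longer skips the candidate location, it records the size as unknown (None) so the file is routed to the leader. A sketch of such a probe using only the standard library — this is an assumed illustration, not Toil's actual URL access layer (file_source.get_size dispatches per URL scheme):

    import urllib.error
    import urllib.request
    from typing import Optional

    def probe_size(url: str) -> Optional[int]:
        """Best-effort HTTP size probe. Returns the size in bytes, or None
        when the server answers 405 (the probe method is not allowed, so the
        file may still exist but its size is unknown). Any other HTTP error
        propagates so the caller can try the next candidate location."""
        request = urllib.request.Request(url, method="HEAD")
        try:
            with urllib.request.urlopen(request) as response:
                length = response.headers.get("Content-Length")
                return int(length) if length is not None else None
        except urllib.error.HTTPError as e:
            if e.code == 405:
                return None  # size unknown; import on the leader instead
            raise

Treating 405 as "unknown size" rather than "not found" matters because some object stores and web servers reject HEAD while happily serving GET; before this series such files aborted the worker-import path entirely.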