From f0ee54b2d5ca4e07910ac6eef74f6bdd14517b29 Mon Sep 17 00:00:00 2001
From: stxue1
Date: Thu, 12 Sep 2024 17:29:41 -0700
Subject: [PATCH] Add options to do file imports on workers

---
 src/toil/cwl/cwltoil.py | 217 ++++++++++++++++++++++++++++------------
 src/toil/options/cwl.py |  16 +++
 2 files changed, 169 insertions(+), 64 deletions(-)

diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index 4d46d46089..e0330bdbaa 100644
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -1,4 +1,5 @@
 """Implemented support for Common Workflow Language (CWL) for Toil."""
+import argparse
 # Copyright (C) 2015 Curoverse, Inc
 # Copyright (C) 2015-2021 Regents of the University of California
 # Copyright (C) 2019-2020 Seven Bridges
@@ -2404,7 +2405,7 @@ def __init__(
             subjob_name="_wrapper",
             local=True,
         )
-        self.cwltool = remove_pickle_problems(tool)
+        self.cwltool = tool
         self.cwljob = cwljob
         self.runtime_context = runtime_context
         self.conditional = conditional
@@ -2441,7 +2442,7 @@ def __init__(
         conditional: Union[Conditional, None] = None,
     ):
         """Store the context for later execution."""
-        self.cwltool = remove_pickle_problems(tool)
+        self.cwltool = tool
         self.conditional = conditional or Conditional()

         if runtime_context.builder:
@@ -2790,6 +2791,32 @@ def get_container_engine(runtime_context: cwltool.context.RuntimeContext) -> str:
         return "singularity"
     return "docker"

+def makeRootJob(
+    tool: Process,
+    jobobj: CWLObjectType,
+    runtime_context: cwltool.context.RuntimeContext,
+    initialized_job_order: CWLObjectType,
+    options: Namespace,
+    toil: Toil
+) -> CWLNamedJob:
+    """
+    Create the Toil root Job object for the CWL tool. The same as makeJob() except this also handles import logic.
+
+    Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
+    If only one job is created, it is returned twice.
+
+    :return: the root job of the CWL workflow
+    """
+
+    if options.run_imports_on_workers:
+        import_job = CWLImportJob(initialized_job_order, tool, runtime_context, options)
+        return import_job
+    else:
+        import_workflow_inputs(toil._jobStore, options, initialized_job_order=initialized_job_order, tool=tool)
+        rootJob, followOn = makeJob(tool, jobobj, runtime_context, None, None)  # toplevel, no name needed
+        rootJob.cwljob = initialized_job_order
+        return rootJob
+

 def makeJob(
     tool: Process,
@@ -2805,6 +2832,9 @@
     """
     Create the correct Toil Job object for the CWL tool.

+    Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
+    If only one job is created, it is returned twice.
+
     Types: workflow, job, or job wrapper for dynamic resource requirements.

     :return: "wfjob, followOn" if the input tool is a workflow, and "job, job" otherwise
@@ -3111,7 +3141,11 @@ def hasChild(self, c: Job) -> Any:


 def remove_pickle_problems(obj: ProcessType) -> ProcessType:
-    """Doc_loader does not pickle correctly, causing Toil errors, remove from objects."""
+    """
+    The doc_loader attribute does not pickle correctly, causing Toil errors, so remove it from objects.
+
+    See GitHub issue: https://github.com/mypyc/mypyc/issues/804
+    """
     if hasattr(obj, "doc_loader"):
         obj.doc_loader = None
     if isinstance(obj, cwltool.workflow.WorkflowStep):
@@ -3143,7 +3177,7 @@ def __init__(
         self.cwlwf = cwlwf
         self.cwljob = cwljob
         self.runtime_context = runtime_context
-        self.cwlwf = remove_pickle_problems(self.cwlwf)
+        self.cwlwf = self.cwlwf
         self.conditional = conditional or Conditional()

     def run(
@@ -3332,6 +3366,111 @@ def run(
         return UnresolvedDict(outobj)


+class CWLSetupJob(CWLNamedJob):
+    """
+    Job that takes a CWL tool and job order with all files imported and makes a CWLWorkflow as a child job to run it.
+    """
+    def __init__(self, initialized_job_order: Promised[CWLObjectType], tool: Promised[Process], runtime_context: cwltool.context.RuntimeContext):
+        super().__init__()
+        self.initialized_job_order = initialized_job_order
+        self.tool = tool
+        self.runtime_context = runtime_context
+
+    def run(self, file_store: AbstractFileStore) -> Any:
+        """
+        :return: A CWL object that represents the output of the workflow.
+        """
+        initialized_job_order = unwrap(self.initialized_job_order)
+        tool = unwrap(self.tool)
+        root_job, _ = makeJob(tool, initialized_job_order, self.runtime_context, None, None)
+        self.addChild(root_job)
+
+        root_job.cwljob = initialized_job_order
+
+        return root_job.rv()
+
+
+class CWLImportJob(CWLNamedJob):
+    """
+    Job to do file imports on a worker instead of the leader. Assumes all local and cloud files are accessible.
+
+    This class is only used when runImportsOnWorkers is enabled.
+    """
+    def __init__(self, initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, options: Namespace):
+        super().__init__(local=False, disk=options.import_workers_disk)
+        self.initialized_job_order = initialized_job_order
+        self.tool = tool
+        self.options = options
+        self.runtime_context = runtime_context
+
+    def run(self, file_store: AbstractFileStore) -> Any:
+        """
+        Import the workflow inputs and then create and run the workflow.
+        :return: Promise of workflow outputs
+        """
+        import_workflow_inputs(file_store.jobStore, self.options, self.initialized_job_order, self.tool)
+        setup_job = CWLSetupJob(self.initialized_job_order, self.tool, self.runtime_context)
+        self.addChild(setup_job)
+        return setup_job.rv()
+
+
+def import_workflow_inputs(jobstore: AbstractJobStore, options: Namespace, initialized_job_order: CWLObjectType, tool: Process) -> None:
+    fileindex: Dict[str, str] = {}
+    existing: Dict[str, str] = {}
+    # Define something we can call to import a file and get its file
+    # ID.
+    # We cast this because import_file is overloaded depending on if we
+    # pass a shared file name or not, and we know the way we call it we
+    # always get a FileID out.
+    file_import_function = cast(
+        Callable[[str], FileID],
+        functools.partial(jobstore.import_file, symlink=True),
+    )
+
+    # Import all the input files, some of which may be missing optional
+    # files.
+    logger.info("Importing input files...")
+    fs_access = ToilFsAccess(options.basedir)
+    import_files(
+        file_import_function,
+        fs_access,
+        fileindex,
+        existing,
+        initialized_job_order,
+        mark_broken=True,
+        skip_remote=options.reference_inputs,
+        bypass_file_store=options.bypass_file_store,
+        log_level=logging.INFO,
+    )
+    # Import all the files associated with tools (binaries, etc.).
+    # Not sure why you would have an optional secondary file here, but
+    # the spec probably needs us to support them.
+ logger.info("Importing tool-associated files...") + visitSteps( + tool, + functools.partial( + import_files, + file_import_function, + fs_access, + fileindex, + existing, + mark_broken=True, + skip_remote=options.reference_inputs, + bypass_file_store=options.bypass_file_store, + log_level=logging.INFO, + ), + ) + + # We always expect to have processed all files that exist + for param_name, param_value in initialized_job_order.items(): + # Loop through all the parameters for the workflow overall. + # Drop any files that aren't either imported (for when we use + # the file store) or available on disk (for when we don't). + # This will properly make them cause an error later if they + # were required. + rm_unprocessed_secondary_files(param_value) + + def visitSteps( cmdline_tool: Process, op: Callable[[CommentedMap], None], @@ -3624,6 +3763,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: options = get_options(args) + # Take care of incompatible arguments related to file imports + if options.run_imports_on_workers is True and options.import_workers_disk is None: + logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.") + return 1 + # Do cwltool setup cwltool.main.setup_schema(args=options, custom_schema_callback=None) tmpdir_prefix = options.tmpdir_prefix = options.tmpdir_prefix or DEFAULT_TMPDIR_PREFIX @@ -3683,9 +3827,6 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: tmp_outdir_prefix = os.path.abspath( options.tmp_outdir_prefix or DEFAULT_TMPDIR_PREFIX ) - - fileindex: Dict[str, str] = {} - existing: Dict[str, str] = {} conf_file = getattr(options, "beta_dependency_resolvers_configuration", None) use_conda_dependencies = getattr(options, "beta_conda_dependencies", None) job_script_provider = None @@ -3895,73 +4036,21 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: discover_secondaryFiles=True, ) - # Define something we can call to import a file and get its file - # ID. - # We cast this because import_file is overloaded depending on if we - # pass a shared file name or not, and we know the way we call it we - # always get a FileID out. - file_import_function = cast( - Callable[[str], FileID], - functools.partial(toil.import_file, symlink=True), - ) - - # Import all the input files, some of which may be missing optional - # files. - logger.info("Importing input files...") - fs_access = ToilFsAccess(options.basedir) - import_files( - file_import_function, - fs_access, - fileindex, - existing, - initialized_job_order, - mark_broken=True, - skip_remote=options.reference_inputs, - bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, - ) - # Import all the files associated with tools (binaries, etc.). - # Not sure why you would have an optional secondary file here, but - # the spec probably needs us to support them. - logger.info("Importing tool-associated files...") - visitSteps( - tool, - functools.partial( - import_files, - file_import_function, - fs_access, - fileindex, - existing, - mark_broken=True, - skip_remote=options.reference_inputs, - bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, - ), - ) - - # We always expect to have processed all files that exist - for param_name, param_value in initialized_job_order.items(): - # Loop through all the parameters for the workflow overall. 
-                # Drop any files that aren't either imported (for when we use
-                # the file store) or available on disk (for when we don't).
-                # This will properly make them cause an error later if they
-                # were required.
-                rm_unprocessed_secondary_files(param_value)
-
             logger.info("Creating root job")
             logger.debug("Root tool: %s", tool)
+            tool = remove_pickle_problems(tool)
             try:
-                wf1, _ = makeJob(
+                wf1 = makeRootJob(
                     tool=tool,
                     jobobj={},
                     runtime_context=runtime_context,
-                    parent_name=None,  # toplevel, no name needed
-                    conditional=None,
+                    initialized_job_order=initialized_job_order,
+                    options=options,
+                    toil=toil
                 )
             except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
                 logging.error(err)
                 return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
-            wf1.cwljob = initialized_job_order

             logger.info("Starting workflow")
             outobj = toil.start(wf1)
diff --git a/src/toil/options/cwl.py b/src/toil/options/cwl.py
index 0db2c80889..a606db34e5 100644
--- a/src/toil/options/cwl.py
+++ b/src/toil/options/cwl.py
@@ -2,6 +2,7 @@
 from argparse import ArgumentParser

 from configargparse import SUPPRESS
+from toil.lib.conversions import human2bytes
 from toil.version import baseVersion


@@ -281,6 +282,21 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
         dest="disable_streaming",
     )
+    parser.add_argument(
+        "--runImportsOnWorkers", "--run-imports-on-workers",
+        action="store_true",
+        default=False,
+        help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance. "
+        "If set to true, the argument --importWorkersDisk must also be set.",
+        dest="run_imports_on_workers"
+    )
+
+    parser.add_argument("--importWorkersDisk", "--import-workers-disk",
+                        help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available, "
+                             "this should be set to the size of the largest input file. This must be set in conjunction with the argument --runImportsOnWorkers.",
+                        dest="import_workers_disk",
+                        type=lambda x: human2bytes(str(x)),
+                        default=None)

     provgroup = parser.add_argument_group(
         "Options for recording provenance information of the execution"
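
For illustration only, a minimal sketch of how the two new options behave; it is not part of the patch itself. The flag names, dest values, and the human2bytes conversion come from this change, but the parser below is a standalone stand-in for Toil's real add_cwl_options() wiring, and all sample values are placeholders.

    import argparse
    from toil.lib.conversions import human2bytes

    # Stand-in parser mirroring the two flags added by this patch.
    parser = argparse.ArgumentParser()
    parser.add_argument("--runImportsOnWorkers", action="store_true",
                        default=False, dest="run_imports_on_workers")
    parser.add_argument("--importWorkersDisk", dest="import_workers_disk",
                        type=lambda x: human2bytes(str(x)), default=None)

    # human2bytes turns a size string such as "100GiB" into an integer byte count,
    # which CWLImportJob then uses as the import worker's disk requirement.
    opts = parser.parse_args(["--runImportsOnWorkers", "--importWorkersDisk", "100GiB"])
    print(opts.run_imports_on_workers, opts.import_workers_disk)

On the command line this corresponds to something like:

    toil-cwl-runner --runImportsOnWorkers --importWorkersDisk 100GiB workflow.cwl inputs.yml

where workflow.cwl and inputs.yml are placeholder file names.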