Add options to do file imports on workers
stxue1 committed Sep 13, 2024
1 parent 8a8fed4 commit f0ee54b
Showing 2 changed files with 169 additions and 64 deletions.
217 changes: 153 additions & 64 deletions src/toil/cwl/cwltoil.py
@@ -1,4 +1,5 @@
"""Implemented support for Common Workflow Language (CWL) for Toil."""
+ import argparse
# Copyright (C) 2015 Curoverse, Inc
# Copyright (C) 2015-2021 Regents of the University of California
# Copyright (C) 2019-2020 Seven Bridges
@@ -2404,7 +2405,7 @@ def __init__(
subjob_name="_wrapper",
local=True,
)
- self.cwltool = remove_pickle_problems(tool)
+ self.cwltool = tool
self.cwljob = cwljob
self.runtime_context = runtime_context
self.conditional = conditional
@@ -2441,7 +2442,7 @@ def __init__(
conditional: Union[Conditional, None] = None,
):
"""Store the context for later execution."""
- self.cwltool = remove_pickle_problems(tool)
+ self.cwltool = tool
self.conditional = conditional or Conditional()

if runtime_context.builder:
@@ -2790,6 +2791,32 @@ def get_container_engine(runtime_context: cwltool.context.RuntimeContext) -> str
return "singularity"
return "docker"

def makeRootJob(
tool: Process,
jobobj: CWLObjectType,
runtime_context: cwltool.context.RuntimeContext,
initialized_job_order: CWLObjectType,
options: Namespace,
toil: Toil
) -> CWLNamedJob:
"""
Create the Toil root Job object for the CWL tool. Same as makeJob(), except this also handles the file import logic.

Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
If only one job is created, it is returned twice.
"""

if options.run_imports_on_workers:
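# Defer the imports: CWLImportJob will import the inputs on a worker and
# then run a CWLSetupJob child that builds the real workflow (see below).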
import_job = CWLImportJob(initialized_job_order, tool, runtime_context, options)
return import_job
else:
import_workflow_inputs(toil._jobStore, options, initialized_job_order=initialized_job_order, tool=tool)
rootJob, followOn = makeJob(tool, jobobj, runtime_context, None, None) # toplevel, no name needed
rootJob.cwljob = initialized_job_order
return rootJob


def makeJob(
tool: Process,
@@ -2805,6 +2832,9 @@ def makeJob(
"""
Create the correct Toil Job object for the CWL tool.
Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
If only one job is created, it is returned twice.
Types: workflow, job, or job wrapper for dynamic resource requirements.
:return: "wfjob, followOn" if the input tool is a workflow, and "job, job" otherwise
@@ -3111,7 +3141,11 @@ def hasChild(self, c: Job) -> Any:


def remove_pickle_problems(obj: ProcessType) -> ProcessType:
"""Doc_loader does not pickle correctly, causing Toil errors, remove from objects."""
"""
Doc_loader does not pickle correctly, causing Toil errors, remove from objects.
See github issue: https://github.com/mypyc/mypyc/issues/804
"""
if hasattr(obj, "doc_loader"):
obj.doc_loader = None
if isinstance(obj, cwltool.workflow.WorkflowStep):
@@ -3143,7 +3177,7 @@ def __init__(
self.cwlwf = cwlwf
self.cwljob = cwljob
self.runtime_context = runtime_context
- self.cwlwf = remove_pickle_problems(self.cwlwf)
+ self.cwlwf = self.cwlwf
self.conditional = conditional or Conditional()

def run(
@@ -3332,6 +3366,111 @@ def run(
return UnresolvedDict(outobj)


class CWLSetupJob(CWLNamedJob):
"""
Job that takes a CWL tool and a job order with all files imported, and makes a CWLWorkflow as a child job to run it.
"""
def __init__(self, initialized_job_order: Promised[CWLObjectType], tool: Promised[Process], runtime_context: cwltool.context.RuntimeContext):
super().__init__()
self.initialized_job_order = initialized_job_order
self.tool = tool
self.runtime_context = runtime_context

def run(self, file_store: AbstractFileStore) -> Any:
"""
:return: A CWL object that represents the output of the workflow.
"""
initialized_job_order = unwrap(self.initialized_job_order)
tool = unwrap(self.tool)
root_job, _ = makeJob(tool, initialized_job_order, self.runtime_context, None, None)
self.addChild(root_job)

root_job.cwljob = initialized_job_order

return root_job.rv()


class CWLImportJob(CWLNamedJob):
"""
Job to do file imports on a worker instead of the leader. Assumes all local and cloud files are accessible.
This class is only used when runImportsOnWorkers is enabled.
"""
def __init__(self, initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, options: Namespace):
super().__init__(local=False, disk=options.import_workers_disk)
self.initialized_job_order = initialized_job_order
self.tool = tool
self.options = options
self.runtime_context = runtime_context

def run(self, file_store: AbstractFileStore) -> Any:
"""
Import the workflow inputs and then create and run the workflow.
:return: Promise of workflow outputs
"""
import_workflow_inputs(file_store.jobStore, self.options, self.initialized_job_order, self.tool)
setup_job = CWLSetupJob(self.initialized_job_order, self.tool, self.runtime_context)
self.addChild(setup_job)
return setup_job.rv()
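
Taken together, the two classes above give the following job-graph shape when --runImportsOnWorkers is set (a sketch inferred from the addChild()/rv() wiring, not part of the diff):

    CWLImportJob                 # non-local; disk=--importWorkersDisk; runs import_workflow_inputs
        └─ CWLSetupJob           # child; builds the real workflow once the import promises resolve
            └─ makeJob(...) jobs # child; the actual CWL workflow/tool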


def import_workflow_inputs(jobstore: AbstractJobStore, options: Namespace, initialized_job_order: CWLObjectType, tool: Process) -> None:
fileindex: Dict[str, str] = {}
existing: Dict[str, str] = {}
# Define something we can call to import a file and get its file
# ID.
# We cast this because import_file is overloaded depending on if we
# pass a shared file name or not, and we know the way we call it we
# always get a FileID out.
file_import_function = cast(
Callable[[str], FileID],
functools.partial(jobstore.import_file, symlink=True),
)

# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
fs_access = ToilFsAccess(options.basedir)
import_files(
file_import_function,
fs_access,
fileindex,
existing,
initialized_job_order,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
)
# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
# the spec probably needs us to support them.
logger.info("Importing tool-associated files...")
visitSteps(
tool,
functools.partial(
import_files,
file_import_function,
fs_access,
fileindex,
existing,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
),
)

# We always expect to have processed all files that exist
for param_name, param_value in initialized_job_order.items():
# Loop through all the parameters for the workflow overall.
# Drop any files that aren't either imported (for when we use
# the file store) or available on disk (for when we don't).
# This will properly make them cause an error later if they
# were required.
rm_unprocessed_secondary_files(param_value)
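
Because import_workflow_inputs() takes an AbstractJobStore rather than a Toil object, the same helper serves both call sites (calls copied from makeRootJob and CWLImportJob.run above):

    # On the leader, when imports are not deferred:
    import_workflow_inputs(toil._jobStore, options, initialized_job_order=initialized_job_order, tool=tool)
    # On a worker, inside CWLImportJob.run:
    import_workflow_inputs(file_store.jobStore, self.options, self.initialized_job_order, self.tool)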


def visitSteps(
cmdline_tool: Process,
op: Callable[[CommentedMap], None],
@@ -3624,6 +3763,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:

options = get_options(args)

# Take care of incompatible arguments related to file imports
if options.run_imports_on_workers is True and options.import_workers_disk is None:
logger.error("Command-line arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.")
return 1

# Do cwltool setup
cwltool.main.setup_schema(args=options, custom_schema_callback=None)
tmpdir_prefix = options.tmpdir_prefix = options.tmpdir_prefix or DEFAULT_TMPDIR_PREFIX
@@ -3683,9 +3827,6 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
tmp_outdir_prefix = os.path.abspath(
options.tmp_outdir_prefix or DEFAULT_TMPDIR_PREFIX
)

- fileindex: Dict[str, str] = {}
- existing: Dict[str, str] = {}
conf_file = getattr(options, "beta_dependency_resolvers_configuration", None)
use_conda_dependencies = getattr(options, "beta_conda_dependencies", None)
job_script_provider = None
@@ -3895,73 +4036,21 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
discover_secondaryFiles=True,
)

- # Define something we can call to import a file and get its file
- # ID.
- # We cast this because import_file is overloaded depending on if we
- # pass a shared file name or not, and we know the way we call it we
- # always get a FileID out.
- file_import_function = cast(
- Callable[[str], FileID],
- functools.partial(toil.import_file, symlink=True),
- )
-
- # Import all the input files, some of which may be missing optional
- # files.
- logger.info("Importing input files...")
- fs_access = ToilFsAccess(options.basedir)
- import_files(
- file_import_function,
- fs_access,
- fileindex,
- existing,
- initialized_job_order,
- mark_broken=True,
- skip_remote=options.reference_inputs,
- bypass_file_store=options.bypass_file_store,
- log_level=logging.INFO,
- )
- # Import all the files associated with tools (binaries, etc.).
- # Not sure why you would have an optional secondary file here, but
- # the spec probably needs us to support them.
- logger.info("Importing tool-associated files...")
- visitSteps(
- tool,
- functools.partial(
- import_files,
- file_import_function,
- fs_access,
- fileindex,
- existing,
- mark_broken=True,
- skip_remote=options.reference_inputs,
- bypass_file_store=options.bypass_file_store,
- log_level=logging.INFO,
- ),
- )
-
- # We always expect to have processed all files that exist
- for param_name, param_value in initialized_job_order.items():
- # Loop through all the parameters for the workflow overall.
- # Drop any files that aren't either imported (for when we use
- # the file store) or available on disk (for when we don't).
- # This will properly make them cause an error later if they
- # were required.
- rm_unprocessed_secondary_files(param_value)

logger.info("Creating root job")
logger.debug("Root tool: %s", tool)
+ tool = remove_pickle_problems(tool)
try:
- wf1, _ = makeJob(
+ wf1 = makeRootJob(
tool=tool,
jobobj={},
runtime_context=runtime_context,
- parent_name=None,  # toplevel, no name needed
- conditional=None,
+ initialized_job_order=initialized_job_order,
+ options=options,
+ toil=toil
)
except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
logging.error(err)
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
- wf1.cwljob = initialized_job_order
logger.info("Starting workflow")
outobj = toil.start(wf1)

16 changes: 16 additions & 0 deletions src/toil/options/cwl.py
@@ -2,6 +2,7 @@
from argparse import ArgumentParser

from configargparse import SUPPRESS
+ from toil.lib.conversions import human2bytes

from toil.version import baseVersion

@@ -281,6 +282,21 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
dest="disable_streaming",
)
parser.add_argument(
"--runImportsOnWorkers", "--run-imports-on-workers",
action="store_true",
default=False,
help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance."
"If set to true, the argument --importWorkersDisk must also be set.",
dest="run_imports_on_workers"
)

parser.add_argument("--importWorkersDisk", "--import-workers-disk",
help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available,"
"this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.",
dest="import_workers_disk",
type=lambda x: human2bytes(str(x)),
default=None)
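
For reference, a hypothetical invocation using both new flags (workflow, inputs file, and the 100G figure are illustrative; --importWorkersDisk accepts human-readable sizes because it is parsed with human2bytes):

    toil-cwl-runner --runImportsOnWorkers --importWorkersDisk 100G workflow.cwl inputs.yaml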

provgroup = parser.add_argument_group(
"Options for recording provenance information of the execution"
