Allow importing on workers #5098

Open · wants to merge 3 commits into base: master
217 changes: 153 additions & 64 deletions src/toil/cwl/cwltoil.py
@@ -1,4 +1,5 @@
"""Implemented support for Common Workflow Language (CWL) for Toil."""
import argparse
Member:
Unused import?

# Copyright (C) 2015 Curoverse, Inc
# Copyright (C) 2015-2021 Regents of the University of California
# Copyright (C) 2019-2020 Seven Bridges
@@ -2404,7 +2405,7 @@ def __init__(
subjob_name="_wrapper",
local=True,
)
self.cwltool = remove_pickle_problems(tool)
self.cwltool = tool
self.cwljob = cwljob
self.runtime_context = runtime_context
self.conditional = conditional
@@ -2441,7 +2442,7 @@ def __init__(
conditional: Union[Conditional, None] = None,
):
"""Store the context for later execution."""
self.cwltool = remove_pickle_problems(tool)
self.cwltool = tool
self.conditional = conditional or Conditional()

if runtime_context.builder:
@@ -2790,6 +2791,32 @@ def get_container_engine(runtime_context: cwltool.context.RuntimeContext) -> str
return "singularity"
return "docker"

def makeRootJob(
tool: Process,
jobobj: CWLObjectType,
runtime_context: cwltool.context.RuntimeContext,
initialized_job_order: CWLObjectType,
options: Namespace,
toil: Toil
) -> CWLNamedJob:
"""
Create the Toil root Job object for the CWL tool. Same as makeJob(), except this also handles the file import logic.

Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
If only one job is created, it is returned twice.

:return: the root job of the workflow's job graph
"""

if options.run_imports_on_workers:
import_job = CWLImportJob(initialized_job_order, tool, runtime_context, options)
return import_job
else:
import_workflow_inputs(toil._jobStore, options, initialized_job_order=initialized_job_order, tool=tool)
rootJob, followOn = makeJob(tool, jobobj, runtime_context, None, None) # toplevel, no name needed
rootJob.cwljob = initialized_job_order
return rootJob


def makeJob(
tool: Process,
@@ -2805,6 +2832,9 @@ def makeJob(
"""
Create the correct Toil Job object for the CWL tool.

Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
If only one job is created, it is returned twice.

Types: workflow, job, or job wrapper for dynamic resource requirements.

:return: "wfjob, followOn" if the input tool is a workflow, and "job, job" otherwise
@@ -3111,7 +3141,11 @@ def hasChild(self, c: Job) -> Any:


def remove_pickle_problems(obj: ProcessType) -> ProcessType:
"""Doc_loader does not pickle correctly, causing Toil errors, remove from objects."""
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only called in one place now, other than itself. Is it safe to remove and was the issue resolved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still needed as I ran into this issue while creating the CWLImportJob. Something internal in the CWL tool object is unpickleable and must be removed. Before it was being called in every Job initialization, but now I moved it to be ran only once on the leader.

Doc_loader does not pickle correctly, causing Toil errors, remove from objects.

See github issue: https://github.com/mypyc/mypyc/issues/804
"""
if hasattr(obj, "doc_loader"):
obj.doc_loader = None
if isinstance(obj, cwltool.workflow.WorkflowStep):
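To see the failure mode concretely: a minimal, self-contained sketch (the classes are illustrative stand-ins, not cwltool's) of how a single unpicklable attribute poisons pickling of the whole tool object, and why setting it to None is enough:

```python
import pickle

class FakeLoader:
    """Stand-in for cwltool's doc_loader, which holds unpicklable state."""
    def __reduce__(self):
        raise TypeError("cannot pickle the document loader")

class FakeTool:
    """Stand-in for a CWL Process object."""
    def __init__(self):
        self.doc_loader = FakeLoader()
        self.tool = {"class": "CommandLineTool"}

tool = FakeTool()
try:
    pickle.dumps(tool)  # fails: pickling recurses into doc_loader
except TypeError as err:
    print("pickling failed:", err)

tool.doc_loader = None  # what remove_pickle_problems() does
pickle.dumps(tool)      # now the object round-trips fine
```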
@@ -3143,7 +3177,7 @@ def __init__(
self.cwlwf = cwlwf
self.cwljob = cwljob
self.runtime_context = runtime_context
self.cwlwf = remove_pickle_problems(self.cwlwf)
self.cwlwf = self.cwlwf
Member:
No-op.

self.conditional = conditional or Conditional()

def run(
@@ -3332,6 +3366,111 @@ def run(
return UnresolvedDict(outobj)


class CWLSetupJob(CWLNamedJob):
"""
Job that takes a CWL tool and a job order with all files imported, and makes a CWLWorkflow as a child to run it.
"""
def __init__(self, initialized_job_order: Promised[CWLObjectType], tool: Promised[Process], runtime_context: cwltool.context.RuntimeContext):
super().__init__()
self.initialized_job_order = initialized_job_order
self.tool = tool
self.runtime_context = runtime_context

def run(self, file_store: AbstractFileStore) -> Any:
"""
:return: a CWL object that represents the output of the workflow.
"""
initialized_job_order = unwrap(self.initialized_job_order)
tool = unwrap(self.tool)
root_job, _ = makeJob(tool, initialized_job_order, self.runtime_context, None, None)
self.addChild(root_job)

root_job.cwljob = initialized_job_order

return root_job.rv()


class CWLImportJob(CWLNamedJob):
"""
Job to do file imports on a worker instead of the leader. Assumes all local and cloud files are accessible.

This class is only used when runImportsOnWorkers is enabled.
"""
def __init__(self, initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, options: Namespace):
super().__init__(local=False, disk=options.import_workers_disk)
self.initialized_job_order = initialized_job_order
self.tool = tool
self.options = options
self.runtime_context = runtime_context

def run(self, file_store: AbstractFileStore) -> Any:
"""
Import the workflow inputs and then create and run the workflow.

:return: Promise of workflow outputs
"""
import_workflow_inputs(file_store.jobStore, self.options, self.initialized_job_order, self.tool)
setup_job = CWLSetupJob(self.initialized_job_order, self.tool, self.runtime_context)
self.addChild(setup_job)
return setup_job.rv()
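For context, both classes above rely on a standard Toil pattern: a job's run() can add a child at runtime and relay the child's promised return value via rv(). A minimal, self-contained sketch of that pattern (illustrative only, with assumed defaults; not code from this PR):

```python
from toil.common import Toil
from toil.job import Job

class Child(Job):
    def run(self, file_store):
        return "value computed on a worker"

class Parent(Job):
    def run(self, file_store):
        child = Child()
        self.addChild(child)  # extend the job graph at runtime
        return child.rv()     # a Promise; resolved after the child runs

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./jobstore")
    with Toil(options) as toil:
        print(toil.start(Parent()))  # prints the child's return value
```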


def import_workflow_inputs(jobstore: AbstractJobStore, options: Namespace, initialized_job_order: CWLObjectType, tool: Process) -> None:
fileindex: Dict[str, str] = {}
existing: Dict[str, str] = {}
# Define something we can call to import a file and get its file
# ID.
# We cast this because import_file is overloaded depending on if we
# pass a shared file name or not, and we know the way we call it we
# always get a FileID out.
file_import_function = cast(
Callable[[str], FileID],
functools.partial(jobstore.import_file, symlink=True),
)

# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
fs_access = ToilFsAccess(options.basedir)
import_files(
file_import_function,
fs_access,
fileindex,
existing,
initialized_job_order,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
)
# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
# the spec probably needs us to support them.
logger.info("Importing tool-associated files...")
visitSteps(
tool,
functools.partial(
import_files,
file_import_function,
fs_access,
fileindex,
existing,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
),
)

# We always expect to have processed all files that exist
for param_name, param_value in initialized_job_order.items():
# Loop through all the parameters for the workflow overall.
# Drop any files that aren't either imported (for when we use
# the file store) or available on disk (for when we don't).
# This will properly make them cause an error later if they
# were required.
rm_unprocessed_secondary_files(param_value)


def visitSteps(
cmdline_tool: Process,
op: Callable[[CommentedMap], None],
@@ -3624,6 +3763,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:

options = get_options(args)

# Take care of incompatible arguments related to file imports
if options.run_imports_on_workers is True and options.import_workers_disk is None:
logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.")
return 1

# Do cwltool setup
cwltool.main.setup_schema(args=options, custom_schema_callback=None)
tmpdir_prefix = options.tmpdir_prefix = options.tmpdir_prefix or DEFAULT_TMPDIR_PREFIX
@@ -3683,9 +3827,6 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
tmp_outdir_prefix = os.path.abspath(
options.tmp_outdir_prefix or DEFAULT_TMPDIR_PREFIX
)

fileindex: Dict[str, str] = {}
existing: Dict[str, str] = {}
conf_file = getattr(options, "beta_dependency_resolvers_configuration", None)
use_conda_dependencies = getattr(options, "beta_conda_dependencies", None)
job_script_provider = None
@@ -3895,73 +4036,21 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
discover_secondaryFiles=True,
)

# Define something we can call to import a file and get its file
# ID.
# We cast this because import_file is overloaded depending on if we
# pass a shared file name or not, and we know the way we call it we
# always get a FileID out.
file_import_function = cast(
Callable[[str], FileID],
functools.partial(toil.import_file, symlink=True),
)

# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
fs_access = ToilFsAccess(options.basedir)
import_files(
file_import_function,
fs_access,
fileindex,
existing,
initialized_job_order,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
)
# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
# the spec probably needs us to support them.
logger.info("Importing tool-associated files...")
visitSteps(
tool,
functools.partial(
import_files,
file_import_function,
fs_access,
fileindex,
existing,
mark_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
log_level=logging.INFO,
),
)

# We always expect to have processed all files that exist
for param_name, param_value in initialized_job_order.items():
# Loop through all the parameters for the workflow overall.
# Drop any files that aren't either imported (for when we use
# the file store) or available on disk (for when we don't).
# This will properly make them cause an error later if they
# were required.
rm_unprocessed_secondary_files(param_value)

logger.info("Creating root job")
logger.debug("Root tool: %s", tool)
tool = remove_pickle_problems(tool)
try:
wf1, _ = makeJob(
wf1 = makeRootJob(
tool=tool,
jobobj={},
runtime_context=runtime_context,
parent_name=None, # toplevel, no name needed
conditional=None,
initialized_job_order=initialized_job_order,
options=options,
toil=toil
)
except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
logging.error(err)
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
wf1.cwljob = initialized_job_order
logger.info("Starting workflow")
outobj = toil.start(wf1)

16 changes: 16 additions & 0 deletions src/toil/options/cwl.py
@@ -2,6 +2,7 @@
from argparse import ArgumentParser

from configargparse import SUPPRESS
from toil.lib.conversions import human2bytes

from toil.version import baseVersion

@@ -281,6 +282,21 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
dest="disable_streaming",
)
parser.add_argument(
"--runImportsOnWorkers", "--run-imports-on-workers",
Member:
Can we add a test for this?

action="store_true",
default=False,
help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance."
"If set to true, the argument --importWorkersDisk must also be set.",
dest="run_imports_on_workers"
)

parser.add_argument("--importWorkersDisk", "--import-workers-disk",
help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available,"
"this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.",
dest="import_workers_disk",
type=lambda x: human2bytes(str(x)),
default=None)
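For reference (a usage sketch, not part of the diff): assuming the usual `toil-cwl-runner` entry point, enabling worker-side imports would look something like `toil-cwl-runner --runImportsOnWorkers --importWorkersDisk 100G workflow.cwl inputs.yml`, where the `human2bytes` converter lets the disk size be given in human-readable form such as `100G`.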

provgroup = parser.add_argument_group(
"Options for recording provenance information of the execution"
Expand Down