Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leader fallback for worker file imports #5189

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 138 additions & 55 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
TypeVar,
Union,
cast,
Literal, Protocol,
Literal,
Protocol,
)
from urllib.parse import quote, unquote, urlparse, urlsplit

Expand All @@ -66,6 +67,7 @@
import cwltool.main
import cwltool.resolver
import schema_salad.ref_resolver

# This is also in configargparse but MyPy doesn't know it
from argparse import RawDescriptionHelpFormatter
from configargparse import ArgParser, Namespace
Expand Down Expand Up @@ -132,6 +134,8 @@
unwrap,
ImportsJob,
get_file_sizes,
FileMetadata,
WorkerImportJob,
)
from toil.jobStores.abstractJobStore import (
AbstractJobStore,
Expand Down Expand Up @@ -1893,12 +1897,20 @@ def extract_file_uri_once(
return rp
return None


V = TypeVar("V", covariant=True)


class VisitFunc(Protocol[V]):
def __call__(self, fileindex: dict[str, str], existing: dict[str, str],
file_metadata: CWLObjectType, mark_broken: bool,
skip_remote: bool) -> V: ...
def __call__(
self,
fileindex: dict[str, str],
existing: dict[str, str],
file_metadata: CWLObjectType,
mark_broken: bool,
skip_remote: bool,
) -> V: ...


def visit_files(
func: VisitFunc[V],
Expand Down Expand Up @@ -2188,7 +2200,9 @@ def extract_and_convert_file_to_toil_uri(
Unless skip_remote is set, also run on remote files and sets their locations
to toil URIs as well.
"""
location = extract_file_uri_once(fileindex, existing, file_metadata, mark_broken, skip_remote)
location = extract_file_uri_once(
fileindex, existing, file_metadata, mark_broken, skip_remote
)
if location is not None:
file_metadata["location"] = convert_file_uri_to_toil_uri(
convertfunc, fileindex, existing, location
Expand Down Expand Up @@ -2896,7 +2910,9 @@ def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID:
logger.log(log_level, "Loading %s...", url)
return writeGlobalFileWrapper(file_store, url)

file_upload_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)
file_upload_function = functools.partial(
extract_and_convert_file_to_toil_uri, file_import_function
)

# Upload all the Files and set their and the Directories' locations, if
# needed.
Expand Down Expand Up @@ -2948,8 +2964,41 @@ def makeRootJob(
:return:
"""
if options.run_imports_on_workers:
filenames = extract_workflow_inputs(options, initialized_job_order, tool)
files_to_data = get_file_sizes(
filenames, toil._jobStore, include_remote_files=options.reference_inputs
)

# Mapping of files to metadata for files that will be imported on the worker
# This will consist of files that we were able to get a file size for
worker_files_to_data: dict[str, FileMetadata] = dict()
# Mapping of files to metadata for files that will be imported on the leader
# This will consist of files that we were not able to get a file size for
leader_files_to_data = dict()
for filename, file_data in files_to_data.items():
if file_data.size is None:
leader_files_to_data[filename] = file_data
else:
worker_files_to_data[filename] = file_data

# import the files for the leader first
path_to_fileid = WorkerImportJob.import_files(
list(leader_files_to_data.keys()), toil._jobStore
)

# then install the imported files before importing the other files
# this way the control flow can fall from the leader to workers
tool, initialized_job_order = CWLInstallImportsJob.fill_in_files(
initialized_job_order,
tool,
path_to_fileid,
options.basedir,
options.reference_inputs,
options.bypass_file_store,
)

import_job = CWLImportWrapper(
initialized_job_order, tool, runtime_context, options
initialized_job_order, tool, runtime_context, worker_files_to_data, options
)
return import_job
else:
Expand Down Expand Up @@ -3538,22 +3587,28 @@ def __init__(
self.bypass_file_store = bypass_file_store
self.import_data = import_data

def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]:
@staticmethod
def fill_in_files(
initialized_job_order: CWLObjectType,
tool: Process,
candidate_to_fileid: dict[str, FileID],
basedir: str,
skip_remote: bool,
bypass_file_store: bool,
) -> tuple[Process, CWLObjectType]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is missing a docstring.

It also might need a better name. It sounds like it is going to e.g. convert files from DOC to PDF or something, and really it is going to fill in Toil file IDs and replace URLs with other URLs.

"""
Convert the filenames in the workflow inputs into the URIs
:return: Promise of transformed workflow inputs. A tuple of the job order and process
Given a mapping of filenames to Toil file IDs, replace the filename with the file IDs throughout the CWL object.
"""
candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data)

initialized_job_order = unwrap(self.initialized_job_order)
tool = unwrap(self.tool)

def convert_file(filename: str) -> FileID:
fileid = candidate_to_fileid[filename]
return fileid

file_convert_function = functools.partial(extract_and_convert_file_to_toil_uri, convert_file)
fs_access = ToilFsAccess(self.basedir)
def fill_in_file(filename: str) -> FileID:
"""
Return the file name's associated Toil file ID
"""
return candidate_to_fileid[filename]

file_convert_function = functools.partial(
extract_and_convert_file_to_toil_uri, fill_in_file
)
fs_access = ToilFsAccess(basedir)
fileindex: dict[str, str] = {}
existing: dict[str, str] = {}
visit_files(
Expand All @@ -3563,8 +3618,8 @@ def convert_file(filename: str) -> FileID:
existing,
initialized_job_order,
mark_broken=True,
skip_remote=self.skip_remote,
bypass_file_store=self.bypass_file_store,
skip_remote=skip_remote,
bypass_file_store=bypass_file_store,
)
visitSteps(
tool,
Expand All @@ -3575,8 +3630,8 @@ def convert_file(filename: str) -> FileID:
fileindex,
existing,
mark_broken=True,
skip_remote=self.skip_remote,
bypass_file_store=self.bypass_file_store,
skip_remote=skip_remote,
bypass_file_store=bypass_file_store,
),
)

Expand All @@ -3588,9 +3643,26 @@ def convert_file(filename: str) -> FileID:
# This will properly make them cause an error later if they
# were required.
rm_unprocessed_secondary_files(param_value)

return tool, initialized_job_order

def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]:
"""
Convert the filenames in the workflow inputs into the URIs
:return: Promise of transformed workflow inputs. A tuple of the job order and process
"""
candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data)

initialized_job_order = unwrap(self.initialized_job_order)
tool = unwrap(self.tool)
return CWLInstallImportsJob.fill_in_files(
initialized_job_order,
tool,
candidate_to_fileid,
self.basedir,
self.skip_remote,
self.bypass_file_store,
)


class CWLImportWrapper(CWLNamedJob):
"""
Expand All @@ -3605,22 +3677,22 @@ def __init__(
initialized_job_order: CWLObjectType,
tool: Process,
runtime_context: cwltool.context.RuntimeContext,
file_to_data: dict[str, FileMetadata],
options: Namespace,
):
super().__init__(local=False, disk=options.import_workers_threshold)
self.initialized_job_order = initialized_job_order
self.tool = tool
self.options = options
self.runtime_context = runtime_context
self.file_to_data = file_to_data

def run(self, file_store: AbstractFileStore) -> Any:
filenames = extract_workflow_inputs(
self.options, self.initialized_job_order, self.tool
imports_job = ImportsJob(
self.file_to_data,
self.options.import_workers_threshold,
self.options.import_workers_disk,
)
file_to_data = get_file_sizes(
filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs
)
imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk)
self.addChild(imports_job)
install_imports_job = CWLInstallImportsJob(
initialized_job_order=self.initialized_job_order,
Expand All @@ -3634,7 +3706,9 @@ def run(self, file_store: AbstractFileStore) -> Any:
imports_job.addFollowOn(install_imports_job)

start_job = CWLStartJob(
install_imports_job.rv(0), install_imports_job.rv(1), runtime_context=self.runtime_context
install_imports_job.rv(0),
install_imports_job.rv(1),
runtime_context=self.runtime_context,
)
self.addChild(start_job)
install_imports_job.addFollowOn(start_job)
Expand All @@ -3645,7 +3719,7 @@ def run(self, file_store: AbstractFileStore) -> Any:
class CWLStartJob(CWLNamedJob):
"""
Job responsible for starting the CWL workflow.

Takes in the workflow/tool and inputs after all files are imported
and creates jobs to run those workflows.
"""
Expand Down Expand Up @@ -3744,7 +3818,10 @@ def import_workflow_inputs(
def file_import_function(url: str) -> FileID:
logger.log(log_level, "Loading %s...", url)
return jobstore.import_file(url, symlink=True)
import_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)

import_function = functools.partial(
extract_and_convert_file_to_toil_uri, file_import_function
)
# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
Expand All @@ -3763,8 +3840,13 @@ def file_import_function(url: str) -> FileID:
# Make another function for importing tool files. This one doesn't allow
# symlinking, since the tools might be coming from storage not accessible
# to all nodes.
tool_import_function = functools.partial(extract_and_convert_file_to_toil_uri,
cast(Callable[[str], FileID], functools.partial(jobstore.import_file, symlink=False)))
tool_import_function = functools.partial(
extract_and_convert_file_to_toil_uri,
cast(
Callable[[str], FileID],
functools.partial(jobstore.import_file, symlink=False),
),
)

# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
Expand Down Expand Up @@ -3795,6 +3877,8 @@ def file_import_function(url: str) -> FileID:


T = TypeVar("T")


def visitSteps(
cmdline_tool: Process,
op: Callable[[CommentedMap], list[T]],
Expand All @@ -3818,12 +3902,10 @@ def visitSteps(
# if they bothered to run the Process __init__.
return op(cmdline_tool.tool)
raise RuntimeError(
f"Unsupported type encountered in workflow "
f"traversal: {type(cmdline_tool)}"
f"Unsupported type encountered in workflow " f"traversal: {type(cmdline_tool)}"
)



def rm_unprocessed_secondary_files(job_params: Any) -> None:
if isinstance(job_params, list):
for j in job_params:
Expand Down Expand Up @@ -4081,7 +4163,8 @@ def get_options(args: list[str]) -> Namespace:
parser = ArgParser(
allow_abbrev=False,
usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]",
description=textwrap.dedent("""
description=textwrap.dedent(
"""
positional arguments:

WORKFLOW CWL file to run.
Expand All @@ -4096,10 +4179,11 @@ def get_options(args: list[str]) -> Namespace:

If an input has the same name as a Toil option, pass
'--' before it.
"""),
"""
),
formatter_class=RawDescriptionHelpFormatter,
)

addOptions(parser, jobstore_as_flag=True, cwl=True)
options: Namespace
options, extra = parser.parse_known_args(args)
Expand Down Expand Up @@ -4264,14 +4348,12 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:

options.tool_help = None
options.debug = options.logLevel == "DEBUG"
job_order_object, options.basedir, jobloader = (
cwltool.main.load_job_order(
options,
sys.stdin,
loading_context.fetcher_constructor,
loading_context.overrides_list,
tool_file_uri,
)
job_order_object, options.basedir, jobloader = cwltool.main.load_job_order(
options,
sys.stdin,
loading_context.fetcher_constructor,
loading_context.overrides_list,
tool_file_uri,
)
if options.overrides:
loading_context.overrides_list.extend(
Expand Down Expand Up @@ -4332,7 +4414,8 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
if err.code == 2: # raised by argparse's parse_args() function
print(
"\nIf both a CWL file and an input object (YAML/JSON) file were "
"provided, the problem may be the argument order." + usage_message,
"provided, the problem may be the argument order."
+ usage_message,
file=sys.stderr,
)
raise
Expand All @@ -4345,9 +4428,9 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
shortname(inp["id"]) in initialized_job_order
and inp["type"] == "File"
):
cast(
CWLObjectType, initialized_job_order[shortname(inp["id"])]
)["streamable"] = inp.get("streamable", False)
cast(CWLObjectType, initialized_job_order[shortname(inp["id"])])[
"streamable"
] = inp.get("streamable", False)
# TODO also for nested types that contain streamable Files

runtime_context.use_container = not options.no_container
Expand Down
Loading
Loading