Skip to content

Commit

Permalink
Fix URL to File coercion (#5059)
Browse files Browse the repository at this point in the history
* Unify coercion and check for URLs to fix #5058

* Handle nonexistent and erroneous URLs sensibly for optional File coercion

* Add missing test file

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
adamnovak and github-actions[bot] authored Aug 27, 2024
1 parent eaae62c commit f23b1d3
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 40 deletions.
25 changes: 19 additions & 6 deletions src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ def url_exists(cls, src_uri: str) -> bool:
"""
Return True if the file at the given URI exists, and False otherwise.
May raise an error if file existence cannot be determined.
:param src_uri: URL that points to a file or object in the storage
mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.
"""
Expand Down Expand Up @@ -637,6 +639,8 @@ def open_url(cls, src_uri: str) -> IO[bytes]:
def _url_exists(cls, url: ParseResult) -> bool:
"""
Return True if the item at the given URL exists, and Flase otherwise.
May raise an error if file existence cannot be determined.
"""
raise NotImplementedError(f"No implementation for {url}")

Expand Down Expand Up @@ -1756,9 +1760,10 @@ def _url_exists(cls, url: ParseResult) -> bool:
# TODO: Figure out how to HEAD instead of this.
with cls._open_url(url):
return True
except:
pass
return False
except FileNotFoundError:
return False
# Any other errors we should pass through because something really went
# wrong (e.g. server is broken today but file may usually exist)

@classmethod
@retry(
Expand Down Expand Up @@ -1800,18 +1805,26 @@ def count(l: int) -> None:
@retry(
errors=[
BadStatusLine,
ErrorCondition(error=HTTPError, error_codes=[408, 500, 503]),
ErrorCondition(error=HTTPError, error_codes=[408, 429, 500, 502, 503]),
]
)
def _open_url(cls, url: ParseResult) -> IO[bytes]:
try:
return cast(IO[bytes], closing(urlopen(url.geturl())))
except HTTPError as e:
if e.code == 404:
if e.code in (404, 410):
# Translate into a FileNotFoundError for detecting
# un-importable files
# known nonexistent files
raise FileNotFoundError(str(url)) from e
else:
# Other codes indicate a real problem with the server; we don't
# want to e.g. run a workflow without an optional input that
# the user specified a path to just because the server was
# busy.

# Sometimes we expect to see this when polling existence for
# inputs at guessed paths, so don't complain *too* loudly here.
logger.debug("Unusual status %d for URL %s", e.code, str(url))
raise

@classmethod
Expand Down
13 changes: 13 additions & 0 deletions src/toil/test/wdl/testfiles/url_to_file.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version 1.0

workflow url_to_file {
input {
}

File the_file = "https://hgdownload.soe.ucsc.edu/goldenPath/hs1/bigZips/hs1.chrom.sizes"

output {
File out_file = the_file
String first_line = read_lines(the_file)[0]
}
}
13 changes: 13 additions & 0 deletions src/toil/test/wdl/testfiles/url_to_optional_file.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version 1.0

workflow url_to_optional_file {
input {
Int http_code = 404
}

File? the_file = "https://httpstat.us/" + http_code

output {
File? out_file = the_file
}
}
52 changes: 52 additions & 0 deletions src/toil/test/wdl/wdltoil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,58 @@ def test_MD5sum(self):
assert os.path.exists(result['ga4ghMd5.value'])
assert os.path.basename(result['ga4ghMd5.value']) == 'md5sum.txt'

def test_url_to_file(self):
"""
Test if web URL strings can be coerced to usable Files.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/url_to_file.wdl')

result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)

assert 'url_to_file.first_line' in result
assert isinstance(result['url_to_file.first_line'], str)
self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328')

def test_url_to_optional_file(self):
"""
Test if missing and error-producing URLs are handled correctly for optional File? values.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/url_to_optional_file.wdl')

def run_for_code(code: int) -> dict:
"""
Run a workflow coercing URL to File? where the URL returns the given status code.
Return the parsed output.
"""
logger.info("Test optional file with HTTP code %s", code)
json_value = '{"url_to_optional_file.http_code": %d}' % code
result_json = subprocess.check_output(
self.base_command + [wdl, json_value, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)
return result

# Check files that exist
result = run_for_code(200)
assert 'url_to_optional_file.out_file' in result
self.assertNotEqual(result['url_to_optional_file.out_file'], None)

for code in (404, 410):
# Check files that definitely don't
result = run_for_code(code)
assert 'url_to_optional_file.out_file' in result
self.assertEqual(result['url_to_optional_file.out_file'], None)

for code in (402, 418, 500, 502):
# Check that cases where the server refuses to say if the file
# exists stop the workflow.
with self.assertRaises(subprocess.CalledProcessError):
run_for_code(code)



def test_missing_output_directory(self):
"""
Test if Toil can run a WDL workflow into a new directory.
Expand Down
84 changes: 50 additions & 34 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
TypeVar,
Union,
cast)
from mypy_extensions import Arg, DefaultArg
from urllib.error import HTTPError
from urllib.parse import quote, unquote, urljoin, urlsplit
from functools import partial

Expand Down Expand Up @@ -809,7 +811,7 @@ def __init__(
) -> None:
"""
Set up the standard library.
:param task_path: Dotted WDL name of the part of the workflow this library is working for.
:param execution_dir: Directory to use as the working directory for workflow code.
:param enforce_existence: If true, then if a file is detected as
Expand Down Expand Up @@ -874,7 +876,7 @@ def _devirtualize_filename(self, filename: str) -> str:
'devirtualize' filename passed to a read_* function: return a filename that can be open()ed
on the local host.
"""

result = self.devirtualize_to(
filename,
self._file_store.localTempDir,
Expand Down Expand Up @@ -912,7 +914,7 @@ def devirtualize_to(
The input filename could already be devirtualized. In this case, the filename
should not be added to the cache
:param state: State dict which must be shared among successive calls into a dest_dir.
:param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through.
"""
Expand Down Expand Up @@ -1488,6 +1490,7 @@ def import_file_from_uri(uri: str) -> str:
if not AbstractJobStore.url_exists(candidate_uri):
# Wasn't found there
continue

# Now we know this exists, so pass it through
return candidate_uri
else:
Expand All @@ -1499,12 +1502,16 @@ def import_file_from_uri(uri: str) -> str:
# Wasn't found there
continue
logger.info('Imported %s', candidate_uri)

except UnimplementedURLException as e:
# We can't find anything that can even support this URL scheme.
# Report to the user, they are probably missing an extra.
logger.critical('Error: ' + str(e))
sys.exit(1)
except HTTPError as e:
# Something went wrong looking for it there.
logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code)
# Try the next location.
continue
except Exception:
# Something went wrong besides the file not being found. Maybe
# we have no auth.
Expand Down Expand Up @@ -1561,13 +1568,18 @@ def drop_if_missing(value_type: WDL.Type.Base, filename: str, work_dir: str) ->
logger.debug("Consider file %s", filename)

if is_url(filename):
if (not filename.startswith(TOIL_NONEXISTENT_URI_SCHEME)
and (filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename))):
# We assume anything in the filestore actually exists.
return filename
else:
logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type)
return None
try:
if (not filename.startswith(TOIL_NONEXISTENT_URI_SCHEME)
and (filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(filename))):
# We assume anything in the filestore actually exists.
return filename
else:
logger.warning('File %s with type %s does not actually exist at its URI', filename, value_type)
return None
except HTTPError as e:
# The error doesn't always include the URL in its message.
logger.error("File %s could not be checked for existence due to HTTP error %d", filename, e.code)
raise
else:
# Get the absolute path, not resolving symlinks
effective_path = os.path.abspath(os.path.join(work_dir, filename))
Expand Down Expand Up @@ -1939,10 +1951,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
total_bytes: float = convert_units(total_gb, 'GB')
runtime_disk = int(total_bytes)


if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'):
# For old WDL versions, guess whether the task wants GPUs if not specified.
use_gpus = (runtime_bindings.has_binding('gpuCount') or
use_gpus = (runtime_bindings.has_binding('gpuCount') or
runtime_bindings.has_binding('gpuType') or
runtime_bindings.has_binding('nvidiaDriverVersion'))
else:
Expand All @@ -1951,7 +1963,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
# truth on whether to use GPUs or not.
# Fields such as gpuType and gpuCount will control what GPUs are provided.
use_gpus = cast(WDL.Value.Boolean, runtime_bindings.get('gpu', WDL.Value.Boolean(False))).value

if use_gpus:
# We want to have GPUs
# TODO: actually coerce types here instead of casting to detect user mistakes
Expand Down Expand Up @@ -2036,7 +2048,7 @@ def add_injections(self, command_string: str, task_container: TaskContainer) ->
Currently doesn't implement the MiniWDL plugin system, but does add
resource usage monitoring to Docker containers.
"""

parts = []

if isinstance(task_container, SwarmContainer):
Expand Down Expand Up @@ -3460,31 +3472,35 @@ def monkeypatch_coerce(standard_library: ToilWDLStdLibBase) -> Generator[None, N
:param standard_library: a standard library object
:return
"""
# We're doing this because while miniwdl recognizes when a string needs to be converted into a file, it's method of
# We're doing this because while miniwdl recognizes when a string needs to be converted into a file, its method of
# conversion is to just store the local filepath. Toil needs to virtualize the file into the jobstore so until
# there is an internal entrypoint, monkeypatch it.
def base_coerce(self: WDL.Value.Base, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
if isinstance(desired_type, WDL.Type.File):
self.value = standard_library._virtualize_filename(self.value)
return self
return old_base_coerce(self, desired_type) # old_coerce will recurse back into this monkey patched coerce

def string_coerce(self: WDL.Value.String, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
# Sometimes string coerce is called instead, so monkeypatch this one as well
if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File):
if os.path.isfile(os.path.join(standard_library.execution_dir or ".", self.value)):
return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr)
else:
return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr)
return old_str_coerce(self, desired_type)
# there is a proper hook, monkeypatch it.

SelfType = TypeVar("SelfType", bound=WDL.Value.Base)
def make_coerce(old_coerce: Callable[[SelfType, Optional[WDL.Type.Base]], WDL.Value.Base]) -> Callable[[Arg(SelfType, 'self'), DefaultArg(Optional[WDL.Type.Base], 'desired_type')], WDL.Value.Base]:
"""
Stamp out a replacement coerce method that calls the given original one.
"""
def coerce(self: SelfType, desired_type: Optional[WDL.Type.Base] = None) -> WDL.Value.Base:
if isinstance(desired_type, WDL.Type.File) and not isinstance(self, WDL.Value.File):
# Coercing something to File.
if not is_url(self.value) and not os.path.isfile(os.path.join(standard_library.execution_dir or ".", self.value)):
# It is a local file that isn't there.
return WDL.Value.File(TOIL_NONEXISTENT_URI_SCHEME + self.value, self.expr)
else:
# Virtualize normally
return WDL.Value.File(standard_library._virtualize_filename(self.value), self.expr)
return old_coerce(self, desired_type)

return coerce

old_base_coerce = WDL.Value.Base.coerce
old_str_coerce = WDL.Value.String.coerce
try:
# Mypy does not like monkeypatching:
# https://github.com/python/mypy/issues/2427#issuecomment-1419206807
WDL.Value.Base.coerce = base_coerce # type: ignore[method-assign]
WDL.Value.String.coerce = string_coerce # type: ignore[method-assign]
WDL.Value.Base.coerce = make_coerce(old_base_coerce) # type: ignore[method-assign]
WDL.Value.String.coerce = make_coerce(old_str_coerce) # type: ignore[method-assign]
yield
finally:
WDL.Value.Base.coerce = old_base_coerce # type: ignore[method-assign]
Expand Down Expand Up @@ -3598,7 +3614,7 @@ def main() -> None:
output_bindings = toil.start(root_job)
if not isinstance(output_bindings, WDL.Env.Bindings):
raise RuntimeError("The output of the WDL job is not a binding.")

devirtualization_state: DirectoryNamingStateDict = {}
devirtualized_to_virtualized: Dict[str, str] = dict()
virtualized_to_devirtualized: Dict[str, str] = dict()
Expand Down

0 comments on commit f23b1d3

Please sign in to comment.