Skip to content

Commit

Permalink
feat: provide setting for latency wait
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneskoester committed Feb 18, 2024
1 parent 16a3eb4 commit a947732
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ python = "^3.11"
snakemake-interface-common = "^1.14.2"
snakemake-interface-storage-plugins = "^3.0.0"
sysrsync = "^1.1.1"
reretry = "^0.11.8"


[tool.poetry.group.dev.dependencies]
Expand Down
45 changes: 38 additions & 7 deletions snakemake_storage_plugin_fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from dataclasses import dataclass, field
from functools import wraps
import os
from pathlib import Path
import shutil
import subprocess
import time
from typing import Any, Iterable, List, Optional

import sysrsync
from reretry import retry

from snakemake_interface_common.logging import get_logger
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_interface_storage_plugins.storage_provider import (
StorageProviderBase,
Expand All @@ -26,6 +31,18 @@
)


@dataclass
class StorageProviderSettings(StorageProviderSettingsBase):
latency_wait: int = field(
default=1,
metadata={
"help": "Time in seconds to wait until retry if file operation is not "
"successfull. This is useful to deal with filesystem latency as it can "
"occur with network filesystems. Default is 1 second.",
},
)


# Required:
# Implementation of your storage provider
# This class can be empty as the one below.
Expand Down Expand Up @@ -104,6 +121,16 @@ def list_objects(self, query: Any) -> Iterable[str]:
return ()


def latency_wait(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
return retry(
tries=2, delay=self.provider.settings.latency_wait, logger=get_logger()
)(f)

return wrapper


# Required:
# Implementation of storage object. If certain methods cannot be supported by your
# storage (e.g. because it is read-only see
Expand Down Expand Up @@ -135,11 +162,11 @@ async def inventory(self, cache: IOCacheStorageInterface):
# already inventorized, stop here
return

try:
stat = self._stat()
except FileNotFoundError:
if not self.exists():
cache.exists_in_storage[key] = False
return

stat = self._stat()
if self.query_path.is_symlink():
# get symlink stat
lstat = self._stat(follow_symlinks=False)
Expand Down Expand Up @@ -170,12 +197,15 @@ def cleanup(self):
# Nothing to be done here.
pass

# Fallible methods should implement some retry logic.
# The easiest way to do this (but not the only one) is to use the retry_decorator
# provided by snakemake-interface-storage-plugins.
def exists(self) -> bool:
# return True if the object exists
return self.query_path.exists()
exists = self.query_path.exists()
if not exists and self.provider.settings.latency_wait:
# Retry once
time.sleep(self.provider.settings.latency_wait)
return self.query_path.exists()
else:
return exists

def mtime(self) -> float:
# return the modification time
Expand Down Expand Up @@ -226,6 +256,7 @@ def list_candidate_matches(self) -> Iterable[str]:
else:
return (prefix,)

@latency_wait
def _stat(self, follow_symlinks: bool = True):
# We don't want the cached variant (Path.stat), as we cache ourselves in
# inventory and afterwards the information may change.
Expand Down

0 comments on commit a947732

Please sign in to comment.