Skip to content

Commit

Permalink
Enable sync patching fsspec for file transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology committed Jun 4, 2024
1 parent 6fa56ea commit 7167f96
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ class TransferFilesWithConcurrency(beam.DoFn):
concurrency_per_executor: The number of concurrent threads per executor.
secrets: Optional dictionary containing secrets required for accessing the transfer target.
open_kwargs: Optional dictionary of keyword arguments to be passed when opening files.
fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
"""

transfer_target: CacheFSSpecTarget
max_concurrency: int
secrets: Optional[Dict] = None
open_kwargs: Optional[Dict] = None
fsspec_sync_patch: bool = False

def process(self, indexed_urls):
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
Expand All @@ -170,7 +174,7 @@ def process(self, indexed_urls):

def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]:
open_kwargs = self.open_kwargs or {}
self.transfer_target.cache_file(url, self.secrets, **open_kwargs)
self.transfer_target.cache_file(url, self.secrets, self.fsspec_sync_patch, **open_kwargs)
return (index, self.transfer_target._full_path(url))


Expand All @@ -188,13 +192,17 @@ class CheckpointFileTransfer(beam.PTransform):
number to limit total cluster concurrency. Default is 20.
concurrency_per_executor (Optional[int]): The number of concurrent threads per executor.
Default is 10.
fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
"""

transfer_target: Union[str, CacheFSSpecTarget]
secrets: Optional[dict] = None
open_kwargs: Optional[dict] = None
max_executors: int = 20
concurrency_per_executor: int = 10
fsspec_sync_patch: bool = False

def assign_keys(self, element) -> Tuple[int, Any]:
index, url = element
Expand All @@ -219,6 +227,7 @@ def expand(self, pcoll):
secrets=self.secrets,
open_kwargs=self.open_kwargs,
max_concurrency=self.concurrency_per_executor,
fsspec_sync_patch=self.fsspec_sync_patch,
)
)
)
Expand Down

0 comments on commit 7167f96

Please sign in to comment.