From 0b01bfbe6a694f73016f86a6c6195c6a57b9fd40 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Wed, 6 Dec 2023 16:12:22 +0100 Subject: [PATCH] second rev on group apis - removed abcs for groups/arrays - improved return types in group.py - warn (temporarily) when an implicit group is found - add attributes.py with Attributes class add test file wip --- src/zarr/v3/abc/array.py | 140 ---------------------- src/zarr/v3/abc/codec.py | 2 +- src/zarr/v3/abc/group.py | 206 --------------------------------- src/zarr/v3/abc/store.py | 33 +++++- src/zarr/v3/array.py | 32 +++-- src/zarr/v3/attributes.py | 32 +++++ src/zarr/v3/codecs/sharding.py | 2 +- src/zarr/v3/group.py | 172 ++++++++++++++++----------- src/zarr/v3/store.py | 8 ++ src/zarr/v3/sync.py | 13 +-- tests/test_group_v3.py | 56 +++++++++ zarr/v3/stores/__init__.py | 4 + zarr/v3/stores/core.py | 160 +++++++++++++++++++++++++ zarr/v3/stores/local.py | 165 ++++++++++++++++++++++++++ zarr/v3/stores/memory.py | 60 ++++++++++ zarr/v3/stores/remote.py | 90 ++++++++++++++ 16 files changed, 729 insertions(+), 446 deletions(-) delete mode 100644 src/zarr/v3/abc/array.py delete mode 100644 src/zarr/v3/abc/group.py create mode 100644 src/zarr/v3/attributes.py create mode 100644 tests/test_group_v3.py create mode 100644 zarr/v3/stores/__init__.py create mode 100644 zarr/v3/stores/core.py create mode 100644 zarr/v3/stores/local.py create mode 100644 zarr/v3/stores/memory.py create mode 100644 zarr/v3/stores/remote.py diff --git a/src/zarr/v3/abc/array.py b/src/zarr/v3/abc/array.py deleted file mode 100644 index ed6a9cf769..0000000000 --- a/src/zarr/v3/abc/array.py +++ /dev/null @@ -1,140 +0,0 @@ -from __future__ import annotations -from abc import abstractproperty, abstractmethod, ABC -from typing import Tuple, Any, Dict - -import numpy as np - -from zarr.v3.abc.store import ReadStore, WriteStore -from zarr.v3.common import Selection - - -class BaseArray(ABC): - @abstractproperty - def store_path(self) -> str: # TODO: rename to `path`? - """Path to this array in the underlying store.""" - ... - - @abstractproperty - def dtype(self) -> np.dtype: - """Data type of the array elements. - - Returns - ------- - dtype - array data type - """ - ... - - @abstractproperty - def ndim(self) -> int: - """Number of array dimensions (axes). - - Returns - ------- - int - number of array dimensions (axes) - """ - ... - - @abstractproperty - def shape(self) -> Tuple[int, ...]: - """Array dimensions. - - Returns - ------- - tuple of int - array dimensions - """ - ... - - @abstractproperty - def size(self) -> int: - """Number of elements in the array. - - Returns - ------- - int - number of elements in an array. - """ - - @abstractproperty - def attrs(self) -> Dict[str, Any]: - """Array attributes. - - Returns - ------- - dict - user defined attributes - """ - ... - - @abstractproperty - def info(self) -> Any: - """Report some diagnostic information about the array. - - Returns - ------- - out - """ - ... - - -class AsyncArray(BaseArray): - """This class can be implemented as a v2 or v3 array""" - - @classmethod - @abstractmethod - async def from_json(cls, zarr_json: Any, store: ReadStore) -> AsyncArray: - ... - - @classmethod - @abstractmethod - async def open(cls, store: ReadStore) -> AsyncArray: - ... - - @classmethod - @abstractmethod - async def create(cls, store: WriteStore, *, shape, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def getitem(self, selection: Selection): - ... 
- - @abstractmethod - async def setitem(self, selection: Selection, value: np.ndarray) -> None: - ... - - -class SyncArray(BaseArray): - """ - This class can be implemented as a v2 or v3 array - """ - - @classmethod - @abstractmethod - def from_json(cls, zarr_json: Any, store: ReadStore) -> SyncArray: - ... - - @classmethod - @abstractmethod - def open(cls, store: ReadStore) -> SyncArray: - ... - - @classmethod - @abstractmethod - def create(cls, store: WriteStore, *, shape, **kwargs) -> SyncArray: - ... - - @abstractmethod - def __getitem__(self, selection: Selection): # TODO: type as np.ndarray | scalar - ... - - @abstractmethod - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - ... - - # some day ;) - # @property - # def __array_api_version__(self) -> str: - # return "2022.12" diff --git a/src/zarr/v3/abc/codec.py b/src/zarr/v3/abc/codec.py index c81f2c976f..f48419e00e 100644 --- a/src/zarr/v3/abc/codec.py +++ b/src/zarr/v3/abc/codec.py @@ -16,7 +16,7 @@ import numpy as np from zarr.v3.common import BytesLike, SliceSelection -from zarr.v3.store import StorePath +from zarr.v3.stores import StorePath if TYPE_CHECKING: diff --git a/src/zarr/v3/abc/group.py b/src/zarr/v3/abc/group.py deleted file mode 100644 index 60cda07d0e..0000000000 --- a/src/zarr/v3/abc/group.py +++ /dev/null @@ -1,206 +0,0 @@ -from __future__ import annotations - -from abc import abstractproperty, abstractmethod, ABC -from collections.abc import MutableMapping -from typing import Dict, Any, AsyncIterator, Union, Iterator - -from zarr.v3.abc.array import AsyncArray, SyncArray - - -class BaseGroup(ABC): - @abstractproperty - def attrs(self) -> Dict[str, Any]: - """User-defined attributes.""" - ... - - @abstractproperty - def info(self) -> Any: # TODO: type this later - """Return diagnostic information about the group.""" - ... - - -class AsyncGroup(BaseGroup): - @abstractmethod - async def nchildren(self) -> int: - ... - - @abstractmethod - async def children(self) -> AsyncIterator: - ... - - @abstractmethod - async def contains(self, child: str) -> bool: - """check if child exists""" - ... - - @abstractmethod - async def getitem(self, child: str) -> Union[AsyncArray, "AsyncGroup"]: - """get child""" - ... - - @abstractmethod - async def group_keys(self) -> AsyncIterator[str]: - """iterate over child group keys""" - ... - - @abstractmethod - async def groups(self) -> AsyncIterator["AsyncGroup"]: - """iterate over child groups""" - ... - - @abstractmethod - async def array_keys(self) -> AsyncIterator[str]: - """iterate over child array keys""" - ... - - @abstractmethod - async def arrays(self) -> AsyncIterator[AsyncArray]: - """iterate over child arrays""" - ... - - @abstractmethod - async def tree(self, expand=False, level=None) -> Any: # TODO: type this later - ... - - @abstractmethod - async def create_group(self, name: str, **kwargs) -> "AsyncGroup": - ... - - @abstractmethod - async def create_array(self, name: str, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def empty(self, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def zeros(self, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def ones(self, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def full(self, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def empty_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: - ... - - @abstractmethod - async def zeros_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray: - ... 
-
-    @abstractmethod
-    async def ones_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
-        ...
-
-    @abstractmethod
-    async def full_like(self, prototype: AsyncArray, **kwargs) -> AsyncArray:
-        ...
-
-    @abstractmethod
-    async def move(self, source: str, dest: str) -> None:
-        ...
-
-    # TODO / maybes:
-    # store_path (rename to path?)
-    # visit
-    # visitkeys
-    # visitvalues
-
-
-class SyncGroup(BaseGroup, MutableMapping):
-    @abstractproperty
-    def nchildren(self) -> int:
-        ...
-
-    @abstractproperty
-    def children(self) -> Iterator:
-        ...
-
-    @abstractmethod
-    def __contains__(self, child: str) -> bool:
-        """check if child exists"""
-        ...
-
-    @abstractmethod
-    def __getitem__(self, child: str) -> Union[SyncArray, "SyncGroup"]:
-        """get child"""
-        ...
-
-    @abstractmethod
-    def __setitem__(self, key: str, value: Union[SyncArray, "SyncGroup"]) -> None:
-        """get child"""
-        ...
-
-    @abstractmethod
-    def group_keys(self) -> AsyncIterator[str]:
-        """iterate over child group keys"""
-        ...
-
-    @abstractmethod
-    def groups(self) -> AsyncIterator["SyncGroup"]:
-        """iterate over child groups"""
-        ...
-
-    @abstractmethod
-    def array_keys(self) -> AsyncIterator[str]:
-        """iterate over child array keys"""
-        ...
-
-    @abstractmethod
-    def arrays(self) -> AsyncIterator[SyncArray]:
-        """iterate over child arrays"""
-        ...
-
-    @abstractmethod
-    def tree(self) -> Any:
-        ...
-
-    @abstractmethod
-    def create_group(self, name: str, **kwargs) -> "SyncGroup":
-        ...
-
-    @abstractmethod
-    def create_array(self, name: str, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def empty(self, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def zeros(self, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def ones(self, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def full(self, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def empty_like(self, prototype: SyncArray, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def zeros_like(self, prototype: SyncArray, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def ones_like(self, prototype: SyncArray, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def full_like(self, prototype: SyncArray, **kwargs) -> SyncArray:
-        ...
-
-    @abstractmethod
-    def move(self, source: str, dest: str) -> None:
-        ...
diff --git a/src/zarr/v3/abc/store.py b/src/zarr/v3/abc/store.py
index 5469cafe6d..abe4aa3d5d 100644
--- a/src/zarr/v3/abc/store.py
+++ b/src/zarr/v3/abc/store.py
@@ -23,12 +23,12 @@ async def get(self, key: str) -> bytes:
         ...
 
     @abstractmethod
-    async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes:
+    async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]) -> List[bytes]:
         """Retrieve possibly partial values from given key_ranges.
 
         Parameters
         ----------
-        key_ranges : list[tuple[str, int]]
+        key_ranges : list[tuple[str, tuple[int, int]]]
             Ordered set of key, range pairs, a key may occur multiple times with different ranges
 
         Returns
@@ -38,6 +38,19 @@ async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes:
         """
         ...
 
+    async def exists(self, key: str) -> bool:
+        """Check if a key exists in the store.
+
+        Parameters
+        ----------
+        key : str
+
+        Returns
+        -------
+        bool
+        """
+        ...
+
 
 class WriteStore(ReadStore):
     @abstractmethod
@@ -51,6 +64,20 @@ async def set(self, key: str, value: bytes) -> None:
         """
         ...
 
+    async def delete(self, key: str) -> None:
+        """Remove a key from the store
+
+        Parameters
+        ----------
+        key : str
+        """
+        ...
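Note (illustration, not part of the patch): the corrected get_partial_values contract above returns one result per (key, (start, stop)) pair, in request order. A toy dict-backed store makes the semantics concrete; the class name DictReadStore is invented here.

    import asyncio
    from typing import Dict, List, Optional, Tuple

    class DictReadStore:
        # minimal sketch of the ReadStore read methods defined above
        def __init__(self, data: Dict[str, bytes]):
            self._data = data

        async def get(self, key: str) -> Optional[bytes]:
            return self._data.get(key)

        async def get_partial_values(
            self, key_ranges: List[Tuple[str, Tuple[int, int]]]
        ) -> List[bytes]:
            # one result per (key, (start, stop)) pair, in request order
            return [self._data[key][start:stop] for key, (start, stop) in key_ranges]

        async def exists(self, key: str) -> bool:
            return key in self._data

    store = DictReadStore({"c/0/0": b"\x00\x01\x02\x03"})
    assert asyncio.run(store.get_partial_values([("c/0/0", (1, 3))])) == [b"\x01\x02"]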
+ + +class PartialWriteStore(WriteStore): + # TODO, instead of using this, should we just check if the store is a PartialWriteStore? + supports_partial_writes = True + @abstractmethod async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: """Store values at a given key, starting at byte range_start. @@ -78,7 +105,7 @@ async def list(self) -> List[str]: @abstractmethod async def list_prefix(self, prefix: str) -> List[str]: - """Retrieve all keys in the store. + """Retrieve all keys in the store with a given prefix. Parameters ---------- diff --git a/src/zarr/v3/array.py b/src/zarr/v3/array.py index 123407fa0f..d55a5aee43 100644 --- a/src/zarr/v3/array.py +++ b/src/zarr/v3/array.py @@ -1,6 +1,5 @@ # Notes on what I've changed here: # 1. Split Array into AsyncArray and Array -# 2. Inherit from abc (SyncArray, AsyncArray) # 3. Added .size and .attrs methods # 4. Temporarily disabled the creation of ArrayV2 # 5. Added from_json to AsyncArray @@ -17,7 +16,6 @@ import numpy as np from attr import evolve, frozen -from zarr.v3.abc.array import SyncArray, AsyncArray from zarr.v3.abc.codec import ArrayBytesCodecPartialDecodeMixin @@ -49,7 +47,7 @@ @frozen -class AsyncArray(AsyncArray): +class AsyncArray: metadata: ArrayMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration @@ -76,7 +74,7 @@ async def create( ) -> AsyncArray: store_path = make_store_path(store) if not exists_ok: - assert not await (store_path / ZARR_JSON).exists_async() + assert not await (store_path / ZARR_JSON).exists() data_type = ( DataType[dtype] if isinstance(dtype, str) else DataType[dtype_to_data_type[dtype.str]] @@ -154,7 +152,7 @@ async def open( runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> AsyncArray: store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + zarr_json_bytes = await (store_path / ZARR_JSON).get() assert zarr_json_bytes is not None return cls.from_json( store_path, @@ -169,7 +167,7 @@ async def open_auto( runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] store_path = make_store_path(store) - v3_metadata_bytes = await (store_path / ZARR_JSON).get_async() + v3_metadata_bytes = await (store_path / ZARR_JSON).get() if v3_metadata_bytes is not None: return cls.from_json( store_path, @@ -178,7 +176,7 @@ async def open_auto( ) else: raise ValueError("no v2 support yet") - # return await ArrayV2.open_async(store_path) + # return await ArrayV2.open(store_path) @property def ndim(self) -> int: @@ -232,7 +230,7 @@ async def getitem(self, selection: Selection): async def _save_metadata(self) -> None: self._validate_metadata() - await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes()) def _validate_metadata(self) -> None: assert len(self.metadata.shape) == len( @@ -265,7 +263,7 @@ async def _read_chunk( else: out[out_selection] = self.metadata.fill_value else: - chunk_bytes = await store_path.get_async() + chunk_bytes = await store_path.get() if chunk_bytes is not None: chunk_array = await self.codec_pipeline.decode(chunk_bytes) tmp = chunk_array[chunk_selection] @@ -347,7 +345,7 @@ async def _write_chunk( else: # writing partial chunks # read chunk first - chunk_bytes = await store_path.get_async() + chunk_bytes = await store_path.get() # merge new value if chunk_bytes is None: @@ -367,13 +365,13 @@ async def _write_chunk( async def 
_write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): if np.all(chunk_array == self.metadata.fill_value): # chunks that only contain fill_value will be removed - await store_path.delete_async() + await store_path.delete() else: chunk_bytes = await self.codec_pipeline.encode(chunk_array) if chunk_bytes is None: - await store_path.delete_async() + await store_path.delete() else: - await store_path.set_async(chunk_bytes) + await store_path.set(chunk_bytes) async def resize(self, new_shape: ChunkCoords) -> AsyncArray: assert len(new_shape) == len(self.metadata.shape) @@ -386,7 +384,7 @@ async def resize(self, new_shape: ChunkCoords) -> AsyncArray: new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) async def _delete_key(key: str) -> None: - await (self.store_path / key).delete_async() + await (self.store_path / key).delete() await concurrent_map( [ @@ -398,14 +396,14 @@ async def _delete_key(key: str) -> None: ) # Write new metadata - await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) return evolve(self, metadata=new_metadata) async def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: new_metadata = evolve(self.metadata, attributes=new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) return evolve(self, metadata=new_metadata) def __repr__(self): @@ -416,7 +414,7 @@ async def info(self): @frozen -class Array(SyncArray): +class Array: _async_array: AsyncArray @classmethod diff --git a/src/zarr/v3/attributes.py b/src/zarr/v3/attributes.py new file mode 100644 index 0000000000..edbc84d8aa --- /dev/null +++ b/src/zarr/v3/attributes.py @@ -0,0 +1,32 @@ +from __future__ import annotations +from collections.abc import MutableMapping +from typing import TYPE_CHECKING, Any, Union + +if TYPE_CHECKING: + from zarr.v3.group import Group + from zarr.v3.array import Array + + +class Attributes(MutableMapping[str, Any]): + def __init__(self, obj: Union[Array, Group]): + # key=".zattrs", read_only=False, cache=True, synchronizer=None + self._obj = obj + + def __getitem__(self, key): + return self._obj.metadata.attributes[key] + + def __setitem__(self, key, value): + new_attrs = dict(self._obj.metadata.attributes) + new_attrs[key] = value + self._obj = self._obj.update_attributes(new_attrs) + + def __delitem__(self, key): + new_attrs = dict(self._obj.metadata.attributes) + del new_attrs[key] + self._obj = self._obj.update_attributes(new_attrs) + + def __iter__(self): + return iter(self._obj.metadata.attributes) + + def __len__(self): + return len(self._obj.metadata.attributes) diff --git a/src/zarr/v3/codecs/sharding.py b/src/zarr/v3/codecs/sharding.py index edbe327a6b..2694ee9f0b 100644 --- a/src/zarr/v3/codecs/sharding.py +++ b/src/zarr/v3/codecs/sharding.py @@ -43,7 +43,7 @@ CodecMetadata, ShardingCodecIndexLocation, ) -from zarr.v3.store import StorePath +from zarr.v3.stores import StorePath MAX_UINT_64 = 2**64 - 1 diff --git a/src/zarr/v3/group.py b/src/zarr/v3/group.py index 68cd43aa26..9058f8f64b 100644 --- a/src/zarr/v3/group.py +++ b/src/zarr/v3/group.py @@ -2,17 +2,20 @@ import asyncio import json +import logging from typing import Any, Dict, Literal, Optional, Union, AsyncIterator, Iterator, List -from attr import asdict, evolve, field, frozen # , validators +from attr import asdict, field, frozen # , validators -from zarr.v3.abc import 
AsyncGroup, SyncGroup
 from zarr.v3.array import AsyncArray, Array
+from zarr.v3.attributes import Attributes
 from zarr.v3.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr
 from zarr.v3.config import RuntimeConfiguration, SyncConfiguration
 from zarr.v3.store import StoreLike, StorePath, make_store_path
 from zarr.v3.sync import SyncMixin
 
+logger = logging.getLogger("zarr.group")
+
 
 @frozen
 class GroupMetadata:
@@ -37,7 +40,7 @@ def from_json(cls, zarr_json: Any) -> GroupMetadata:
 
 
 @frozen
-class AsyncGroup(AsyncGroup):
+class AsyncGroup:
     metadata: GroupMetadata
     store_path: StorePath
     runtime_configuration: RuntimeConfiguration
@@ -51,14 +54,13 @@ async def create(
         exists_ok: bool = False,
         zarr_format: Literal[2, 3] = 3,  # field(default=3, validator=validators.in_([2, 3])),
         runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
-    ) -> "AsyncGroup":
+    ) -> AsyncGroup:
         store_path = make_store_path(store)
         if not exists_ok:
             if zarr_format == 3:
-                assert not await (store_path / ZARR_JSON).exists_async()
+                assert not await (store_path / ZARR_JSON).exists()
             elif zarr_format == 2:
-                assert not await (store_path / ZGROUP_JSON).exists_async()
-        print(zarr_format, type(zarr_format))
+                assert not await (store_path / ZGROUP_JSON).exists()
         group = cls(
             metadata=GroupMetadata(attributes=attributes or {}, zarr_format=zarr_format),
             store_path=store_path,
@@ -73,14 +75,14 @@ async def open(
         store: StoreLike,
         runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
         zarr_format: Literal[2, 3] = 3,
-    ) -> "AsyncGroup":
+    ) -> AsyncGroup:
         store_path = make_store_path(store)
 
         # TODO: consider trying to autodiscover the zarr-format here
         if zarr_format == 3:
             # V3 groups are comprised of a zarr.json object
             # (it is optional in the case of implicit groups)
-            zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
+            zarr_json_bytes = await (store_path / ZARR_JSON).get()
             zarr_json = (
                 json.loads(zarr_json_bytes) if zarr_json_bytes is not None else {"zarr_format": 3}
             )
@@ -89,7 +91,7 @@
             # V2 groups are comprised of a .zgroup and .zattrs objects
             # (both are optional in the case of implicit groups)
             zgroup_bytes, zattrs_bytes = await asyncio.gather(
-                (store_path / ZGROUP_JSON).get_async(), (store_path / ZATTRS_JSON).get_async()
+                (store_path / ZGROUP_JSON).get(), (store_path / ZATTRS_JSON).get()
             )
             zgroup = (
-                json.loads(json.loads(zgroup_bytes))
+                json.loads(zgroup_bytes)
@@ -119,16 +121,17 @@ def from_json(
     async def getitem(
         self,
         key: str,
-    ) -> Union[Array, Group]:
+    ) -> Union[AsyncArray, AsyncGroup]:
         store_path = self.store_path / key
 
-        if self.zarr_format == 3:
-            zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
+        if self.metadata.zarr_format == 3:
+            zarr_json_bytes = await (store_path / ZARR_JSON).get()
 
             if zarr_json_bytes is None:
                 # implicit group?
+                logger.warning("group at %s is an implicit group", store_path)
                 zarr_json = {
-                    "zarr_format": self.zarr_format,
+                    "zarr_format": self.metadata.zarr_format,
                     "node_type": "group",
                     "attributes": {},
                 }
             else:
                 zarr_json = json.loads(zarr_json_bytes)
 
             if zarr_json["node_type"] == "group":
                 return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
             if zarr_json["node_type"] == "array":
-                return Array.from_json(
+                return AsyncArray.from_json(
                     store_path, zarr_json, runtime_configuration=self.runtime_configuration
                 )
-        elif self.zarr_format == 2:
+        elif self.metadata.zarr_format == 2:
             # Q: how do we like optimistically fetching .zgroup, .zarray, and .zattrs?
 # This guarantees that we will always make at least one extra request to the store
             zgroup_bytes, zarray_bytes, zattrs_bytes = await asyncio.gather(
-                (store_path / ZGROUP_JSON).get_async(),
-                (store_path / ZARRAY_JSON).get_async(),
-                (store_path / ZATTRS_JSON).get_async(),
+                (store_path / ZGROUP_JSON).get(),
+                (store_path / ZARRAY_JSON).get(),
+                (store_path / ZATTRS_JSON).get(),
             )
 
             # unpack the zarray, if this is None then we must be opening a group
@@ -157,19 +160,34 @@
             if zarray is not None:
                 # TODO: update this once the V2 array support is part of the primary array class
                 zarr_json = {**zarray, "attributes": zattrs}
-                return Array.from_json(
-                    store_path, zarray, runtime_configuration=self.runtime_configuration
+                return AsyncArray.from_json(
+                    store_path, zarr_json, runtime_configuration=self.runtime_configuration
                 )
             else:
+                if zgroup_bytes is None:
+                    # implicit group?
+                    logger.warning("group at %s is an implicit group", store_path)
                 zgroup = (
                     json.loads(zgroup_bytes)
                     if zgroup_bytes is not None
-                    else {"zarr_format": self.zarr_format}
+                    else {"zarr_format": self.metadata.zarr_format}
                 )
                 zarr_json = {**zgroup, "attributes": zattrs}
                 return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
         else:
-            raise ValueError(f"unexpected zarr_format: {self.zarr_format}")
+            raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}")
+
+    async def delitem(self, key: str) -> None:
+        store_path = self.store_path / key
+        if self.metadata.zarr_format == 3:
+            await (store_path / ZARR_JSON).delete()
+        elif self.metadata.zarr_format == 2:
+            await asyncio.gather(
+                (store_path / ZGROUP_JSON).delete(),  # TODO: missing_ok=False
+                (store_path / ZATTRS_JSON).delete(),  # TODO: missing_ok=True
+            )
+        else:
+            raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}")
 
     async def _save_metadata(self) -> None:
         to_save = self.metadata.to_bytes()
@@ -184,7 +202,7 @@ def attrs(self):
     def info(self):
         return self.metadata.info
 
-    async def create_group(self, path: str, **kwargs) -> Group:
+    async def create_group(self, path: str, **kwargs) -> AsyncGroup:
         runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration)
         return await type(self).create(
             self.store_path / path,
             runtime_configuration=runtime_configuration,
             **kwargs,
         )
 
-    async def create_array(self, path: str, **kwargs) -> Array:
+    async def create_array(self, path: str, **kwargs) -> AsyncArray:
         runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration)
         return await AsyncArray.create(
             self.store_path / path,
             runtime_configuration=runtime_configuration,
             **kwargs,
         )
 
     async def update_attributes(self, new_attributes: Dict[str, Any]):
-        new_metadata = evolve(self.metadata, attributes=new_attributes)
+        # metadata.attributes is "frozen" so we simply clear and update the dict
+        self.metadata.attributes.clear()
+        self.metadata.attributes.update(new_attributes)
 
         # Write new metadata
-        to_save = new_metadata.to_bytes()
-        if new_metadata.zarr_format == 2:
+        to_save = self.metadata.to_bytes()
+        if self.metadata.zarr_format == 2:
             # only save the .zattrs object
-            await (self.store_path / ZATTRS_JSON).set_async(to_save[ZATTRS_JSON])
+            await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON])
         else:
-            await (self.store_path / ZARR_JSON).set_async(to_save[ZARR_JSON])
+            await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON])
 
-        return evolve(self, metadata=new_metadata)
+        return self
 
     def __repr__(self):
-        return f"<Group {self.store_path}>"
+        return f"<AsyncGroup {self.store_path}>"
 
     async def nchildren(self) -> int:
         raise NotImplementedError
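For context on the branches above: getitem never fails just because a metadata object is missing; it synthesizes group metadata and emits the new implicit-group warning instead. A rough usage sketch against the async API added in this patch (written from the diff, not run against it):

    import asyncio
    from zarr.v3.group import AsyncGroup
    from zarr.v3.stores import MemoryStore, StorePath

    async def main() -> None:
        root = await AsyncGroup.create(StorePath(MemoryStore()))
        await root.create_group("a")        # writes a/zarr.json
        explicit = await root.getitem("a")  # AsyncGroup backed by stored metadata
        implicit = await root.getitem("b")  # no zarr.json -> implicit group + warning
        print(explicit, implicit)

    asyncio.run(main())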
-    async def children(self) -> AsyncIterator[AsyncArray, "AsyncGroup"]:
+    async def children(self) -> AsyncIterator[Union[AsyncArray, AsyncGroup]]:
         raise NotImplementedError
 
     async def contains(self, child: str) -> bool:
@@ -227,7 +251,7 @@ async def contains(self, child: str) -> bool:
     async def group_keys(self) -> AsyncIterator[str]:
         raise NotImplementedError
 
-    async def groups(self) -> AsyncIterator["AsyncGroup"]:
+    async def groups(self) -> AsyncIterator[AsyncGroup]:
         raise NotImplementedError
 
     async def array_keys(self) -> AsyncIterator[str]:
@@ -268,9 +292,9 @@ async def move(self, source: str, dest: str) -> None:
 
 
 @frozen
-class Group(SyncGroup, SyncMixin):
+class Group(SyncMixin):
     _async_group: AsyncGroup
-    _sync_configuration: field(factory=SyncConfiguration)
+    _sync_configuration: SyncConfiguration = field(init=True, factory=SyncConfiguration)
 
     @classmethod
     def create(
         cls,
         store: StoreLike,
         *,
         attributes: Optional[Dict[str, Any]] = None,
         exists_ok: bool = False,
         runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Group:
-        return cls._sync(
-            AsyncGroup.create,
-            store,
-            attributes=attributes,
-            exists_ok=exists_ok,
-            runtime_configuration=runtime_configuration,
+        obj = cls._sync(
+            AsyncGroup.create(
+                store,
+                attributes=attributes,
+                exists_ok=exists_ok,
+                runtime_configuration=runtime_configuration,
+            )
         )
+        return cls(obj)
+
 
     @classmethod
     def open(
         cls,
         store: StoreLike,
         runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Group:
-        obj = cls._sync(AsyncGroup.open, store, runtime_configuration)
+        obj = cls._sync(AsyncGroup.open(store, runtime_configuration))
         return cls(obj)
 
     def __getitem__(self, path: str) -> Union[Array, Group]:
-        obj = self._sync(self._async_group.getitem, path)
+        obj = self._sync(self._async_group.getitem(path))
         if isinstance(obj, AsyncArray):
             return Array(obj)
         else:
             return Group(obj)
 
-    def __delitem__(self, key):
-        raise NotImplementedError
+    def __delitem__(self, key) -> None:
+        self._sync(self._async_group.delitem(key))
 
     def __iter__(self):
         raise NotImplementedError
@@ -315,18 +342,23 @@ def __len__(self):
         raise NotImplementedError
 
     def __setitem__(self, key, value):
+        """__setitem__ is not supported in v3"""
         raise NotImplementedError
 
+    @property
+    def metadata(self) -> GroupMetadata:
+        return self._async_group.metadata
+
     @property
-    def attrs(self):
-        return self._async_group.attrs
+    def attrs(self) -> Attributes:
+        return Attributes(self)
 
     @property
     def info(self):
         return self._async_group.info
 
     def update_attributes(self, new_attributes: Dict[str, Any]):
-        self._sync(self._async_group.update_attributes, new_attributes)
+        self._sync(self._async_group.update_attributes(new_attributes))
         return self
 
     @property
     def nchildren(self) -> int:
-        return self._sync(self._async_group.nchildren)
+        return self._sync(self._async_group.nchildren())
 
     @property
-    def children(self) -> List[Array, "Group"]:
+    def children(self) -> List[Union[Array, Group]]:
         _children = self._sync_iter(self._async_group.children)
         return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]
 
     def __contains__(self, child) -> bool:
-        return self._sync(self._async_group.contains, child)
+        return self._sync(self._async_group.contains(child))
 
     def group_keys(self) -> Iterator[str]:
         return self._sync_iter(self._async_group.group_keys)
 
-    def groups(self) -> List["Group"]:
+    def groups(self) -> List[Group]:
         # TODO: in v2 this was a generator that yielded (key, Group) pairs
         return [Group(obj) for obj in self._sync_iter(self._async_group.groups)]
 
     def array_keys(self) -> Iterator[str]:
         return self._sync_iter(self._async_group.array_keys)
 
     def arrays(self) -> List[Array]:
         return [Array(obj) for obj in 
self._sync_iter(self._async_group.arrays)] def tree(self, expand=False, level=None) -> Any: - return self._sync(self._async_group.tree, expand=expand, level=level) + return self._sync(self._async_group.tree(expand=expand, level=level)) - def create_group(self, name: str, **kwargs) -> "Group": - return Group(self._sync(self._async_group.create_group, name, **kwargs)) + def create_group(self, name: str, **kwargs) -> Group: + return Group(self._sync(self._async_group.create_group(name, **kwargs))) def create_array(self, name: str, **kwargs) -> Array: - return Array(self._sync(self._async_group.create_array, name, **kwargs)) + return Array(self._sync(self._async_group.create_array(name, **kwargs))) - def empty(self, **kwargs) -> "Array": - return Array(self._sync(self._async_group.empty, **kwargs)) + def empty(self, **kwargs) -> Array: + return Array(self._sync(self._async_group.empty(**kwargs))) - def zeros(self, **kwargs) -> "Array": - return Array(self._sync(self._async_group.zeros, **kwargs)) + def zeros(self, **kwargs) -> Array: + return Array(self._sync(self._async_group.zeros(**kwargs))) - def ones(self, **kwargs) -> "Array": - return Array(self._sync(self._async_group.ones, **kwargs)) + def ones(self, **kwargs) -> Array: + return Array(self._sync(self._async_group.ones(**kwargs))) - def full(self, **kwargs) -> "Array": - return Array(self._sync(self._async_group.full, **kwargs)) + def full(self, **kwargs) -> Array: + return Array(self._sync(self._async_group.full(**kwargs))) - def empty_like(self, prototype: AsyncArray, **kwargs) -> "Array": - return Array(self._sync(self._async_group.empty_like, prototype, **kwargs)) + def empty_like(self, prototype: AsyncArray, **kwargs) -> Array: + return Array(self._sync(self._async_group.empty_like(prototype, **kwargs))) - def zeros_like(self, prototype: AsyncArray, **kwargs) -> "Array": - return Array(self._sync(self._async_group.zeros_like, prototype, **kwargs)) + def zeros_like(self, prototype: AsyncArray, **kwargs) -> Array: + return Array(self._sync(self._async_group.zeros_like(prototype, **kwargs))) - def ones_like(self, prototype: AsyncArray, **kwargs) -> "Array": - return Array(self._sync(self._async_group.ones_like, prototype, **kwargs)) + def ones_like(self, prototype: AsyncArray, **kwargs) -> Array: + return Array(self._sync(self._async_group.ones_like(prototype, **kwargs))) - def full_like(self, prototype: AsyncArray, **kwargs) -> "Array": - return Array(self._sync(self._async_group.full_like, prototype, **kwargs)) + def full_like(self, prototype: AsyncArray, **kwargs) -> Array: + return Array(self._sync(self._async_group.full_like(prototype, **kwargs))) def move(self, source: str, dest: str) -> None: - return self._sync(self._async_group.move, source, dest) + return self._sync(self._async_group.move(source, dest)) diff --git a/src/zarr/v3/store.py b/src/zarr/v3/store.py index b6c20be41f..262cd6481a 100644 --- a/src/zarr/v3/store.py +++ b/src/zarr/v3/store.py @@ -65,6 +65,14 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})" + def __eq__(self, other: Any) -> bool: + try: + if self.store == other.store and self.path == other.path: + return True + except Exception: + pass + return False + class Store: supports_partial_writes = False diff --git a/src/zarr/v3/sync.py b/src/zarr/v3/sync.py index 9ce5f6892a..e88c8e93f2 100644 --- a/src/zarr/v3/sync.py +++ b/src/zarr/v3/sync.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import functools import 
threading
 from typing import Any, Coroutine, List, Optional
 
@@ -95,14 +94,12 @@ class SyncMixin:
     _sync_configuration: SyncConfiguration
 
-    def _sync(self, method, *args, **kwargs):  # TODO: type this
-        @functools.wraps(method)
-        def wrap(*args, **kwargs):
-            return sync(method, *args, loop=self._sync_configuration.asyncio_loop, **kwargs)
+    def _sync(self, coroutine: Coroutine):  # TODO: type this
+        # TODO: refactor this to take *args and **kwargs and pass those to the method
+        # this should allow us to better type the sync wrapper
+        return sync(coroutine, loop=self._sync_configuration.asyncio_loop)
 
-        return wrap(*args, **kwargs)
-
-    def _sync_iter(self, func, *args, **kwargs) -> List[Any]:  # TODO: type this
+    def _sync_iter(self, func: Coroutine, *args, **kwargs) -> List[Any]:  # TODO: type this
         async def iter_to_list() -> List[Any]:
             # TODO: replace with generators so we don't materialize the entire iterator at once
             return [item async for item in func(*args, **kwargs)]
diff --git a/tests/test_group_v3.py b/tests/test_group_v3.py
new file mode 100644
index 0000000000..4e7179376b
--- /dev/null
+++ b/tests/test_group_v3.py
@@ -0,0 +1,56 @@
+import pytest
+import numpy as np
+
+from zarr.v3.group import AsyncGroup, Group, GroupMetadata
+from zarr.v3.store import LocalStore, StorePath
+from zarr.v3.config import RuntimeConfiguration
+
+
+@pytest.fixture
+def store_path(tmpdir):
+    store = LocalStore(str(tmpdir))
+    p = StorePath(store)
+    return p
+
+
+def test_group(store_path) -> None:
+
+    agroup = AsyncGroup(
+        metadata=GroupMetadata(),
+        store_path=store_path,
+        runtime_configuration=RuntimeConfiguration(),
+    )
+    group = Group(agroup)
+
+    assert agroup.metadata is group.metadata
+
+    # create two groups
+    foo = group.create_group("foo")
+    bar = foo.create_group("bar", attributes={"baz": "qux"})
+
+    # create an array from the "bar" group
+    data = np.arange(0, 4 * 4, dtype="uint16").reshape((4, 4))
+    arr = bar.create_array(
+        "baz", shape=data.shape, dtype=data.dtype, chunk_shape=(2, 2), exists_ok=True
+    )
+    arr[:] = data
+
+    # check the array
+    assert arr == bar["baz"]
+    assert arr.shape == data.shape
+    assert arr.dtype == data.dtype
+
+    # TODO: update this once the array api settles down
+    # assert arr.chunk_shape == (2, 2)
+
+    bar2 = foo["bar"]
+    assert dict(bar2.attrs) == {"baz": "qux"}
+
+    # update a group's attributes
+    bar2.attrs.update({"name": "bar"})
+    # bar.attrs was modified in-place
+    assert dict(bar2.attrs) == {"baz": "qux", "name": "bar"}
+
+    # and the attrs were modified in the store
+    bar3 = foo["bar"]
+    assert dict(bar3.attrs) == {"baz": "qux", "name": "bar"}
diff --git a/zarr/v3/stores/__init__.py b/zarr/v3/stores/__init__.py
new file mode 100644
index 0000000000..dd2a28596a
--- /dev/null
+++ b/zarr/v3/stores/__init__.py
@@ -0,0 +1,4 @@
+from zarr.v3.stores.core import StorePath
+from zarr.v3.stores.remote import RemoteStore
+from zarr.v3.stores.local import LocalStore
+from zarr.v3.stores.memory import MemoryStore
diff --git a/zarr/v3/stores/core.py b/zarr/v3/stores/core.py
new file mode 100644
index 0000000000..cdb4c56f8e
--- /dev/null
+++ b/zarr/v3/stores/core.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+import asyncio
+import io
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple, Union
+
+from zarr.v3.common import BytesLike, concurrent_map
+from zarr.v3.abc.store import Store, ReadStore, WriteStore
+
+if TYPE_CHECKING:
+    from upath import UPath
+
+
+def _dereference_path(root: str, path: str) -> str:
+    assert isinstance(root, str)
+    assert isinstance(path, str)
+    root = root.rstrip("/")
+    path = f"{root}/{path}" if root != "" else path
+    path = path.rstrip("/")
+    return path
+
+
+class StorePath:
+    store: Store
+    path: str
+
+    def __init__(self, store: Store, path: Optional[str] = None):
+        self.store = store
+        self.path = path or ""
+
+    @classmethod
+    def from_path(cls, pth: Path) -> StorePath:
+        return cls(BaseStore.from_path(pth))
+
+    async def get(
+        self, byte_range: Optional[Tuple[int, Optional[int]]] = None
+    ) -> Optional[BytesLike]:
+        return await self.store.get(self.path, byte_range)
+
+    async def set(
+        self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None
+    ) -> None:
+        await self.store.set(self.path, value, byte_range)
+
+    async def delete(self) -> None:
+        await self.store.delete(self.path)
+
+    async def exists(self) -> bool:
+        return await self.store.exists(self.path)
+
+    def __truediv__(self, other: str) -> StorePath:
+        return self.__class__(self.store, _dereference_path(self.path, other))
+
+    def __str__(self) -> str:
+        return _dereference_path(str(self.store), self.path)
+
+    def __repr__(self) -> str:
+        return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})"
+
+    def __eq__(self, other: Any) -> bool:
+        try:
+            if self.store == other.store and self.path == other.path:
+                return True
+        except Exception:
+            pass
+        return False
+
+
+class BaseStore(Store):
+    supports_partial_writes = False
+
+    # Does this really need to be on the Store? Could just
+    # be a convenience function
+    @classmethod
+    def from_path(cls, pth: Path) -> Store:
+        # deferred imports to avoid a cycle between core and the concrete stores
+        from zarr.v3.stores.local import LocalStore
+
+        try:
+            from upath import UPath
+            from upath.implementations.local import PosixUPath, WindowsUPath
+
+            if isinstance(pth, UPath) and not isinstance(pth, (PosixUPath, WindowsUPath)):
+                from zarr.v3.stores.remote import RemoteStore
+
+                storage_options = pth._kwargs.copy()
+                storage_options.pop("_url", None)
+                return RemoteStore(str(pth), **storage_options)
+        except ImportError:
+            pass
+
+        return LocalStore(pth)
+
+    # async def set(
+    #     self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None
+    # ) -> None:
+    #     raise NotImplementedError
+
+    # async def delete(self, key: str) -> None:
+    #     raise NotImplementedError
+
+    # async def exists(self, key: str) -> bool:
+    #     raise NotImplementedError
+
+    # def __truediv__(self, other: str) -> StorePath:
+    #     return StorePath(self, other)
+
+
+class BaseReadStore(ReadStore):
+
+    async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]) -> List[bytes]:
+        """Retrieve possibly partial values from given key_ranges.
+
+        Parameters
+        ----------
+        key_ranges : list[tuple[str, tuple[int, int]]]
+            Ordered set of key, range pairs, a key may occur multiple times with different ranges
+
+        Returns
+        -------
+        list[bytes]
+            list of values, in the order of the key_ranges, may contain null/none for missing keys
+        """
+        # fallback for stores that don't support partial reads
+        async def _get_then_slice(key: str, key_range: tuple[int, int]) -> bytes:
+            value = await self.get(key)
+            # the docstring allows None results for missing keys
+            return value[key_range[0] : key_range[1]] if value is not None else None
+
+        return await concurrent_map(
+            key_ranges,
+            _get_then_slice,
+            limit=None,  # TODO: wire this to config
+        )
+
+    # async def multi_get(
+    #     self, keys: List[Tuple[str, Optional[Tuple[int, int]]]]
+    # ) -> List[Optional[BytesLike]]:
+    #     return await asyncio.gather(*[self.get(key, byte_range) for key, byte_range in keys])
+
+
+class BaseWriteStore(WriteStore):
+    ...
+
+
+StoreLike = Union[BaseStore, StorePath, Path, str]
+
+
+def make_store_path(store_like: StoreLike) -> StorePath:
+    from zarr.v3.stores.local import LocalStore  # deferred import to avoid a cycle
+
+    if isinstance(store_like, StorePath):
+        return store_like
+    elif isinstance(store_like, BaseStore):
+        return StorePath(store_like)
+    elif isinstance(store_like, Path):
+        return StorePath(BaseStore.from_path(store_like))
+    elif isinstance(store_like, str):
+        try:
+            from upath import UPath
+
+            return StorePath(BaseStore.from_path(UPath(store_like)))
+        except ImportError:
+            return StorePath(LocalStore(Path(store_like)))
+    raise TypeError(f"unsupported store_like type: {type(store_like)}")
diff --git a/zarr/v3/stores/local.py b/zarr/v3/stores/local.py
new file mode 100644
index 0000000000..d088305482
--- /dev/null
+++ b/zarr/v3/stores/local.py
@@ -0,0 +1,165 @@
+from __future__ import annotations
+
+import io
+import shutil
+from pathlib import Path
+from typing import List, Optional, Tuple, Union
+
+from zarr.v3.abc.store import ListMixin, PartialWriteStore
+from zarr.v3.stores.core import BaseStore
+# to_thread is assumed to live alongside concurrent_map in zarr.v3.common
+from zarr.v3.common import BytesLike, concurrent_map, to_thread
+
+
+class LocalStore(
+    PartialWriteStore,
+    ListMixin,
+    BaseStore,
+):
+    root: Path
+    auto_mkdir: bool
+
+    def __init__(self, root: Union[Path, str], auto_mkdir: bool = True):
+        if isinstance(root, str):
+            root = Path(root)
+        assert isinstance(root, Path)
+
+        self.root = root
+        self.auto_mkdir = auto_mkdir
+
+    def _cat_file(
+        self, path: Path, start: Optional[int] = None, end: Optional[int] = None
+    ) -> BytesLike:
+        if start is None and end is None:
+            return path.read_bytes()
+        with path.open("rb") as f:
+            size = f.seek(0, io.SEEK_END)
+            if start is not None:
+                if start >= 0:
+                    f.seek(start)
+                else:
+                    f.seek(max(0, size + start))
+            if end is not None:
+                if end < 0:
+                    end = size + end
+                return f.read(end - f.tell())
+            return f.read()
+
+    def _put_file(
+        self,
+        path: Path,
+        value: BytesLike,
+        start: Optional[int] = None,
+    ):
+        if self.auto_mkdir:
+            path.parent.mkdir(parents=True, exist_ok=True)
+        if start is not None:
+            with path.open("r+b") as f:
+                f.seek(start)
+                f.write(value)
+        else:
+            return path.write_bytes(value)
+
+    async def get(self, key: str) -> Optional[BytesLike]:
+        assert isinstance(key, str)
+        path = self.root / key
+
+        try:
+            return await to_thread(self._cat_file, path)
+        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
+            return None
+
+    async def get_partial_values(
+        self, key_ranges: List[Tuple[str, Tuple[int, int]]]
+    ) -> List[bytes]:
+        args = []
+        for key, byte_range in key_ranges:
+            assert isinstance(key, str)
+            path = self.root / key
+            if byte_range is not None:
+                args.append((self._cat_file, path, byte_range[0], byte_range[1]))
+            else:
+                args.append((self._cat_file, path))
+        return await concurrent_map(args, to_thread, limit=None)  # TODO: fix limit
+
+    async def set(self, key: str, value: BytesLike) -> None:
+        assert isinstance(key, str)
+        path = self.root / key
+        await to_thread(self._put_file, path, value)
+
+    async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None:
+        args = []
+        for key, start, value in key_start_values:
+            assert isinstance(key, str)
+            path = self.root / key
+            if start is not None:
+                args.append((self._put_file, path, value, start))
+            else:
+                args.append((self._put_file, path, value))
+        await concurrent_map(args, to_thread, limit=None)  # TODO: fix limit
+
+    async def delete(self, key: str) -> None:
+        path = self.root / key
+        if path.is_dir():  # TODO: support deleting directories? shutil.rmtree?
+            await to_thread(shutil.rmtree, path)
+        else:
+            await to_thread(path.unlink, True)  # Q: we may want to raise if path is missing
+
+    async def exists(self, key: str) -> bool:
+        path = self.root / key
+        return await to_thread(path.exists)
+
+    async def list(self) -> List[str]:
+        """Retrieve all keys in the store.
+
+        Returns
+        -------
+        list[str]
+        """
+        # Q: do we want to return strings or Paths?
+        def _list(root: Path) -> List[str]:
+            files = [str(p) for p in root.rglob("*") if p.is_file()]
+            return files
+
+        return await to_thread(_list, self.root)
+
+    async def list_prefix(self, prefix: str) -> List[str]:
+        """Retrieve all keys in the store with a given prefix.
+
+        Parameters
+        ----------
+        prefix : str
+
+        Returns
+        -------
+        list[str]
+        """
+
+        def _list_prefix(root: Path, prefix: str) -> List[str]:
+            files = [str(p) for p in (root / prefix).rglob("*") if p.is_file()]
+            return files
+
+        return await to_thread(_list_prefix, self.root, prefix)
+
+    async def list_dir(self, prefix: str) -> List[str]:
+        """
+        Retrieve all keys and prefixes with a given prefix and which do not contain the character
+        “/” after the given prefix.
+
+        Parameters
+        ----------
+        prefix : str
+
+        Returns
+        -------
+        list[str]
+        """
+
+        def _list_dir(root: Path, prefix: str) -> List[str]:
+            files = [str(p) for p in (root / prefix).glob("*") if p.is_file()]
+            return files
+
+        return await to_thread(_list_dir, self.root, prefix)
+
+    def __str__(self) -> str:
+        return f"file://{self.root}"
+
+    def __repr__(self) -> str:
+        return f"LocalStore({repr(str(self))})"
diff --git a/zarr/v3/stores/memory.py b/zarr/v3/stores/memory.py
new file mode 100644
index 0000000000..ba7efe70eb
--- /dev/null
+++ b/zarr/v3/stores/memory.py
@@ -0,0 +1,60 @@
+from __future__ import annotations
+
+from typing import List, MutableMapping, Optional, Tuple
+
+from zarr.v3.common import BytesLike
+from zarr.v3.abc.store import WriteListStore
+from zarr.v3.stores.core import BaseStore
+
+
+# TODO: this store could easily be extended to wrap any MutableMapping store from v2
+# When that is done, the `MemoryStore` will just be a store that wraps a dict.
+class MemoryStore(WriteListStore, BaseStore): + supports_partial_writes = True + store_dict: MutableMapping[str, bytes] + + def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None): + self.store_dict = store_dict or {} + + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + try: + value = self.store_dict[key] + if byte_range is not None: + value = value[byte_range[0] : byte_range[1]] + return value + except KeyError: + return None + + async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes: + raise NotImplementedError + + async def set( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + + if byte_range is not None: + buf = bytearray(self.store_dict[key]) + buf[byte_range[0] : byte_range[1]] = value + self.store_dict[key] = buf + else: + self.store_dict[key] = value + + async def delete(self, key: str) -> None: + try: + del self.store_dict[key] + except KeyError: + pass + + async def exists(self, key: str) -> bool: + return key in self.store_dict + + def __str__(self) -> str: + return f"memory://{id(self.store_dict)}" + + def __repr__(self) -> str: + return f"MemoryStore({repr(str(self))})" diff --git a/zarr/v3/stores/remote.py b/zarr/v3/stores/remote.py new file mode 100644 index 0000000000..060a0baf9e --- /dev/null +++ b/zarr/v3/stores/remote.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union + +from zarr.v3.abc.store import WriteListStore +from zarr.v3.stores.core import BaseStore + + +if TYPE_CHECKING: + from upath import UPath + from fsspec.asyn import AsyncFileSystem + + +class RemoteStore(WriteListStore, BaseStore): + root: UPath + + def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): + from upath import UPath + import fsspec + + if isinstance(url, str): + self.root = UPath(url, **storage_options) + else: + assert len(storage_options) == 0, ( + "If constructed with a UPath object, no additional " + + "storage_options are allowed." + ) + self.root = url.rstrip("/") + # test instantiate file system + fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) + assert fs.__class__.async_impl, "FileSystem needs to support async operations." + + def make_fs(self) -> Tuple[AsyncFileSystem, str]: + import fsspec + + storage_options = self.root._kwargs.copy() + storage_options.pop("_url", None) + fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) + assert fs.__class__.async_impl, "FileSystem needs to support async operations." 
+ return fs, root + + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + fs, root = self.make_fs() + path = _dereference_path(root, key) + + try: + value = await ( + fs._cat_file(path, start=byte_range[0], end=byte_range[1]) + if byte_range + else fs._cat_file(path) + ) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + return value + + async def set( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + fs, root = self.make_fs() + path = _dereference_path(root, key) + + # write data + if byte_range: + with fs._open(path, "r+b") as f: + f.seek(byte_range[0]) + f.write(value) + else: + await fs._pipe_file(path, value) + + async def delete(self, key: str) -> None: + fs, root = self.make_fs() + path = _dereference_path(root, key) + if await fs._exists(path): + await fs._rm(path) + + async def exists(self, key: str) -> bool: + fs, root = self.make_fs() + path = _dereference_path(root, key) + return await fs._exists(path) + + def __str__(self) -> str: + return str(self.root) + + def __repr__(self) -> str: + return f"RemoteStore({repr(str(self))})"
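End-to-end, the pieces above compose the same way tests/test_group_v3.py exercises them. A condensed sync-API sketch (same assumptions as the test file; the on-disk path and attribute values are illustrative only):

    import numpy as np
    from zarr.v3.group import Group
    from zarr.v3.stores import LocalStore, StorePath

    root = Group.create(StorePath(LocalStore("/tmp/zarr-v3-demo")))
    foo = root.create_group("foo", attributes={"owner": "demo"})
    arr = foo.create_array("baz", shape=(4, 4), dtype="uint16", chunk_shape=(2, 2))
    arr[:] = np.arange(16, dtype="uint16").reshape((4, 4))

    # Attributes is a MutableMapping view; updates write through to the store
    foo.attrs.update({"name": "foo"})
    reopened = Group.open(StorePath(LocalStore("/tmp/zarr-v3-demo")))["foo"]
    assert dict(reopened.attrs) == {"owner": "demo", "name": "foo"}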