From 661c0d6ed361773e845e4c8a1e48644e78237f1d Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 16 Jan 2024 12:53:40 -0800 Subject: [PATCH] progress on store interface (memory and local stores only) fixes after rebas e make all tests pass --- .gitignore | 1 + src/zarr/v3/__init__.py | 5 - src/zarr/v3/abc/codec.py | 2 +- src/zarr/v3/abc/store.py | 48 +- src/zarr/v3/codecs/sharding.py | 14 +- src/zarr/v3/group.py | 8 +- src/zarr/v3/store.py | 359 -- src/zarr/v3/store/__init__.py | 5 + src/zarr/v3/store/core.py | 83 + .../v3/stores => src/zarr/v3/store}/local.py | 128 +- src/zarr/v3/store/memory.py | 86 + .../v3/stores => src/zarr/v3/store}/remote.py | 33 +- tests/test_codecs_v3.py | 53 +- tests/test_storage.py | 4967 +++++++++-------- tests/test_storage_v3.py | 1373 ++--- zarr/v3/stores/__init__.py | 4 - zarr/v3/stores/core.py | 160 - zarr/v3/stores/memory.py | 60 - 18 files changed, 3505 insertions(+), 3884 deletions(-) delete mode 100644 src/zarr/v3/store.py create mode 100644 src/zarr/v3/store/__init__.py create mode 100644 src/zarr/v3/store/core.py rename {zarr/v3/stores => src/zarr/v3/store}/local.py (59%) create mode 100644 src/zarr/v3/store/memory.py rename {zarr/v3/stores => src/zarr/v3/store}/remote.py (85%) delete mode 100644 zarr/v3/stores/__init__.py delete mode 100644 zarr/v3/stores/core.py delete mode 100644 zarr/v3/stores/memory.py diff --git a/.gitignore b/.gitignore index 53b6cd356..7d32026e1 100644 --- a/.gitignore +++ b/.gitignore @@ -78,5 +78,6 @@ src/zarr/_version.py #doesnotexist #test_sync* data/* +src/fixture/ .DS_Store diff --git a/src/zarr/v3/__init__.py b/src/zarr/v3/__init__.py index 07258154a..038dff89b 100644 --- a/src/zarr/v3/__init__.py +++ b/src/zarr/v3/__init__.py @@ -8,11 +8,7 @@ from zarr.v3.group import Group # noqa: F401 from zarr.v3.metadata import RuntimeConfiguration, runtime_configuration # noqa: F401 from zarr.v3.store import ( # noqa: F401 - LocalStore, - RemoteStore, - Store, StoreLike, - StorePath, make_store_path, ) from zarr.v3.sync import sync as _sync @@ -27,7 +23,6 @@ async def open_auto_async( return await Array.open(store_path, runtime_configuration=runtime_configuration_) except KeyError: return await Group.open(store_path, runtime_configuration=runtime_configuration_) - def open_auto( diff --git a/src/zarr/v3/abc/codec.py b/src/zarr/v3/abc/codec.py index f48419e00..c81f2c976 100644 --- a/src/zarr/v3/abc/codec.py +++ b/src/zarr/v3/abc/codec.py @@ -16,7 +16,7 @@ import numpy as np from zarr.v3.common import BytesLike, SliceSelection -from zarr.v3.stores import StorePath +from zarr.v3.store import StorePath if TYPE_CHECKING: diff --git a/src/zarr/v3/abc/store.py b/src/zarr/v3/abc/store.py index abe4aa3d5..ce5de279c 100644 --- a/src/zarr/v3/abc/store.py +++ b/src/zarr/v3/abc/store.py @@ -1,20 +1,19 @@ from abc import abstractmethod, ABC -from typing import List, Tuple +from typing import List, Tuple, Optional class Store(ABC): - pass - - -class ReadStore(Store): @abstractmethod - async def get(self, key: str) -> bytes: + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[bytes]: """Retrieve the value associated with a given key. Parameters ---------- key : str + byte_range : tuple[int, Optional[int]], optional Returns ------- @@ -23,7 +22,9 @@ async def get(self, key: str) -> bytes: ... 
@abstractmethod - async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]) -> List[bytes]: + async def get_partial_values( + self, key_ranges: List[Tuple[str, Tuple[int, int]]] + ) -> List[bytes]: """Retrieve possibly partial values from given key_ranges. Parameters @@ -38,6 +39,7 @@ async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]] """ ... + @abstractmethod async def exists(self, key: str) -> bool: """Check if a key exists in the store. @@ -51,8 +53,12 @@ async def exists(self, key: str) -> bool: """ ... + @property + @abstractmethod + def supports_writes(self) -> bool: + """Does the store support writes?""" + ... -class WriteStore(ReadStore): @abstractmethod async def set(self, key: str, value: bytes) -> None: """Store a (key, value) pair. @@ -64,7 +70,8 @@ async def set(self, key: str, value: bytes) -> None: """ ... - async def delete(self, key: str) -> None + @abstractmethod + async def delete(self, key: str) -> None: """Remove a key from the store Parameters @@ -73,10 +80,11 @@ async def delete(self, key: str) -> None """ ... - -class PartialWriteStore(WriteStore): - # TODO, instead of using this, should we just check if the store is a PartialWriteStore? - supports_partial_writes = True + @property + @abstractmethod + def supports_partial_writes(self) -> bool: + """Does the store support partial writes?""" + ... @abstractmethod async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: @@ -91,8 +99,12 @@ async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes] """ ... + @property + @abstractmethod + def supports_listing(self) -> bool: + """Does the store support listing?""" + ... -class ListMixin: @abstractmethod async def list(self) -> List[str]: """Retrieve all keys in the store. @@ -132,11 +144,3 @@ async def list_dir(self, prefix: str) -> List[str]: list[str] """ ... 
- - -class ReadListStore(ReadStore, ListMixin): - pass - - -class WriteListStore(WriteStore, ListMixin): - pass diff --git a/src/zarr/v3/codecs/sharding.py b/src/zarr/v3/codecs/sharding.py index 2694ee9f0..12c84ade2 100644 --- a/src/zarr/v3/codecs/sharding.py +++ b/src/zarr/v3/codecs/sharding.py @@ -43,7 +43,7 @@ CodecMetadata, ShardingCodecIndexLocation, ) -from zarr.v3.stores import StorePath +from zarr.v3.store import StorePath MAX_UINT_64 = 2**64 - 1 @@ -354,7 +354,7 @@ async def decode_partial( for chunk_coords in all_chunk_coords: chunk_byte_slice = shard_index.get_chunk_slice(chunk_coords) if chunk_byte_slice: - chunk_bytes = await store_path.get_async(chunk_byte_slice) + chunk_bytes = await store_path.get(chunk_byte_slice) if chunk_bytes: shard_dict[chunk_coords] = chunk_bytes @@ -533,9 +533,9 @@ async def _write_chunk( ) if shard_builder.index.is_all_empty(): - await store_path.delete_async() + await store_path.delete() else: - await store_path.set_async( + await store_path.set( await shard_builder.finalize( self.configuration.index_location, self._encode_shard_index, @@ -561,9 +561,9 @@ def _shard_index_size(self) -> int: async def _load_shard_index_maybe(self, store_path: StorePath) -> Optional[_ShardIndex]: shard_index_size = self._shard_index_size() if self.configuration.index_location == ShardingCodecIndexLocation.start: - index_bytes = await store_path.get_async((0, shard_index_size)) + index_bytes = await store_path.get((0, shard_index_size)) else: - index_bytes = await store_path.get_async((-shard_index_size, None)) + index_bytes = await store_path.get((-shard_index_size, None)) if index_bytes is not None: return await self._decode_shard_index(index_bytes) return None @@ -574,7 +574,7 @@ async def _load_shard_index(self, store_path: StorePath) -> _ShardIndex: ) async def _load_full_shard_maybe(self, store_path: StorePath) -> Optional[_ShardProxy]: - shard_bytes = await store_path.get_async() + shard_bytes = await store_path.get() return await _ShardProxy.from_bytes(shard_bytes, self) if shard_bytes else None diff --git a/src/zarr/v3/group.py b/src/zarr/v3/group.py index 9058f8f64..9f53a4981 100644 --- a/src/zarr/v3/group.py +++ b/src/zarr/v3/group.py @@ -191,7 +191,7 @@ async def delitem(self, key: str) -> None: async def _save_metadata(self) -> None: to_save = self.metadata.to_bytes() - awaitables = [(self.store_path / key).set_async(value) for key, value in to_save.items()] + awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] await asyncio.gather(*awaitables) @property @@ -227,9 +227,9 @@ async def update_attributes(self, new_attributes: Dict[str, Any]): to_save = self.metadata.to_bytes() if self.metadata.zarr_format == 2: # only save the .zattrs object - await (self.store_path / ZATTRS_JSON).set_async(to_save[ZATTRS_JSON]) + await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON]) else: - await (self.store_path / ZARR_JSON).set_async(to_save[ZARR_JSON]) + await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON]) self.metadata.attributes.clear() self.metadata.attributes.update(new_attributes) @@ -333,7 +333,7 @@ def __getitem__(self, path: str) -> Union[Array, Group]: return Group(obj) def __delitem__(self, key) -> None: - self._sync(self._async_group.delitem(path)) + self._sync(self._async_group.delitem(key)) def __iter__(self): raise NotImplementedError diff --git a/src/zarr/v3/store.py b/src/zarr/v3/store.py deleted file mode 100644 index 262cd6481..000000000 --- a/src/zarr/v3/store.py +++ /dev/null @@ -1,359 +0,0 @@ -# TODO: 
-# 1. Stores should inherit from zarr.v3.abc.store classes -# 2. remove "_async" suffix from all methods? - -# Changes I've made here: -# 1. Make delay import of fsspec - -from __future__ import annotations - -import asyncio -import io -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Tuple, Union - -from zarr.v3.common import BytesLike, to_thread - -if TYPE_CHECKING: - from upath import UPath - from fsspec.asyn import AsyncFileSystem - - -def _dereference_path(root: str, path: str) -> str: - assert isinstance(root, str) - assert isinstance(path, str) - root = root.rstrip("/") - path = f"{root}/{path}" if root != "" else path - path = path.rstrip("/") - return path - - -class StorePath: - store: Store - path: str - - def __init__(self, store: Store, path: Optional[str] = None): - self.store = store - self.path = path or "" - - @classmethod - def from_path(cls, pth: Path) -> StorePath: - return cls(Store.from_path(pth)) - - async def get_async( - self, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - return await self.store.get_async(self.path, byte_range) - - async def set_async( - self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - await self.store.set_async(self.path, value, byte_range) - - async def delete_async(self) -> None: - await self.store.delete_async(self.path) - - async def exists_async(self) -> bool: - return await self.store.exists_async(self.path) - - def __truediv__(self, other: str) -> StorePath: - return self.__class__(self.store, _dereference_path(self.path, other)) - - def __str__(self) -> str: - return _dereference_path(str(self.store), self.path) - - def __repr__(self) -> str: - return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})" - - def __eq__(self, other: Any) -> bool: - try: - if self.store == other.store and self.path == other.path: - return True - except Exception: - pass - return False - - -class Store: - supports_partial_writes = False - - @classmethod - def from_path(cls, pth: Path) -> Store: - try: - from upath import UPath - from upath.implementations.local import PosixUPath, WindowsUPath - - if isinstance(pth, UPath) and not isinstance(pth, (PosixUPath, WindowsUPath)): - storage_options = pth._kwargs.copy() - storage_options.pop("_url", None) - return RemoteStore(str(pth), **storage_options) - except ImportError: - pass - - return LocalStore(pth) - - async def multi_get_async( - self, keys: List[Tuple[str, Optional[Tuple[int, int]]]] - ) -> List[Optional[BytesLike]]: - return await asyncio.gather(*[self.get_async(key, byte_range) for key, byte_range in keys]) - - async def get_async( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - raise NotImplementedError - - async def multi_set_async( - self, key_values: List[Tuple[str, BytesLike, Optional[Tuple[int, int]]]] - ) -> None: - await asyncio.gather( - *[self.set_async(key, value, byte_range) for key, value, byte_range in key_values] - ) - - async def set_async( - self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - raise NotImplementedError - - async def delete_async(self, key: str) -> None: - raise NotImplementedError - - async def exists_async(self, key: str) -> bool: - raise NotImplementedError - - def __truediv__(self, other: str) -> StorePath: - return StorePath(self, other) - - -class LocalStore(Store): - supports_partial_writes = True - root: Path - auto_mkdir: bool - - 
def __init__(self, root: Union[Path, str], auto_mkdir: bool = True): - if isinstance(root, str): - root = Path(root) - assert isinstance(root, Path) - - self.root = root - self.auto_mkdir = auto_mkdir - - def _cat_file( - self, path: Path, start: Optional[int] = None, end: Optional[int] = None - ) -> BytesLike: - if start is None and end is None: - return path.read_bytes() - with path.open("rb") as f: - size = f.seek(0, io.SEEK_END) - if start is not None: - if start >= 0: - f.seek(start) - else: - f.seek(max(0, size + start)) - if end is not None: - if end < 0: - end = size + end - return f.read(end - f.tell()) - return f.read() - - def _put_file( - self, - path: Path, - value: BytesLike, - start: Optional[int] = None, - ): - if self.auto_mkdir: - path.parent.mkdir(parents=True, exist_ok=True) - if start is not None: - with path.open("r+b") as f: - f.seek(start) - f.write(value) - else: - return path.write_bytes(value) - - async def get_async( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - assert isinstance(key, str) - path = self.root / key - - try: - value = await ( - to_thread(self._cat_file, path, byte_range[0], byte_range[1]) - if byte_range is not None - else to_thread(self._cat_file, path) - ) - except (FileNotFoundError, IsADirectoryError, NotADirectoryError): - return None - - return value - - async def set_async( - self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - assert isinstance(key, str) - path = self.root / key - - if byte_range is not None: - await to_thread(self._put_file, path, value, byte_range[0]) - else: - await to_thread(self._put_file, path, value) - - async def delete_async(self, key: str) -> None: - path = self.root / key - await to_thread(path.unlink, True) - - async def exists_async(self, key: str) -> bool: - path = self.root / key - return await to_thread(path.exists) - - def __str__(self) -> str: - return f"file://{self.root}" - - def __repr__(self) -> str: - return f"LocalStore({repr(str(self))})" - - -class RemoteStore(Store): - root: UPath - - def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): - from upath import UPath - import fsspec - - if isinstance(url, str): - self.root = UPath(url, **storage_options) - else: - assert len(storage_options) == 0, ( - "If constructed with a UPath object, no additional " - + "storage_options are allowed." - ) - self.root = url.rstrip("/") - # test instantiate file system - fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) - assert fs.__class__.async_impl, "FileSystem needs to support async operations." - - def make_fs(self) -> Tuple[AsyncFileSystem, str]: - import fsspec - - storage_options = self.root._kwargs.copy() - storage_options.pop("_url", None) - fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) - assert fs.__class__.async_impl, "FileSystem needs to support async operations." 
- return fs, root - - async def get_async( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - assert isinstance(key, str) - fs, root = self.make_fs() - path = _dereference_path(root, key) - - try: - value = await ( - fs._cat_file(path, start=byte_range[0], end=byte_range[1]) - if byte_range - else fs._cat_file(path) - ) - except (FileNotFoundError, IsADirectoryError, NotADirectoryError): - return None - - return value - - async def set_async( - self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - assert isinstance(key, str) - fs, root = self.make_fs() - path = _dereference_path(root, key) - - # write data - if byte_range: - with fs._open(path, "r+b") as f: - f.seek(byte_range[0]) - f.write(value) - else: - await fs._pipe_file(path, value) - - async def delete_async(self, key: str) -> None: - fs, root = self.make_fs() - path = _dereference_path(root, key) - if await fs._exists(path): - await fs._rm(path) - - async def exists_async(self, key: str) -> bool: - fs, root = self.make_fs() - path = _dereference_path(root, key) - return await fs._exists(path) - - def __str__(self) -> str: - return str(self.root) - - def __repr__(self) -> str: - return f"RemoteStore({repr(str(self))})" - - -class MemoryStore(Store): - supports_partial_writes = True - store_dict: MutableMapping[str, bytes] - - def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None): - self.store_dict = store_dict or {} - - async def get_async( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - assert isinstance(key, str) - try: - value = self.store_dict[key] - if byte_range is not None: - value = value[byte_range[0] : byte_range[1]] - return value - except KeyError: - return None - - async def set_async( - self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - assert isinstance(key, str) - - if byte_range is not None: - buf = bytearray(self.store_dict[key]) - buf[byte_range[0] : byte_range[1]] = value - self.store_dict[key] = buf - else: - self.store_dict[key] = value - - async def delete_async(self, key: str) -> None: - try: - del self.store_dict[key] - except KeyError: - pass - - async def exists_async(self, key: str) -> bool: - return key in self.store_dict - - def __str__(self) -> str: - return f"memory://{id(self.store_dict)}" - - def __repr__(self) -> str: - return f"MemoryStore({repr(str(self))})" - - -StoreLike = Union[Store, StorePath, Path, str] - - -def make_store_path(store_like: StoreLike) -> StorePath: - if isinstance(store_like, StorePath): - return store_like - elif isinstance(store_like, Store): - return StorePath(store_like) - elif isinstance(store_like, Path): - return StorePath(Store.from_path(store_like)) - elif isinstance(store_like, str): - try: - from upath import UPath - - return StorePath(Store.from_path(UPath(store_like))) - except ImportError: - return StorePath(LocalStore(Path(store_like))) - raise TypeError diff --git a/src/zarr/v3/store/__init__.py b/src/zarr/v3/store/__init__.py new file mode 100644 index 000000000..2268381d2 --- /dev/null +++ b/src/zarr/v3/store/__init__.py @@ -0,0 +1,5 @@ +# flake8: noqa +from zarr.v3.store.core import StorePath, StoreLike, make_store_path +from zarr.v3.store.remote import RemoteStore +from zarr.v3.store.local import LocalStore +from zarr.v3.store.memory import MemoryStore diff --git a/src/zarr/v3/store/core.py b/src/zarr/v3/store/core.py new file mode 100644 index 
000000000..0ef1c8569 --- /dev/null +++ b/src/zarr/v3/store/core.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Optional, Tuple, Union + +from zarr.v3.common import BytesLike +from zarr.v3.abc.store import Store + + +def _dereference_path(root: str, path: str) -> str: + assert isinstance(root, str) + assert isinstance(path, str) + root = root.rstrip("/") + path = f"{root}/{path}" if root != "" else path + path = path.rstrip("/") + return path + + +class StorePath: + store: Store + path: str + + def __init__(self, store: Store, path: Optional[str] = None): + self.store = store + self.path = path or "" + + @classmethod + def from_path(cls, pth: Path) -> StorePath: + return cls(Store.from_path(pth)) + + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + return await self.store.get(self.path, byte_range) + + async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: + if byte_range is not None: + raise NotImplementedError("Store.set does not have partial writes yet") + await self.store.set(self.path, value) + + async def delete(self) -> None: + await self.store.delete(self.path) + + async def exists(self) -> bool: + return await self.store.exists(self.path) + + def __truediv__(self, other: str) -> StorePath: + return self.__class__(self.store, _dereference_path(self.path, other)) + + def __str__(self) -> str: + return _dereference_path(str(self.store), self.path) + + def __repr__(self) -> str: + return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})" + + def __eq__(self, other: Any) -> bool: + try: + if self.store == other.store and self.path == other.path: + return True + except Exception: + pass + return False + + +StoreLike = Union[Store, StorePath, Path, str] + + +def make_store_path(store_like: StoreLike) -> StorePath: + if isinstance(store_like, StorePath): + return store_like + elif isinstance(store_like, Store): + return StorePath(store_like) + # elif isinstance(store_like, Path): + # return StorePath(Store.from_path(store_like)) + elif isinstance(store_like, str): + try: + from upath import UPath + + return StorePath(Store.from_path(UPath(store_like))) + except ImportError as e: + raise e + # return StorePath(LocalStore(Path(store_like))) + raise TypeError diff --git a/zarr/v3/stores/local.py b/src/zarr/v3/store/local.py similarity index 59% rename from zarr/v3/stores/local.py rename to src/zarr/v3/store/local.py index d08830548..a62eea20f 100644 --- a/zarr/v3/stores/local.py +++ b/src/zarr/v3/store/local.py @@ -1,18 +1,56 @@ from __future__ import annotations +import io +import shutil from pathlib import Path -from typing import Union, Optional +from typing import Union, Optional, List, Tuple -from zarr.v3.abc.store import PartialWriteStore -from zarr.v3.stores.core import BaseStore -from zarr.v3.common import concurrent_map +from zarr.v3.abc.store import Store +from zarr.v3.common import BytesLike, concurrent_map, to_thread -class LocalStore( - PartialWriteStore, - ListMixin, - BaseStore, +def _get(path: Path, byte_range: Optional[Tuple[int, Optional[int]]] = None) -> bytes: + if byte_range is not None: + start = byte_range[0] + end = (start + byte_range[1]) if byte_range[1] is not None else None + else: + return path.read_bytes() + with path.open("rb") as f: + size = f.seek(0, io.SEEK_END) + if start is not None: + if start >= 0: + f.seek(start) + else: + f.seek(max(0, size + start)) + if end is not None: + if end < 
0: + end = size + end + return f.read(end - f.tell()) + return f.read() + + +def _put( + path: Path, + value: BytesLike, + start: Optional[int] = None, + auto_mkdir: bool = True, ): + if auto_mkdir: + path.parent.mkdir(parents=True, exist_ok=True) + if start is not None: + with path.open("r+b") as f: + f.seek(start) + f.write(value) + else: + return path.write_bytes(value) + + +class LocalStore(Store): + + supports_writes: bool = True + supports_partial_writes: bool = True + supports_listing: bool = True + root: Path auto_mkdir: bool @@ -24,45 +62,20 @@ def __init__(self, root: Union[Path, str], auto_mkdir: bool = True): self.root = root self.auto_mkdir = auto_mkdir - def _cat_file( - self, path: Path, start: Optional[int] = None, end: Optional[int] = None - ) -> BytesLike: - if start is None and end is None: - return path.read_bytes() - with path.open("rb") as f: - size = f.seek(0, io.SEEK_END) - if start is not None: - if start >= 0: - f.seek(start) - else: - f.seek(max(0, size + start)) - if end is not None: - if end < 0: - end = size + end - return f.read(end - f.tell()) - return f.read() - - def _put_file( - self, - path: Path, - value: BytesLike, - start: Optional[int] = None, - ): - if self.auto_mkdir: - path.parent.mkdir(parents=True, exist_ok=True) - if start is not None: - with path.open("r+b") as f: - f.seek(start) - f.write(value) - else: - return path.write_bytes(value) + def __str__(self) -> str: + return f"file://{self.root}" + + def __repr__(self) -> str: + return f"LocalStore({repr(str(self))})" - async def get(self, key: str) -> Optional[BytesLike]: + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[bytes]: assert isinstance(key, str) path = self.root / key try: - return await to_thread(self._cat_file, path) + return await to_thread(_get, path, byte_range) except (FileNotFoundError, IsADirectoryError, NotADirectoryError): return None @@ -74,15 +87,15 @@ async def get_partial_values( assert isinstance(key, str) path = self.root / key if byte_range is not None: - args.append((self._cat_file, path, byte_range[0], byte_range[1])) + args.append((_get, path, byte_range[0], byte_range[1])) else: - args.append((self._cat_file, path)) + args.append((_get, path)) return await concurrent_map(args, to_thread, limit=None) # TODO: fix limit async def set(self, key: str, value: BytesLike) -> None: assert isinstance(key, str) path = self.root / key - await to_thread(self._put_file, path, value) + await to_thread(_put, path, value) async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: args = [] @@ -90,9 +103,9 @@ async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes] assert isinstance(key, str) path = self.root / key if start is not None: - args.append((self._put_file, path, value, start)) + args.append((_put, path, value, start)) else: - args.append((self._put_file, path, value)) + args.append((_put, path, value)) await concurrent_map(args, to_thread, limit=None) # TODO: fix limit async def delete(self, key: str) -> None: @@ -104,7 +117,7 @@ async def delete(self, key: str) -> None: async def exists(self, key: str) -> bool: path = self.root / key - return await to_thread(path.exists) + return await to_thread(path.is_file) async def list(self) -> List[str]: """Retrieve all keys in the store. 
@@ -118,7 +131,7 @@ def _list(root: Path) -> List[str]: files = [str(p) for p in root.rglob("") if p.is_file()] return files - return to_thread(_list, self.root) + return await to_thread(_list, self.root) async def list_prefix(self, prefix: str) -> List[str]: """Retrieve all keys in the store with a given prefix. @@ -136,7 +149,7 @@ def _list_prefix(root: Path, prefix: str) -> List[str]: files = [p for p in (root / prefix).rglob("*") if p.is_file()] return files - return to_thread(_list_prefix, self.root, prefix) + return await to_thread(_list_prefix, self.root, prefix) async def list_dir(self, prefix: str) -> List[str]: """ @@ -153,13 +166,12 @@ async def list_dir(self, prefix: str) -> List[str]: """ def _list_dir(root: Path, prefix: str) -> List[str]: - files = [str(p) for p in (root / prefix).glob("*") if p.is_file()] - return files - return to_thread(_list_dir, self.root, prefix) + base = root / prefix + to_strip = str(base) + "/" + try: + return [str(key).replace(to_strip, "") for key in base.iterdir()] + except (FileNotFoundError, NotADirectoryError): + return [] - def __str__(self) -> str: - return f"file://{self.root}" - - def __repr__(self) -> str: - return f"LocalStore({repr(str(self))})" + return await to_thread(_list_dir, self.root, prefix) diff --git a/src/zarr/v3/store/memory.py b/src/zarr/v3/store/memory.py new file mode 100644 index 000000000..137037585 --- /dev/null +++ b/src/zarr/v3/store/memory.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from typing import Optional, MutableMapping, List, Tuple + +from zarr.v3.common import BytesLike +from zarr.v3.abc.store import Store + + +# TODO: this store could easily be extended to wrap any MutuableMapping store from v2 +# When that is done, the `MemoryStore` will just be a store that wraps a dict. +class MemoryStore(Store): + supports_writes: bool = True + supports_partial_writes: bool = True + supports_listing: bool = True + + _store_dict: MutableMapping[str, bytes] + + def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None): + self._store_dict = store_dict or {} + + def __str__(self) -> str: + return f"memory://{id(self._store_dict)}" + + def __repr__(self) -> str: + return f"MemoryStore({repr(str(self))})" + + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + try: + value = self._store_dict[key] + if byte_range is not None: + value = value[byte_range[0] : byte_range[1]] + return value + except KeyError: + return None + + async def get_partial_values( + self, key_ranges: List[Tuple[str, Tuple[int, int]]] + ) -> List[bytes]: + raise NotImplementedError + + async def exists(self, key: str) -> bool: + return key in self._store_dict + + async def set( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + if not isinstance(value, (bytes, bytearray, memoryview)): + raise TypeError(f"expected BytesLike, got {type(value)}") + + if byte_range is not None: + buf = bytearray(self._store_dict[key]) + buf[byte_range[0] : byte_range[1]] = value + self._store_dict[key] = buf + else: + self._store_dict[key] = value + + async def delete(self, key: str) -> None: + try: + del self._store_dict[key] + except KeyError: + pass # Q(JH): why not raise? 
+ + async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + raise NotImplementedError + + async def list(self) -> List[str]: + return list(self._store_dict.keys()) + + async def list_prefix(self, prefix: str) -> List[str]: + return [key for key in self._store_dict if key.startswith(prefix)] + + async def list_dir(self, prefix: str) -> List[str]: + if prefix == "": + return list({key.split("/", maxsplit=1)[0] for key in self._store_dict}) + else: + return list( + { + key.strip(prefix + "/").split("/")[0] + for key in self._store_dict + if (key.startswith(prefix + "/") and key != prefix) + } + ) diff --git a/zarr/v3/stores/remote.py b/src/zarr/v3/store/remote.py similarity index 85% rename from zarr/v3/stores/remote.py rename to src/zarr/v3/store/remote.py index 060a0baf9..0e6fc84e0 100644 --- a/zarr/v3/stores/remote.py +++ b/src/zarr/v3/store/remote.py @@ -2,8 +2,9 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union -from zarr.v3.abc.store import WriteListStore -from zarr.v3.stores.core import BaseStore +from zarr.v3.abc.store import Store +from zarr.v3.store.core import _dereference_path +from zarr.v3.common import BytesLike if TYPE_CHECKING: @@ -11,7 +12,11 @@ from fsspec.asyn import AsyncFileSystem -class RemoteStore(WriteListStore, BaseStore): +class RemoteStore(Store): + supports_writes: bool = True + supports_partial_writes: bool = False + supports_listing: bool = True + root: UPath def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): @@ -30,7 +35,13 @@ def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) assert fs.__class__.async_impl, "FileSystem needs to support async operations." 
- def make_fs(self) -> Tuple[AsyncFileSystem, str]: + def __str__(self) -> str: + return str(self.root) + + def __repr__(self) -> str: + return f"RemoteStore({repr(str(self))})" + + def _make_fs(self) -> Tuple[AsyncFileSystem, str]: import fsspec storage_options = self.root._kwargs.copy() @@ -43,7 +54,7 @@ async def get( self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None ) -> Optional[BytesLike]: assert isinstance(key, str) - fs, root = self.make_fs() + fs, root = self._make_fs() path = _dereference_path(root, key) try: @@ -61,7 +72,7 @@ async def set( self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None ) -> None: assert isinstance(key, str) - fs, root = self.make_fs() + fs, root = self._make_fs() path = _dereference_path(root, key) # write data @@ -73,18 +84,12 @@ async def set( await fs._pipe_file(path, value) async def delete(self, key: str) -> None: - fs, root = self.make_fs() + fs, root = self._make_fs() path = _dereference_path(root, key) if await fs._exists(path): await fs._rm(path) async def exists(self, key: str) -> bool: - fs, root = self.make_fs() + fs, root = self._make_fs() path = _dereference_path(root, key) return await fs._exists(path) - - def __str__(self) -> str: - return str(self.root) - - def __repr__(self) -> str: - return f"RemoteStore({repr(str(self))})" diff --git a/tests/test_codecs_v3.py b/tests/test_codecs_v3.py index 93acdb2ba..2b1896987 100644 --- a/tests/test_codecs_v3.py +++ b/tests/test_codecs_v3.py @@ -13,7 +13,8 @@ from zarr.v3.indexing import morton_order_iter from zarr.v3.metadata import CodecMetadata, ShardingCodecIndexLocation, runtime_configuration -from zarr.v3.store import MemoryStore, Store +from zarr.v3.abc.store import Store +from zarr.v3.store import MemoryStore, StorePath @frozen @@ -38,7 +39,7 @@ async def set(self, value: np.ndarray): @pytest.fixture def store() -> Iterator[Store]: - yield MemoryStore() + yield StorePath(MemoryStore()) @pytest.fixture @@ -283,7 +284,7 @@ async def test_order( fill_value=1, ) z[:, :] = data - assert await store.get_async("order/0.0") == z._store["0.0"] + assert await (store / "order/0.0").get() == z._store["0.0"] @pytest.mark.parametrize("input_order", ["F", "C"]) @@ -395,7 +396,7 @@ async def test_transpose( fill_value=1, ) z[:, :] = data - assert await store.get_async("transpose/0.0") == await store.get_async("transpose_zarr/0.0") + assert await (store / "transpose/0.0").get() == await (store / "transpose_zarr/0.0").get() def test_transpose_invalid( @@ -606,7 +607,7 @@ async def test_delete_empty_chunks(store: Store): await _AsyncArrayProxy(a)[:16, :16].set(np.zeros((16, 16))) await _AsyncArrayProxy(a)[:16, :16].set(data) assert np.array_equal(await _AsyncArrayProxy(a)[:16, :16].get(), data) - assert await store.get_async("delete_empty_chunks/c0/0") is None + assert await (store / "delete_empty_chunks/c0/0").get() is None @pytest.mark.asyncio @@ -630,8 +631,8 @@ async def test_delete_empty_sharded_chunks(store: Store): data = np.ones((16, 16), dtype="uint16") data[:8, :8] = 0 assert np.array_equal(data, await _AsyncArrayProxy(a)[:, :].get()) - assert await store.get_async("delete_empty_sharded_chunks/c/1/0") is None - chunk_bytes = await store.get_async("delete_empty_sharded_chunks/c/0/0") + assert await (store / "delete_empty_sharded_chunks/c/1/0").get() is None + chunk_bytes = await (store / "delete_empty_sharded_chunks/c/0/0").get() assert chunk_bytes is not None and len(chunk_bytes) == 16 * 2 + 8 * 8 * 2 + 4 @@ -661,10 +662,10 @@ async def 
test_zarr_compat(store: Store): assert np.array_equal(data, await _AsyncArrayProxy(a)[:16, :18].get()) assert np.array_equal(data, z2[:16, :18]) - assert z2._store["0.0"] == await store.get_async("zarr_compat3/0.0") - assert z2._store["0.1"] == await store.get_async("zarr_compat3/0.1") - assert z2._store["1.0"] == await store.get_async("zarr_compat3/1.0") - assert z2._store["1.1"] == await store.get_async("zarr_compat3/1.1") + assert z2._store["0.0"] == await (store / "zarr_compat3/0.0").get() + assert z2._store["0.1"] == await (store / "zarr_compat3/0.1").get() + assert z2._store["1.0"] == await (store / "zarr_compat3/1.0").get() + assert z2._store["1.1"] == await (store / "zarr_compat3/1.1").get() @pytest.mark.asyncio @@ -695,10 +696,10 @@ async def test_zarr_compat_F(store: Store): assert np.array_equal(data, await _AsyncArrayProxy(a)[:16, :18].get()) assert np.array_equal(data, z2[:16, :18]) - assert z2._store["0.0"] == await store.get_async("zarr_compatF3/0.0") - assert z2._store["0.1"] == await store.get_async("zarr_compatF3/0.1") - assert z2._store["1.0"] == await store.get_async("zarr_compatF3/1.0") - assert z2._store["1.1"] == await store.get_async("zarr_compatF3/1.1") + assert z2._store["0.0"] == await (store / "zarr_compatF3/0.0").get() + assert z2._store["0.1"] == await (store / "zarr_compatF3/0.1").get() + assert z2._store["1.0"] == await (store / "zarr_compatF3/1.0").get() + assert z2._store["1.1"] == await (store / "zarr_compatF3/1.1").get() @pytest.mark.asyncio @@ -728,7 +729,7 @@ async def test_dimension_names(store: Store): ) assert (await AsyncArray.open(store / "dimension_names2")).metadata.dimension_names is None - zarr_json_bytes = await (store / "dimension_names2" / "zarr.json").get_async() + zarr_json_bytes = await (store / "dimension_names2" / "zarr.json").get() assert zarr_json_bytes is not None assert "dimension_names" not in json.loads(zarr_json_bytes) @@ -794,7 +795,7 @@ async def test_endian(store: Store, endian: Literal["big", "little"]): fill_value=1, ) z[:, :] = data - assert await store.get_async("endian/0.0") == z._store["0.0"] + assert await (store / "endian/0.0").get() == z._store["0.0"] @pytest.mark.parametrize("dtype_input_endian", [">u2", " 0: - try: - r = requests.get(endpoint_uri) - if r.ok: - break - except Exception: # pragma: no cover - pass - timeout -= 0.1 # pragma: no cover - time.sleep(0.1) # pragma: no cover - s3so = dict(client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False) - s3 = s3fs.S3FileSystem(anon=False, **s3so) - s3.mkdir("test") - request.cls.s3so = s3so - yield - proc.terminate() - proc.wait() - - -class TestNestedDirectoryStore(TestDirectoryStore): - def create_store(self, normalize_keys=False, **kwargs): - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) - store = NestedDirectoryStore(path, normalize_keys=normalize_keys, **kwargs) - return store - - def test_init_array(self): - store = self.create_store() - assert store._dimension_separator == "/" - init_array(store, shape=1000, chunks=100) - - # check metadata - assert array_meta_key in store - meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) - assert ZARR_FORMAT == meta["zarr_format"] - assert (1000,) == meta["shape"] - assert (100,) == meta["chunks"] - assert np.dtype(None) == meta["dtype"] - assert meta["dimension_separator"] == "/" - - def test_chunk_nesting(self): - store = self.create_store() - # any path where last segment looks like a chunk key gets special handling - store[self.root + "0.0"] = b"xxx" - assert 
b"xxx" == store[self.root + "0.0"] - # assert b'xxx' == store['0/0'] - store[self.root + "foo/10.20.30"] = b"yyy" - assert b"yyy" == store[self.root + "foo/10.20.30"] - # assert b'yyy' == store['foo/10/20/30'] - store[self.root + "42"] = b"zzz" - assert b"zzz" == store[self.root + "42"] - - def test_listdir(self): - store = self.create_store() - z = zarr.zeros((10, 10), chunks=(5, 5), store=store) - z[:] = 1 # write to all chunks - for k in store.listdir(): - assert store.get(k) is not None - - -class TestNestedDirectoryStoreNone: - def test_value_error(self): - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) - store = NestedDirectoryStore(path, normalize_keys=True, dimension_separator=None) - assert store._dimension_separator == "/" - - -class TestNestedDirectoryStoreWithWrongValue: - def test_value_error(self): - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) - with pytest.raises(ValueError): - NestedDirectoryStore(path, normalize_keys=True, dimension_separator=".") - - -class TestN5Store(TestNestedDirectoryStore): - def create_store(self, normalize_keys=False): - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) - store = N5Store(path, normalize_keys=normalize_keys) return store - def test_equal(self): - store_a = self.create_store() - store_b = N5Store(store_a.path) - assert store_a == store_b - - @pytest.mark.parametrize("zarr_meta_key", [".zarray", ".zattrs", ".zgroup"]) - def test_del_zarr_meta_key(self, zarr_meta_key): - store = self.create_store() - store[n5_attrs_key] = json_dumps({"foo": "bar"}) - del store[zarr_meta_key] - assert n5_attrs_key not in store - - def test_chunk_nesting(self): - store = self.create_store() - store["0.0"] = b"xxx" - assert "0.0" in store - assert b"xxx" == store["0.0"] - # assert b'xxx' == store['0/0'] - store["foo/10.20.30"] = b"yyy" - assert "foo/10.20.30" in store - assert b"yyy" == store["foo/10.20.30"] - # N5 reverses axis order - assert b"yyy" == store["foo/30/20/10"] - del store["foo/10.20.30"] - assert "foo/30/20/10" not in store - store["42"] = b"zzz" - assert "42" in store - assert b"zzz" == store["42"] - - def test_init_array(self): - store = self.create_store() - init_array(store, shape=1000, chunks=100) - - # check metadata - assert array_meta_key in store - meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) - assert ZARR_FORMAT == meta["zarr_format"] - assert (1000,) == meta["shape"] - assert (100,) == meta["chunks"] - assert np.dtype(None) == meta["dtype"] - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert default_compressor.get_config() == compressor_config - # N5Store always has a fill value of 0 - assert meta["fill_value"] == 0 - assert meta["dimension_separator"] == "." 
- # Top-level groups AND arrays should have - # the n5 keyword in metadata - raw_n5_meta = json.loads(store[n5_attrs_key]) - assert raw_n5_meta.get("n5", None) == N5_FORMAT - - def test_init_array_path(self): - path = "foo/bar" - store = self.create_store() - init_array(store, shape=1000, chunks=100, path=path) - - # check metadata - key = path + "/" + array_meta_key - assert key in store - meta = store._metadata_class.decode_array_metadata(store[key]) - assert ZARR_FORMAT == meta["zarr_format"] - assert (1000,) == meta["shape"] - assert (100,) == meta["chunks"] - assert np.dtype(None) == meta["dtype"] - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert default_compressor.get_config() == compressor_config - # N5Store always has a fill value of 0 - assert meta["fill_value"] == 0 - - def test_init_array_compat(self): - store = self.create_store() - init_array(store, shape=1000, chunks=100, compressor="none") - meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert compressor_config is None - - def test_init_array_overwrite(self): - self._test_init_array_overwrite("C") - - def test_init_array_overwrite_path(self): - self._test_init_array_overwrite_path("C") - - def test_init_array_overwrite_chunk_store(self): - self._test_init_array_overwrite_chunk_store("C") - - def test_init_group_overwrite(self): - self._test_init_group_overwrite("C") - - def test_init_group_overwrite_path(self): - self._test_init_group_overwrite_path("C") - - def test_init_group_overwrite_chunk_store(self): - self._test_init_group_overwrite_chunk_store("C") - - def test_init_group(self): - store = self.create_store() - init_group(store) - store[".zattrs"] = json_dumps({"foo": "bar"}) - # check metadata - assert group_meta_key in store - assert group_meta_key in store.listdir() - assert group_meta_key in store.listdir("") - meta = store._metadata_class.decode_group_metadata(store[group_meta_key]) - assert ZARR_FORMAT == meta["zarr_format"] - - def test_filters(self): - all_filters, all_errors = zip( - *[ - (None, does_not_raise()), - ([], does_not_raise()), - ([AsType("f4", "f8")], pytest.raises(ValueError)), - ] - ) - for filters, error in zip(all_filters, all_errors): - store = self.create_store() - with error: - init_array(store, shape=1000, chunks=100, filters=filters) - -@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") -class TestN5FSStore(TestFSStore): - def create_store(self, normalize_keys=False, path=None, **kwargs): +# def test_filesystem_path(self): + +# # test behaviour with path that does not exist +# path = "data/store" +# if os.path.exists(path): +# shutil.rmtree(path) +# store = DirectoryStore(path) +# # should only be created on demand +# assert not os.path.exists(path) +# store["foo"] = b"bar" +# assert os.path.isdir(path) + +# # check correct permissions +# # regression test for https://github.com/zarr-developers/zarr-python/issues/325 +# stat = os.stat(path) +# mode = stat.st_mode & 0o666 +# umask = os.umask(0) +# os.umask(umask) +# assert mode == (0o666 & ~umask) + +# # test behaviour with file path +# with tempfile.NamedTemporaryFile() as f: +# with pytest.raises(ValueError): +# DirectoryStore(f.name) + +# def test_init_pathlib(self): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# DirectoryStore(pathlib.Path(path)) + +# def test_pickle_ext(self): +# store = 
self.create_store() +# store2 = pickle.loads(pickle.dumps(store)) + +# # check path is preserved +# assert store.path == store2.path + +# # check point to same underlying directory +# assert self.root + "xxx" not in store +# store2[self.root + "xxx"] = b"yyy" +# assert b"yyy" == ensure_bytes(store[self.root + "xxx"]) + +# def test_setdel(self): +# store = self.create_store() +# setdel_hierarchy_checks(store, self.root) + +# def test_normalize_keys(self): +# store = self.create_store(normalize_keys=True) +# store[self.root + "FOO"] = b"bar" +# assert self.root + "FOO" in store +# assert self.root + "foo" in store + +# def test_listing_keys_slash(self): +# def mock_walker_slash(_path): +# yield from [ +# # trailing slash in first key +# ("root_with_slash/", ["d1", "g1"], [".zgroup"]), +# ("root_with_slash/d1", [], [".zarray"]), +# ("root_with_slash/g1", [], [".zgroup"]), +# ] + +# res = set(DirectoryStore._keys_fast("root_with_slash/", walker=mock_walker_slash)) +# assert res == {".zgroup", "g1/.zgroup", "d1/.zarray"} + +# def test_listing_keys_no_slash(self): +# def mock_walker_no_slash(_path): +# yield from [ +# # no trailing slash in first key +# ("root_with_no_slash", ["d1", "g1"], [".zgroup"]), +# ("root_with_no_slash/d1", [], [".zarray"]), +# ("root_with_no_slash/g1", [], [".zgroup"]), +# ] + +# res = set(DirectoryStore._keys_fast("root_with_no_slash", mock_walker_no_slash)) +# assert res == {".zgroup", "g1/.zgroup", "d1/.zarray"} + + +# @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +# class TestFSStore(StoreTests): +# def create_store(self, normalize_keys=False, dimension_separator=".", path=None, **kwargs): + +# if path is None: +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) + +# store = FSStore( +# path, normalize_keys=normalize_keys, dimension_separator=dimension_separator, **kwargs +# ) +# return store + +# def test_init_array(self): +# store = self.create_store() +# init_array(store, shape=1000, chunks=100) + +# # check metadata +# assert array_meta_key in store +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# assert meta["dimension_separator"] == "." 
+ +# def test_dimension_separator(self): +# for x in (".", "/"): +# store = self.create_store(dimension_separator=x) +# norm = store._normalize_key +# assert ".zarray" == norm(".zarray") +# assert ".zarray" == norm("/.zarray") +# assert ".zgroup" == norm("/.zgroup") +# assert "group/.zarray" == norm("group/.zarray") +# assert "group/.zgroup" == norm("group/.zgroup") +# assert "group/.zarray" == norm("/group/.zarray") +# assert "group/.zgroup" == norm("/group/.zgroup") + +# def test_complex(self): +# path1 = tempfile.mkdtemp() +# path2 = tempfile.mkdtemp() +# store = self.create_store( +# path="simplecache::file://" + path1, +# simplecache={"same_names": True, "cache_storage": path2}, +# ) +# assert not store +# assert not os.listdir(path1) +# assert not os.listdir(path2) +# store[self.root + "foo"] = b"hello" +# assert "foo" in os.listdir(str(path1) + "/" + self.root) +# assert self.root + "foo" in store +# assert not os.listdir(str(path2)) +# assert store[self.root + "foo"] == b"hello" +# assert "foo" in os.listdir(str(path2)) + +# def test_deep_ndim(self): +# import zarr + +# store = self.create_store() +# path = None if self.version == 2 else "group1" +# foo = zarr.open_group(store=store, path=path) +# bar = foo.create_group("bar") +# baz = bar.create_dataset("baz", shape=(4, 4, 4), chunks=(2, 2, 2), dtype="i8") +# baz[:] = 1 +# if self.version == 2: +# assert set(store.listdir()) == {".zgroup", "bar"} +# else: +# assert set(store.listdir()) == {"data", "meta", "zarr.json"} +# assert set(store.listdir("meta/root/" + path)) == {"bar", "bar.group.json"} +# assert set(store.listdir("data/root/" + path)) == {"bar"} +# assert foo["bar"]["baz"][(0, 0, 0)] == 1 + +# def test_not_fsspec(self): +# import zarr + +# path = tempfile.mkdtemp() +# with pytest.raises(ValueError, match="storage_options"): +# zarr.open_array(path, mode="w", storage_options={"some": "kwargs"}) +# with pytest.raises(ValueError, match="storage_options"): +# zarr.open_group(path, mode="w", storage_options={"some": "kwargs"}) +# zarr.open_array("file://" + path, mode="w", shape=(1,), dtype="f8") + +# def test_create(self): +# import zarr + +# path1 = tempfile.mkdtemp() +# path2 = tempfile.mkdtemp() +# g = zarr.open_group("file://" + path1, mode="w", storage_options={"auto_mkdir": True}) +# a = g.create_dataset("data", shape=(8,)) +# a[:4] = [0, 1, 2, 3] +# assert "data" in os.listdir(path1) +# assert ".zgroup" in os.listdir(path1) + +# # consolidated metadata (GH#915) +# consolidate_metadata("file://" + path1) +# assert ".zmetadata" in os.listdir(path1) + +# g = zarr.open_group( +# "simplecache::file://" + path1, +# mode="r", +# storage_options={"cache_storage": path2, "same_names": True}, +# ) +# assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0] +# with pytest.raises(PermissionError): +# g.data[:] = 1 + +# @pytest.mark.parametrize("mode,allowed", [("r", False), ("r+", True)]) +# def test_modify_consolidated(self, mode, allowed): +# import zarr + +# url = "file://" + tempfile.mkdtemp() + +# # create +# root = zarr.open_group(url, mode="w") +# root.zeros("baz", shape=(10000, 10000), chunks=(1000, 1000), dtype="i4") +# zarr.consolidate_metadata(url) + +# # reopen and modify +# root = zarr.open_consolidated(url, mode=mode) +# if allowed: +# root["baz"][0, 0] = 7 + +# root = zarr.open_consolidated(url, mode="r") +# assert root["baz"][0, 0] == 7 +# else: +# with pytest.raises(zarr.errors.ReadOnlyError): +# root["baz"][0, 0] = 7 + +# @pytest.mark.parametrize("mode", ["r", "r+"]) +# def 
test_modify_consolidated_metadata_raises(self, mode): +# import zarr + +# url = "file://" + tempfile.mkdtemp() + +# # create +# root = zarr.open_group(url, mode="w") +# root.zeros("baz", shape=(10000, 10000), chunks=(1000, 1000), dtype="i4") +# zarr.consolidate_metadata(url) + +# # reopen and modify +# root = zarr.open_consolidated(url, mode=mode) +# with pytest.raises(zarr.errors.ReadOnlyError): +# root["baz"].resize(100, 100) + +# def test_read_only(self): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# store = self.create_store(path=path) +# store[self.root + "foo"] = b"bar" + +# store = self.create_store(path=path, mode="r") + +# with pytest.raises(PermissionError): +# store[self.root + "foo"] = b"hex" + +# with pytest.raises(PermissionError): +# del store[self.root + "foo"] + +# with pytest.raises(PermissionError): +# store.delitems([self.root + "foo"]) + +# with pytest.raises(PermissionError): +# store.setitems({self.root + "foo": b"baz"}) + +# with pytest.raises(PermissionError): +# store.clear() + +# with pytest.raises(PermissionError): +# store.rmdir(self.root + "anydir") + +# assert store[self.root + "foo"] == b"bar" + +# def test_eq(self): +# store1 = self.create_store(path="anypath") +# store2 = self.create_store(path="anypath") +# assert store1 == store2 + +# @pytest.mark.usefixtures("s3") +# def test_s3(self): +# import zarr + +# g = zarr.open_group("s3://test/out.zarr", mode="w", storage_options=self.s3so) +# a = g.create_dataset("data", shape=(8,)) +# a[:4] = [0, 1, 2, 3] + +# g = zarr.open_group("s3://test/out.zarr", mode="r", storage_options=self.s3so) + +# assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0] + +# # test via convenience +# g = zarr.open("s3://test/out.zarr", mode="r", storage_options=self.s3so) +# assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0] + +# @pytest.mark.usefixtures("s3") +# def test_s3_complex(self): +# import zarr + +# g = zarr.open_group("s3://test/out.zarr", mode="w", storage_options=self.s3so) +# expected = np.empty((8, 8, 8), dtype="int64") +# expected[:] = -1 +# a = g.create_dataset( +# "data", shape=(8, 8, 8), fill_value=-1, chunks=(1, 1, 1), overwrite=True +# ) +# expected[0] = 0 +# expected[3] = 3 +# expected[6, 6, 6] = 6 +# a[6, 6, 6] = 6 +# a[:4] = expected[:4] + +# b = g.create_dataset( +# "data_f", +# shape=(8,), +# chunks=(1,), +# dtype=[("foo", "S3"), ("bar", "i4")], +# fill_value=(b"b", 1), +# ) +# b[:4] = (b"aaa", 2) +# g2 = zarr.open_group("s3://test/out.zarr", mode="r", storage_options=self.s3so) + +# assert (g2.data[:] == expected).all() +# a.chunk_store.fs.invalidate_cache("test/out.zarr/data") +# a[:] = 5 +# assert (a[:] == 5).all() + +# assert g2.data_f["foo"].tolist() == [b"aaa"] * 4 + [b"b"] * 4 +# with pytest.raises(PermissionError): +# g2.data[:] = 5 + +# with pytest.raises(PermissionError): +# g2.store.setitems({}) + +# with pytest.raises(PermissionError): +# # even though overwrite=True, store is read-only, so fails +# g2.create_dataset( +# "data", shape=(8, 8, 8), fill_value=-1, chunks=(1, 1, 1), overwrite=True +# ) + +# a = g.create_dataset( +# "data", shape=(8, 8, 8), fill_value=-1, chunks=(1, 1, 1), overwrite=True +# ) +# assert (a[:] == -np.ones((8, 8, 8))).all() + + +# @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +# class TestFSStoreWithKeySeparator(StoreTests): +# def create_store(self, normalize_keys=False, key_separator=".", **kwargs): + +# # Since the user is passing key_separator, that will take priority. 
+# skip_if_nested_chunks(**kwargs) + +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# return FSStore(path, normalize_keys=normalize_keys, key_separator=key_separator) + + +# @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +# class TestFSStoreFromFilesystem(StoreTests): +# def create_store(self, normalize_keys=False, dimension_separator=".", path=None, **kwargs): +# import fsspec + +# fs = fsspec.filesystem("file") + +# if path is None: +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) + +# with pytest.raises(ValueError): +# # can't specify storage_options when passing an +# # existing fs object +# _ = FSStore(path, fs=fs, auto_mkdir=True) + +# store = FSStore( +# path, +# normalize_keys=normalize_keys, +# dimension_separator=dimension_separator, +# fs=fs, +# **kwargs, +# ) + +# return store + + +# @pytest.fixture() +# def s3(request): +# # writable local S3 system +# import shlex +# import subprocess +# import time + +# if "BOTO_CONFIG" not in os.environ: # pragma: no cover +# os.environ["BOTO_CONFIG"] = "/dev/null" +# if "AWS_ACCESS_KEY_ID" not in os.environ: # pragma: no cover +# os.environ["AWS_ACCESS_KEY_ID"] = "foo" +# if "AWS_SECRET_ACCESS_KEY" not in os.environ: # pragma: no cover +# os.environ["AWS_SECRET_ACCESS_KEY"] = "bar" +# requests = pytest.importorskip("requests") +# s3fs = pytest.importorskip("s3fs") +# pytest.importorskip("moto") + +# port = 5555 +# endpoint_uri = "http://127.0.0.1:%d/" % port +# proc = subprocess.Popen( +# shlex.split("moto_server s3 -p %d" % port), +# stderr=subprocess.DEVNULL, +# stdout=subprocess.DEVNULL, +# ) + +# timeout = 5 +# while timeout > 0: +# try: +# r = requests.get(endpoint_uri) +# if r.ok: +# break +# except Exception: # pragma: no cover +# pass +# timeout -= 0.1 # pragma: no cover +# time.sleep(0.1) # pragma: no cover +# s3so = dict(client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False) +# s3 = s3fs.S3FileSystem(anon=False, **s3so) +# s3.mkdir("test") +# request.cls.s3so = s3so +# yield +# proc.terminate() +# proc.wait() + + +# class TestNestedDirectoryStore(TestDirectoryStore): +# def create_store(self, normalize_keys=False, **kwargs): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# store = NestedDirectoryStore(path, normalize_keys=normalize_keys, **kwargs) +# return store + +# def test_init_array(self): +# store = self.create_store() +# assert store._dimension_separator == "/" +# init_array(store, shape=1000, chunks=100) + +# # check metadata +# assert array_meta_key in store +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# assert meta["dimension_separator"] == "/" + +# def test_chunk_nesting(self): +# store = self.create_store() +# # any path where last segment looks like a chunk key gets special handling +# store[self.root + "0.0"] = b"xxx" +# assert b"xxx" == store[self.root + "0.0"] +# # assert b'xxx' == store['0/0'] +# store[self.root + "foo/10.20.30"] = b"yyy" +# assert b"yyy" == store[self.root + "foo/10.20.30"] +# # assert b'yyy' == store['foo/10/20/30'] +# store[self.root + "42"] = b"zzz" +# assert b"zzz" == store[self.root + "42"] + +# def test_listdir(self): +# store = self.create_store() +# z = zarr.zeros((10, 10), chunks=(5, 5), store=store) +# z[:] = 1 # write to all chunks +# for k in store.listdir(): +# assert store.get(k) is not None + + 
+# class TestNestedDirectoryStoreNone: +# def test_value_error(self): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# store = NestedDirectoryStore(path, normalize_keys=True, dimension_separator=None) +# assert store._dimension_separator == "/" + + +# class TestNestedDirectoryStoreWithWrongValue: +# def test_value_error(self): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# with pytest.raises(ValueError): +# NestedDirectoryStore(path, normalize_keys=True, dimension_separator=".") + + +# class TestN5Store(TestNestedDirectoryStore): +# def create_store(self, normalize_keys=False): +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# store = N5Store(path, normalize_keys=normalize_keys) +# return store + +# def test_equal(self): +# store_a = self.create_store() +# store_b = N5Store(store_a.path) +# assert store_a == store_b + +# @pytest.mark.parametrize("zarr_meta_key", [".zarray", ".zattrs", ".zgroup"]) +# def test_del_zarr_meta_key(self, zarr_meta_key): +# store = self.create_store() +# store[n5_attrs_key] = json_dumps({"foo": "bar"}) +# del store[zarr_meta_key] +# assert n5_attrs_key not in store + +# def test_chunk_nesting(self): +# store = self.create_store() +# store["0.0"] = b"xxx" +# assert "0.0" in store +# assert b"xxx" == store["0.0"] +# # assert b'xxx' == store['0/0'] +# store["foo/10.20.30"] = b"yyy" +# assert "foo/10.20.30" in store +# assert b"yyy" == store["foo/10.20.30"] +# # N5 reverses axis order +# assert b"yyy" == store["foo/30/20/10"] +# del store["foo/10.20.30"] +# assert "foo/30/20/10" not in store +# store["42"] = b"zzz" +# assert "42" in store +# assert b"zzz" == store["42"] + +# def test_init_array(self): +# store = self.create_store() +# init_array(store, shape=1000, chunks=100) + +# # check metadata +# assert array_meta_key in store +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert default_compressor.get_config() == compressor_config +# # N5Store always has a fill value of 0 +# assert meta["fill_value"] == 0 +# assert meta["dimension_separator"] == "." 
+# # Top-level groups AND arrays should have +# # the n5 keyword in metadata +# raw_n5_meta = json.loads(store[n5_attrs_key]) +# assert raw_n5_meta.get("n5", None) == N5_FORMAT + +# def test_init_array_path(self): +# path = "foo/bar" +# store = self.create_store() +# init_array(store, shape=1000, chunks=100, path=path) + +# # check metadata +# key = path + "/" + array_meta_key +# assert key in store +# meta = store._metadata_class.decode_array_metadata(store[key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert default_compressor.get_config() == compressor_config +# # N5Store always has a fill value of 0 +# assert meta["fill_value"] == 0 + +# def test_init_array_compat(self): +# store = self.create_store() +# init_array(store, shape=1000, chunks=100, compressor="none") +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert compressor_config is None + +# def test_init_array_overwrite(self): +# self._test_init_array_overwrite("C") + +# def test_init_array_overwrite_path(self): +# self._test_init_array_overwrite_path("C") + +# def test_init_array_overwrite_chunk_store(self): +# self._test_init_array_overwrite_chunk_store("C") + +# def test_init_group_overwrite(self): +# self._test_init_group_overwrite("C") + +# def test_init_group_overwrite_path(self): +# self._test_init_group_overwrite_path("C") + +# def test_init_group_overwrite_chunk_store(self): +# self._test_init_group_overwrite_chunk_store("C") + +# def test_init_group(self): +# store = self.create_store() +# init_group(store) +# store[".zattrs"] = json_dumps({"foo": "bar"}) +# # check metadata +# assert group_meta_key in store +# assert group_meta_key in store.listdir() +# assert group_meta_key in store.listdir("") +# meta = store._metadata_class.decode_group_metadata(store[group_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] + +# def test_filters(self): +# all_filters, all_errors = zip( +# *[ +# (None, does_not_raise()), +# ([], does_not_raise()), +# ([AsType("f4", "f8")], pytest.raises(ValueError)), +# ] +# ) +# for filters, error in zip(all_filters, all_errors): +# store = self.create_store() +# with error: +# init_array(store, shape=1000, chunks=100, filters=filters) + + +# @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +# class TestN5FSStore(TestFSStore): +# def create_store(self, normalize_keys=False, path=None, **kwargs): + +# if path is None: +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) + +# store = N5FSStore(path, normalize_keys=normalize_keys, **kwargs) +# return store + +# def test_equal(self): +# store_a = self.create_store() +# store_b = N5FSStore(store_a.path) +# assert store_a == store_b + +# # This is copied wholesale from the N5Store tests. The same test could +# # be run by making TestN5FSStore inherit from both TestFSStore and +# # TestN5Store, but a direct copy is arguably more explicit. 
+ +# @pytest.mark.parametrize("zarr_meta_key", [".zarray", ".zattrs", ".zgroup"]) +# def test_del_zarr_meta_key(self, zarr_meta_key): +# store = self.create_store() +# store[n5_attrs_key] = json_dumps({"foo": "bar"}) +# del store[zarr_meta_key] +# assert n5_attrs_key not in store + +# def test_chunk_nesting(self): +# store = self.create_store() +# store["0.0"] = b"xxx" +# assert "0.0" in store +# assert b"xxx" == store["0.0"] +# # assert b'xxx' == store['0/0'] +# store["foo/10.20.30"] = b"yyy" +# assert "foo/10.20.30" in store +# assert b"yyy" == store["foo/10.20.30"] +# # N5 reverses axis order +# assert b"yyy" == store["foo/30/20/10"] +# del store["foo/10.20.30"] +# assert "foo/30/20/10" not in store +# store["42"] = b"zzz" +# assert "42" in store +# assert b"zzz" == store["42"] + +# def test_init_array(self): +# store = self.create_store() +# init_array(store, shape=1000, chunks=100) + +# # check metadata +# assert array_meta_key in store +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert default_compressor.get_config() == compressor_config +# # N5Store always has a fill value of 0 +# assert meta["fill_value"] == 0 +# assert meta["dimension_separator"] == "." +# # Top-level groups AND arrays should have +# # the n5 keyword in metadata +# raw_n5_meta = json.loads(store[n5_attrs_key]) +# assert raw_n5_meta.get("n5", None) == N5_FORMAT + +# def test_init_array_path(self): +# path = "foo/bar" +# store = self.create_store() +# init_array(store, shape=1000, chunks=100, path=path) + +# # check metadata +# key = path + "/" + array_meta_key +# assert key in store +# meta = store._metadata_class.decode_array_metadata(store[key]) +# assert ZARR_FORMAT == meta["zarr_format"] +# assert (1000,) == meta["shape"] +# assert (100,) == meta["chunks"] +# assert np.dtype(None) == meta["dtype"] +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert default_compressor.get_config() == compressor_config +# # N5Store always has a fill value of 0 +# assert meta["fill_value"] == 0 + +# def test_init_array_compat(self): +# store = self.create_store() +# init_array(store, shape=1000, chunks=100, compressor="none") +# meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) +# # N5Store wraps the actual compressor +# compressor_config = meta["compressor"]["compressor_config"] +# assert compressor_config is None + +# def test_init_array_overwrite(self): +# self._test_init_array_overwrite("C") + +# def test_init_array_overwrite_path(self): +# self._test_init_array_overwrite_path("C") + +# def test_init_array_overwrite_chunk_store(self): +# self._test_init_array_overwrite_chunk_store("C") + +# def test_init_group_overwrite(self): +# self._test_init_group_overwrite("C") + +# def test_init_group_overwrite_path(self): +# self._test_init_group_overwrite_path("C") + +# def test_init_group_overwrite_chunk_store(self): +# self._test_init_group_overwrite_chunk_store("C") + +# def test_dimension_separator(self): + +# with pytest.warns(UserWarning, match="dimension_separator"): +# self.create_store(dimension_separator="/") + +# def test_init_group(self): +# store = self.create_store() +# init_group(store) +# store[".zattrs"] = json_dumps({"foo": "bar"}) +# # check 
metadata +# assert group_meta_key in store +# assert group_meta_key in store.listdir() +# assert group_meta_key in store.listdir("") +# meta = store._metadata_class.decode_group_metadata(store[group_meta_key]) +# assert ZARR_FORMAT == meta["zarr_format"] + +# def test_filters(self): +# all_filters, all_errors = zip( +# *[ +# (None, does_not_raise()), +# ([], does_not_raise()), +# ([AsType("f4", "f8")], pytest.raises(ValueError)), +# ] +# ) +# for filters, error in zip(all_filters, all_errors): +# store = self.create_store() +# with error: +# init_array(store, shape=1000, chunks=100, filters=filters) + + +# @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +# class TestNestedFSStore(TestNestedDirectoryStore): +# def create_store(self, normalize_keys=False, path=None, **kwargs): +# if path is None: +# path = tempfile.mkdtemp() +# atexit.register(atexit_rmtree, path) +# store = FSStore( +# path, normalize_keys=normalize_keys, +# dimension_separator="/", auto_mkdir=True, **kwargs +# ) +# return store + +# def test_numbered_groups(self): +# import zarr + +# # Create an array +# store = self.create_store() +# group = zarr.group(store=store) +# arr = group.create_dataset("0", shape=(10, 10)) +# arr[1] = 1 + +# # Read it back +# store = self.create_store(path=store.path) +# zarr.open_group(store.path)["0"] + + +# class TestTempStore(StoreTests): +# def create_store(self, **kwargs): +# skip_if_nested_chunks(**kwargs) +# return TempStore(**kwargs) + +# def test_setdel(self): +# store = self.create_store() +# setdel_hierarchy_checks(store, self.root) + + +# class TestZipStore(StoreTests): + +# ZipStoreClass = ZipStore + +# def create_store(self, **kwargs): +# path = mktemp(suffix=".zip") +# atexit.register(os.remove, path) +# store = ZipStore(path, mode="w", **kwargs) +# return store + +# def test_mode(self): +# with self.ZipStoreClass("data/store.zip", mode="w") as store: +# store[self.root + "foo"] = b"bar" +# store = self.ZipStoreClass("data/store.zip", mode="r") +# with pytest.raises(PermissionError): +# store[self.root + "foo"] = b"bar" +# with pytest.raises(PermissionError): +# store.clear() + +# def test_flush(self): +# store = self.ZipStoreClass("data/store.zip", mode="w") +# store[self.root + "foo"] = b"bar" +# store.flush() +# assert store[self.root + "foo"] == b"bar" +# store.close() + +# store = self.ZipStoreClass("data/store.zip", mode="r") +# store.flush() # no-op + +# def test_context_manager(self): +# with self.create_store() as store: +# store[self.root + "foo"] = b"bar" +# store[self.root + "baz"] = b"qux" +# assert 2 == len(store) + +# def test_pop(self): +# # override because not implemented +# store = self.create_store() +# store[self.root + "foo"] = b"bar" +# with pytest.raises(NotImplementedError): +# store.pop(self.root + "foo") + +# def test_popitem(self): +# # override because not implemented +# store = self.create_store() +# store[self.root + "foo"] = b"bar" +# with pytest.raises(NotImplementedError): +# store.popitem() + +# def test_permissions(self): +# store = self.ZipStoreClass("data/store.zip", mode="w") +# foo_key = "foo" if self.version == 2 else self.root + "foo" +# # TODO: cannot provide key ending in / for v3 +# # how to create an empty folder in that case? 
+# baz_key = "baz/" if self.version == 2 else self.root + "baz" +# store[foo_key] = b"bar" +# store[baz_key] = b"" + +# store.flush() +# store.close() +# z = ZipFile("data/store.zip", "r") +# info = z.getinfo(foo_key) +# perm = oct(info.external_attr >> 16) +# assert perm == "0o644" +# info = z.getinfo(baz_key) +# perm = oct(info.external_attr >> 16) +# # only for posix platforms +# if os.name == "posix": +# if self.version == 2: +# assert perm == "0o40775" +# else: +# # baz/ on v2, but baz on v3, so not a directory +# assert perm == "0o644" +# z.close() + +# def test_store_and_retrieve_ndarray(self): +# store = ZipStore("data/store.zip") +# x = np.array([[1, 2], [3, 4]]) +# store["foo"] = x +# y = np.frombuffer(store["foo"], dtype=x.dtype).reshape(x.shape) +# assert np.array_equiv(y, x) + + +# class TestDBMStore(StoreTests): +# def create_store(self, dimension_separator=None): +# path = mktemp(suffix=".anydbm") +# atexit.register(atexit_rmglob, path + "*") +# # create store using default dbm implementation +# store = DBMStore(path, flag="n", dimension_separator=dimension_separator) +# return store + +# def test_context_manager(self): +# with self.create_store() as store: +# store[self.root + "foo"] = b"bar" +# store[self.root + "baz"] = b"qux" +# assert 2 == len(store) + + +# class TestDBMStoreDumb(TestDBMStore): +# def create_store(self, **kwargs): +# path = mktemp(suffix=".dumbdbm") +# atexit.register(atexit_rmglob, path + "*") + +# import dbm.dumb as dumbdbm + +# store = DBMStore(path, flag="n", open=dumbdbm.open, **kwargs) +# return store + + +# class TestDBMStoreGnu(TestDBMStore): +# def create_store(self, **kwargs): +# gdbm = pytest.importorskip("dbm.gnu") +# path = mktemp(suffix=".gdbm") # pragma: no cover +# atexit.register(os.remove, path) # pragma: no cover +# store = DBMStore( +# path, flag="n", open=gdbm.open, write_lock=False, **kwargs +# ) # pragma: no cover +# return store # pragma: no cover + + +# class TestDBMStoreNDBM(TestDBMStore): +# def create_store(self, **kwargs): +# ndbm = pytest.importorskip("dbm.ndbm") +# path = mktemp(suffix=".ndbm") # pragma: no cover +# atexit.register(atexit_rmglob, path + "*") # pragma: no cover +# store = DBMStore(path, flag="n", open=ndbm.open, **kwargs) # pragma: no cover +# return store # pragma: no cover + + +# class TestDBMStoreBerkeleyDB(TestDBMStore): +# def create_store(self, **kwargs): +# bsddb3 = pytest.importorskip("bsddb3") +# path = mktemp(suffix=".dbm") +# atexit.register(os.remove, path) +# store = DBMStore(path, flag="n", open=bsddb3.btopen, write_lock=False, **kwargs) +# return store + + +# class TestLMDBStore(StoreTests): +# def create_store(self, **kwargs): +# pytest.importorskip("lmdb") +# path = mktemp(suffix=".lmdb") +# atexit.register(atexit_rmtree, path) +# buffers = True +# store = LMDBStore(path, buffers=buffers, **kwargs) +# return store + +# def test_context_manager(self): +# with self.create_store() as store: +# store[self.root + "foo"] = b"bar" +# store[self.root + "baz"] = b"qux" +# assert 2 == len(store) + + +# class TestSQLiteStore(StoreTests): +# def create_store(self, **kwargs): +# pytest.importorskip("sqlite3") +# path = mktemp(suffix=".db") +# atexit.register(atexit_rmtree, path) +# store = SQLiteStore(path, **kwargs) +# return store + +# def test_underscore_in_name(self): +# path = mktemp(suffix=".db") +# atexit.register(atexit_rmtree, path) +# store = SQLiteStore(path) +# store["a"] = b"aaa" +# store["a_b"] = b"aa_bb" +# store.rmdir("a") +# assert "a_b" in store + + +# class 
TestSQLiteStoreInMemory(TestSQLiteStore): +# def create_store(self, **kwargs): +# pytest.importorskip("sqlite3") +# store = SQLiteStore(":memory:", **kwargs) +# return store + +# def test_pickle(self): + +# # setup store +# store = self.create_store() +# store[self.root + "foo"] = b"bar" +# store[self.root + "baz"] = b"quux" + +# # round-trip through pickle +# with pytest.raises(PicklingError): +# pickle.dumps(store) + + +# @skip_test_env_var("ZARR_TEST_MONGO") +# class TestMongoDBStore(StoreTests): +# def create_store(self, **kwargs): +# pytest.importorskip("pymongo") +# store = MongoDBStore( +# host="127.0.0.1", database="zarr_tests", collection="zarr_tests", **kwargs +# ) +# # start with an empty store +# store.clear() +# return store + + +# @skip_test_env_var("ZARR_TEST_REDIS") +# class TestRedisStore(StoreTests): +# def create_store(self, **kwargs): +# # TODO: this is the default host for Redis on Travis, +# # we probably want to generalize this though +# pytest.importorskip("redis") +# store = RedisStore(host="localhost", port=6379, **kwargs) +# # start with an empty store +# store.clear() +# return store + + +# class TestLRUStoreCache(StoreTests): + +# CountingClass = CountingDict +# LRUStoreClass = LRUStoreCache + +# def create_store(self, **kwargs): +# # wrapper therefore no dimension_separator argument +# skip_if_nested_chunks(**kwargs) +# return self.LRUStoreClass(dict(), max_size=2**27) + +# def test_cache_values_no_max_size(self): + +# # setup store +# store = self.CountingClass() +# foo_key = self.root + "foo" +# bar_key = self.root + "bar" +# store[foo_key] = b"xxx" +# store[bar_key] = b"yyy" +# assert 0 == store.counter["__getitem__", foo_key] +# assert 1 == store.counter["__setitem__", foo_key] +# assert 0 == store.counter["__getitem__", bar_key] +# assert 1 == store.counter["__setitem__", bar_key] + +# # setup cache +# cache = self.LRUStoreClass(store, max_size=None) +# assert 0 == cache.hits +# assert 0 == cache.misses + +# # test first __getitem__, cache miss +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 1 == store.counter["__setitem__", foo_key] +# assert 0 == cache.hits +# assert 1 == cache.misses + +# # test second __getitem__, cache hit +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 1 == store.counter["__setitem__", foo_key] +# assert 1 == cache.hits +# assert 1 == cache.misses + +# # test __setitem__, __getitem__ +# cache[foo_key] = b"zzz" +# assert 1 == store.counter["__getitem__", foo_key] +# assert 2 == store.counter["__setitem__", foo_key] +# # should be a cache hit +# assert b"zzz" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 2 == store.counter["__setitem__", foo_key] +# assert 2 == cache.hits +# assert 1 == cache.misses + +# # manually invalidate all cached values +# cache.invalidate_values() +# assert b"zzz" == cache[foo_key] +# assert 2 == store.counter["__getitem__", foo_key] +# assert 2 == store.counter["__setitem__", foo_key] +# cache.invalidate() +# assert b"zzz" == cache[foo_key] +# assert 3 == store.counter["__getitem__", foo_key] +# assert 2 == store.counter["__setitem__", foo_key] + +# # test __delitem__ +# del cache[foo_key] +# with pytest.raises(KeyError): +# # noinspection PyStatementEffect +# cache[foo_key] +# with pytest.raises(KeyError): +# # noinspection PyStatementEffect +# store[foo_key] + +# # verify other keys untouched +# assert 0 == store.counter["__getitem__", bar_key] +# assert 1 == 
store.counter["__setitem__", bar_key] + +# def test_cache_values_with_max_size(self): + +# # setup store +# store = self.CountingClass() +# foo_key = self.root + "foo" +# bar_key = self.root + "bar" +# store[foo_key] = b"xxx" +# store[bar_key] = b"yyy" +# assert 0 == store.counter["__getitem__", foo_key] +# assert 0 == store.counter["__getitem__", bar_key] +# # setup cache - can only hold one item +# cache = self.LRUStoreClass(store, max_size=5) +# assert 0 == cache.hits +# assert 0 == cache.misses + +# # test first 'foo' __getitem__, cache miss +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 0 == cache.hits +# assert 1 == cache.misses + +# # test second 'foo' __getitem__, cache hit +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 1 == cache.hits +# assert 1 == cache.misses + +# # test first 'bar' __getitem__, cache miss +# assert b"yyy" == cache[bar_key] +# assert 1 == store.counter["__getitem__", bar_key] +# assert 1 == cache.hits +# assert 2 == cache.misses + +# # test second 'bar' __getitem__, cache hit +# assert b"yyy" == cache[bar_key] +# assert 1 == store.counter["__getitem__", bar_key] +# assert 2 == cache.hits +# assert 2 == cache.misses + +# # test 'foo' __getitem__, should have been evicted, cache miss +# assert b"xxx" == cache[foo_key] +# assert 2 == store.counter["__getitem__", foo_key] +# assert 2 == cache.hits +# assert 3 == cache.misses + +# # test 'bar' __getitem__, should have been evicted, cache miss +# assert b"yyy" == cache[bar_key] +# assert 2 == store.counter["__getitem__", bar_key] +# assert 2 == cache.hits +# assert 4 == cache.misses + +# # setup store +# store = self.CountingClass() +# store[foo_key] = b"xxx" +# store[bar_key] = b"yyy" +# assert 0 == store.counter["__getitem__", foo_key] +# assert 0 == store.counter["__getitem__", bar_key] +# # setup cache - can hold two items +# cache = self.LRUStoreClass(store, max_size=6) +# assert 0 == cache.hits +# assert 0 == cache.misses + +# # test first 'foo' __getitem__, cache miss +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 0 == cache.hits +# assert 1 == cache.misses + +# # test second 'foo' __getitem__, cache hit +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 1 == cache.hits +# assert 1 == cache.misses + +# # test first 'bar' __getitem__, cache miss +# assert b"yyy" == cache[bar_key] +# assert 1 == store.counter["__getitem__", bar_key] +# assert 1 == cache.hits +# assert 2 == cache.misses + +# # test second 'bar' __getitem__, cache hit +# assert b"yyy" == cache[bar_key] +# assert 1 == store.counter["__getitem__", bar_key] +# assert 2 == cache.hits +# assert 2 == cache.misses + +# # test 'foo' __getitem__, should still be cached +# assert b"xxx" == cache[foo_key] +# assert 1 == store.counter["__getitem__", foo_key] +# assert 3 == cache.hits +# assert 2 == cache.misses + +# # test 'bar' __getitem__, should still be cached +# assert b"yyy" == cache[bar_key] +# assert 1 == store.counter["__getitem__", bar_key] +# assert 4 == cache.hits +# assert 2 == cache.misses + +# def test_cache_keys(self): + +# # setup +# store = self.CountingClass() +# foo_key = self.root + "foo" +# bar_key = self.root + "bar" +# baz_key = self.root + "baz" +# store[foo_key] = b"xxx" +# store[bar_key] = b"yyy" +# assert 0 == store.counter["__contains__", foo_key] +# assert 0 == store.counter["__iter__"] +# assert 0 == store.counter["keys"] +# cache = 
self.LRUStoreClass(store, max_size=None) + +# # keys should be cached on first call +# keys = sorted(cache.keys()) +# assert keys == [bar_key, foo_key] +# assert 1 == store.counter["keys"] +# # keys should now be cached +# assert keys == sorted(cache.keys()) +# assert 1 == store.counter["keys"] +# assert foo_key in cache +# assert 1 == store.counter["__contains__", foo_key] +# # the next check for `foo_key` is cached +# assert foo_key in cache +# assert 1 == store.counter["__contains__", foo_key] +# assert keys == sorted(cache) +# assert 0 == store.counter["__iter__"] +# assert 1 == store.counter["keys"] + +# # cache should be cleared if store is modified - crude but simple for now +# cache[baz_key] = b"zzz" +# keys = sorted(cache.keys()) +# assert keys == [bar_key, baz_key, foo_key] +# assert 2 == store.counter["keys"] +# # keys should now be cached +# assert keys == sorted(cache.keys()) +# assert 2 == store.counter["keys"] + +# # manually invalidate keys +# cache.invalidate_keys() +# keys = sorted(cache.keys()) +# assert keys == [bar_key, baz_key, foo_key] +# assert 3 == store.counter["keys"] +# assert 1 == store.counter["__contains__", foo_key] +# assert 0 == store.counter["__iter__"] +# cache.invalidate_keys() +# keys = sorted(cache) +# assert keys == [bar_key, baz_key, foo_key] +# assert 4 == store.counter["keys"] +# assert 1 == store.counter["__contains__", foo_key] +# assert 0 == store.counter["__iter__"] +# cache.invalidate_keys() +# assert foo_key in cache +# assert 4 == store.counter["keys"] +# assert 2 == store.counter["__contains__", foo_key] +# assert 0 == store.counter["__iter__"] + +# # check these would get counted if called directly +# assert foo_key in store +# assert 3 == store.counter["__contains__", foo_key] +# assert keys == sorted(store) +# assert 1 == store.counter["__iter__"] + + +# def test_getsize(): +# store = KVStore(dict()) +# store["foo"] = b"aaa" +# store["bar"] = b"bbbb" +# store["baz/quux"] = b"ccccc" +# assert 7 == getsize(store) +# assert 5 == getsize(store, "baz") + +# store = KVStore(dict()) +# store["boo"] = None +# assert -1 == getsize(store) + + +# @pytest.mark.parametrize("dict_store", [False, True]) +# def test_migrate_1to2(dict_store): +# from zarr import meta_v1 + +# # N.B., version 1 did not support hierarchies, so we only have to be +# # concerned about migrating a single array at the root of the store + +# # setup +# store = dict() if dict_store else KVStore(dict()) +# meta = dict( +# shape=(100,), +# chunks=(10,), +# dtype=np.dtype("f4"), +# compression="zlib", +# compression_opts=1, +# fill_value=None, +# order="C", +# ) +# meta_json = meta_v1.encode_metadata(meta) +# store["meta"] = meta_json +# store["attrs"] = json.dumps(dict()).encode("ascii") + +# # run migration +# migrate_1to2(store) + +# # check results +# assert "meta" not in store +# assert array_meta_key in store +# assert "attrs" not in store +# assert attrs_key in store +# meta_migrated = decode_array_metadata(store[array_meta_key]) +# assert 2 == meta_migrated["zarr_format"] + +# # preserved fields +# for f in "shape", "chunks", "dtype", "fill_value", "order": +# assert meta[f] == meta_migrated[f] + +# # migrate should have added empty filters field +# assert meta_migrated["filters"] is None + +# # check compression and compression_opts migrated to compressor +# assert "compression" not in meta_migrated +# assert "compression_opts" not in meta_migrated +# assert meta_migrated["compressor"] == Zlib(1).get_config() + +# # check dict compression_opts +# store = dict() if 
dict_store else KVStore(dict()) +# meta["compression"] = "blosc" +# meta["compression_opts"] = dict(cname="lz4", clevel=5, shuffle=1) +# meta_json = meta_v1.encode_metadata(meta) +# store["meta"] = meta_json +# store["attrs"] = json.dumps(dict()).encode("ascii") +# migrate_1to2(store) +# meta_migrated = decode_array_metadata(store[array_meta_key]) +# assert "compression" not in meta_migrated +# assert "compression_opts" not in meta_migrated +# assert meta_migrated["compressor"] == Blosc(cname="lz4", clevel=5, shuffle=1).get_config() + +# # check 'none' compression is migrated to None (null in JSON) +# store = dict() if dict_store else KVStore(dict()) +# meta["compression"] = "none" +# meta_json = meta_v1.encode_metadata(meta) +# store["meta"] = meta_json +# store["attrs"] = json.dumps(dict()).encode("ascii") +# migrate_1to2(store) +# meta_migrated = decode_array_metadata(store[array_meta_key]) +# assert "compression" not in meta_migrated +# assert "compression_opts" not in meta_migrated +# assert meta_migrated["compressor"] is None + + +# def test_format_compatibility(): + +# # This test is intended to catch any unintended changes that break the ability to +# # read data stored with a previous minor version (which should be format-compatible). + +# # fixture data +# fixture = group(store=DirectoryStore("fixture")) + +# # set seed to get consistent random data +# np.random.seed(42) + +# arrays_chunks = [ +# (np.arange(1111, dtype=" 2 else "" +# # setup some values +# store[prefix + "a"] = b"aaa" +# store[prefix + "b"] = b"bbb" +# store[prefix + "c/d"] = b"ddd" +# store[prefix + "c/e/f"] = b"fff" + +# # test iterators on store with data +# assert 4 == len(store) +# keys = [prefix + "a", prefix + "b", prefix + "c/d", prefix + "c/e/f"] +# values = [b"aaa", b"bbb", b"ddd", b"fff"] +# items = list(zip(keys, values)) +# assert set(keys) == set(store) +# assert set(keys) == set(store.keys()) +# assert set(values) == set(store.values()) +# assert set(items) == set(store.items()) - if path is None: - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) +# def test_getsize(self): +# return super().test_getsize() - store = N5FSStore(path, normalize_keys=normalize_keys, **kwargs) - return store - - def test_equal(self): - store_a = self.create_store() - store_b = N5FSStore(store_a.path) - assert store_a == store_b +# def test_hierarchy(self): +# return super().test_hierarchy() - # This is copied wholesale from the N5Store tests. The same test could - # be run by making TestN5FSStore inherit from both TestFSStore and - # TestN5Store, but a direct copy is arguably more explicit. 
+# @pytest.mark.skipif(sys.version_info < (3, 7), reason="attr not serializable in py36") +# def test_pickle(self): +# # internal attribute on ContainerClient isn't serializable for py36 and earlier +# super().test_pickle() - @pytest.mark.parametrize("zarr_meta_key", [".zarray", ".zattrs", ".zgroup"]) - def test_del_zarr_meta_key(self, zarr_meta_key): - store = self.create_store() - store[n5_attrs_key] = json_dumps({"foo": "bar"}) - del store[zarr_meta_key] - assert n5_attrs_key not in store - - def test_chunk_nesting(self): - store = self.create_store() - store["0.0"] = b"xxx" - assert "0.0" in store - assert b"xxx" == store["0.0"] - # assert b'xxx' == store['0/0'] - store["foo/10.20.30"] = b"yyy" - assert "foo/10.20.30" in store - assert b"yyy" == store["foo/10.20.30"] - # N5 reverses axis order - assert b"yyy" == store["foo/30/20/10"] - del store["foo/10.20.30"] - assert "foo/30/20/10" not in store - store["42"] = b"zzz" - assert "42" in store - assert b"zzz" == store["42"] - - def test_init_array(self): - store = self.create_store() - init_array(store, shape=1000, chunks=100) - - # check metadata - assert array_meta_key in store - meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) - assert ZARR_FORMAT == meta["zarr_format"] - assert (1000,) == meta["shape"] - assert (100,) == meta["chunks"] - assert np.dtype(None) == meta["dtype"] - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert default_compressor.get_config() == compressor_config - # N5Store always has a fill value of 0 - assert meta["fill_value"] == 0 - assert meta["dimension_separator"] == "." - # Top-level groups AND arrays should have - # the n5 keyword in metadata - raw_n5_meta = json.loads(store[n5_attrs_key]) - assert raw_n5_meta.get("n5", None) == N5_FORMAT - - def test_init_array_path(self): - path = "foo/bar" - store = self.create_store() - init_array(store, shape=1000, chunks=100, path=path) - - # check metadata - key = path + "/" + array_meta_key - assert key in store - meta = store._metadata_class.decode_array_metadata(store[key]) - assert ZARR_FORMAT == meta["zarr_format"] - assert (1000,) == meta["shape"] - assert (100,) == meta["chunks"] - assert np.dtype(None) == meta["dtype"] - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert default_compressor.get_config() == compressor_config - # N5Store always has a fill value of 0 - assert meta["fill_value"] == 0 - - def test_init_array_compat(self): - store = self.create_store() - init_array(store, shape=1000, chunks=100, compressor="none") - meta = store._metadata_class.decode_array_metadata(store[array_meta_key]) - # N5Store wraps the actual compressor - compressor_config = meta["compressor"]["compressor_config"] - assert compressor_config is None - def test_init_array_overwrite(self): - self._test_init_array_overwrite("C") +# class TestConsolidatedMetadataStore: - def test_init_array_overwrite_path(self): - self._test_init_array_overwrite_path("C") +# version = 2 +# ConsolidatedMetadataClass = ConsolidatedMetadataStore - def test_init_array_overwrite_chunk_store(self): - self._test_init_array_overwrite_chunk_store("C") +# @property +# def metadata_key(self): +# return ".zmetadata" - def test_init_group_overwrite(self): - self._test_init_group_overwrite("C") +# def test_bad_format(self): - def test_init_group_overwrite_path(self): - self._test_init_group_overwrite_path("C") +# # setup store with consolidated metadata +# store = dict() 
+# consolidated = { +# # bad format version +# "zarr_consolidated_format": 0, +# } +# store[self.metadata_key] = json.dumps(consolidated).encode() - def test_init_group_overwrite_chunk_store(self): - self._test_init_group_overwrite_chunk_store("C") +# # check appropriate error is raised +# with pytest.raises(MetadataError): +# self.ConsolidatedMetadataClass(store) - def test_dimension_separator(self): +# def test_bad_store_version(self): +# with pytest.raises(ValueError): +# self.ConsolidatedMetadataClass(KVStoreV3(dict())) - with pytest.warns(UserWarning, match="dimension_separator"): - self.create_store(dimension_separator="/") +# def test_read_write(self): - def test_init_group(self): - store = self.create_store() - init_group(store) - store[".zattrs"] = json_dumps({"foo": "bar"}) - # check metadata - assert group_meta_key in store - assert group_meta_key in store.listdir() - assert group_meta_key in store.listdir("") - meta = store._metadata_class.decode_group_metadata(store[group_meta_key]) - assert ZARR_FORMAT == meta["zarr_format"] - - def test_filters(self): - all_filters, all_errors = zip( - *[ - (None, does_not_raise()), - ([], does_not_raise()), - ([AsType("f4", "f8")], pytest.raises(ValueError)), - ] - ) - for filters, error in zip(all_filters, all_errors): - store = self.create_store() - with error: - init_array(store, shape=1000, chunks=100, filters=filters) +# # setup store with consolidated metadata +# store = dict() +# consolidated = { +# "zarr_consolidated_format": 1, +# "metadata": { +# "foo": "bar", +# "baz": 42, +# }, +# } +# store[self.metadata_key] = json.dumps(consolidated).encode() +# # create consolidated store +# cs = self.ConsolidatedMetadataClass(store) -@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") -class TestNestedFSStore(TestNestedDirectoryStore): - def create_store(self, normalize_keys=False, path=None, **kwargs): - if path is None: - path = tempfile.mkdtemp() - atexit.register(atexit_rmtree, path) - store = FSStore( - path, normalize_keys=normalize_keys, dimension_separator="/", auto_mkdir=True, **kwargs - ) - return store +# # test __contains__, __getitem__ +# for key, value in consolidated["metadata"].items(): +# assert key in cs +# assert value == cs[key] - def test_numbered_groups(self): - import zarr +# # test __delitem__, __setitem__ +# with pytest.raises(PermissionError): +# del cs["foo"] +# with pytest.raises(PermissionError): +# cs["bar"] = 0 +# with pytest.raises(PermissionError): +# cs["spam"] = "eggs" - # Create an array - store = self.create_store() - group = zarr.group(store=store) - arr = group.create_dataset("0", shape=(10, 10)) - arr[1] = 1 - # Read it back - store = self.create_store(path=store.path) - zarr.open_group(store.path)["0"] +# # standalone test we do not want to run on each store. 
-class TestTempStore(StoreTests): - def create_store(self, **kwargs): - skip_if_nested_chunks(**kwargs) - return TempStore(**kwargs) +# def test_fill_value_change(): +# a = zarr.create((10, 10), dtype=int) - def test_setdel(self): - store = self.create_store() - setdel_hierarchy_checks(store, self.root) +# assert a[0, 0] == 0 +# a.fill_value = 1 -class TestZipStore(StoreTests): +# assert a[0, 0] == 1 - ZipStoreClass = ZipStore +# assert json.loads(a.store[".zarray"])["fill_value"] == 1 - def create_store(self, **kwargs): - path = mktemp(suffix=".zip") - atexit.register(os.remove, path) - store = ZipStore(path, mode="w", **kwargs) - return store - def test_mode(self): - with self.ZipStoreClass("data/store.zip", mode="w") as store: - store[self.root + "foo"] = b"bar" - store = self.ZipStoreClass("data/store.zip", mode="r") - with pytest.raises(PermissionError): - store[self.root + "foo"] = b"bar" - with pytest.raises(PermissionError): - store.clear() - - def test_flush(self): - store = self.ZipStoreClass("data/store.zip", mode="w") - store[self.root + "foo"] = b"bar" - store.flush() - assert store[self.root + "foo"] == b"bar" - store.close() - - store = self.ZipStoreClass("data/store.zip", mode="r") - store.flush() # no-op - - def test_context_manager(self): - with self.create_store() as store: - store[self.root + "foo"] = b"bar" - store[self.root + "baz"] = b"qux" - assert 2 == len(store) - - def test_pop(self): - # override because not implemented - store = self.create_store() - store[self.root + "foo"] = b"bar" - with pytest.raises(NotImplementedError): - store.pop(self.root + "foo") +# def test_get_hierarchy_metadata_v2(): +# # v2 stores do not have hierarchy metadata (i.e. zarr.json) +# with pytest.raises(ValueError): +# _get_hierarchy_metadata(KVStore(dict)) - def test_popitem(self): - # override because not implemented - store = self.create_store() - store[self.root + "foo"] = b"bar" - with pytest.raises(NotImplementedError): - store.popitem() - - def test_permissions(self): - store = self.ZipStoreClass("data/store.zip", mode="w") - foo_key = "foo" if self.version == 2 else self.root + "foo" - # TODO: cannot provide key ending in / for v3 - # how to create an empty folder in that case? 
- baz_key = "baz/" if self.version == 2 else self.root + "baz" - store[foo_key] = b"bar" - store[baz_key] = b"" - - store.flush() - store.close() - z = ZipFile("data/store.zip", "r") - info = z.getinfo(foo_key) - perm = oct(info.external_attr >> 16) - assert perm == "0o644" - info = z.getinfo(baz_key) - perm = oct(info.external_attr >> 16) - # only for posix platforms - if os.name == "posix": - if self.version == 2: - assert perm == "0o40775" - else: - # baz/ on v2, but baz on v3, so not a directory - assert perm == "0o644" - z.close() - - def test_store_and_retrieve_ndarray(self): - store = ZipStore("data/store.zip") - x = np.array([[1, 2], [3, 4]]) - store["foo"] = x - y = np.frombuffer(store["foo"], dtype=x.dtype).reshape(x.shape) - assert np.array_equiv(y, x) - - -class TestDBMStore(StoreTests): - def create_store(self, dimension_separator=None): - path = mktemp(suffix=".anydbm") - atexit.register(atexit_rmglob, path + "*") - # create store using default dbm implementation - store = DBMStore(path, flag="n", dimension_separator=dimension_separator) - return store - def test_context_manager(self): - with self.create_store() as store: - store[self.root + "foo"] = b"bar" - store[self.root + "baz"] = b"qux" - assert 2 == len(store) +# def test_normalize_store_arg(tmpdir): +# with pytest.raises(ValueError): +# normalize_store_arg(dict(), zarr_version=4) +# for ext, Class in [(".zip", ZipStore), (".n5", N5Store)]: +# fn = tmpdir.join("store" + ext) +# store = normalize_store_arg(str(fn), zarr_version=2, mode="w") +# assert isinstance(store, Class) -class TestDBMStoreDumb(TestDBMStore): - def create_store(self, **kwargs): - path = mktemp(suffix=".dumbdbm") - atexit.register(atexit_rmglob, path + "*") +# if have_fsspec: +# import fsspec - import dbm.dumb as dumbdbm +# path = tempfile.mkdtemp() +# store = normalize_store_arg("file://" + path, zarr_version=2, mode="w") +# assert isinstance(store, FSStore) - store = DBMStore(path, flag="n", open=dumbdbm.open, **kwargs) - return store +# store = normalize_store_arg(fsspec.get_mapper("file://" + path)) +# assert isinstance(store, FSStore) -class TestDBMStoreGnu(TestDBMStore): - def create_store(self, **kwargs): - gdbm = pytest.importorskip("dbm.gnu") - path = mktemp(suffix=".gdbm") # pragma: no cover - atexit.register(os.remove, path) # pragma: no cover - store = DBMStore( - path, flag="n", open=gdbm.open, write_lock=False, **kwargs - ) # pragma: no cover - return store # pragma: no cover +# def test_meta_prefix_6853(): +# fixture = pathlib.Path(zarr.__file__).resolve().parent.parent / "fixture" +# meta = fixture / "meta" +# if not meta.exists(): # pragma: no cover +# s = DirectoryStore(str(meta), dimension_separator=".") +# a = zarr.open(store=s, mode="w", shape=(2, 2), dtype=" 2 else "" - # setup some values - store[prefix + "a"] = b"aaa" - store[prefix + "b"] = b"bbb" - store[prefix + "c/d"] = b"ddd" - store[prefix + "c/e/f"] = b"fff" - - # test iterators on store with data - assert 4 == len(store) - keys = [prefix + "a", prefix + "b", prefix + "c/d", prefix + "c/e/f"] - values = [b"aaa", b"bbb", b"ddd", b"fff"] - items = list(zip(keys, values)) - assert set(keys) == set(store) - assert set(keys) == set(store.keys()) - assert set(values) == set(store.values()) - assert set(items) == set(store.items()) - - def test_getsize(self): - return super().test_getsize() - - def test_hierarchy(self): - return super().test_hierarchy() - - @pytest.mark.skipif(sys.version_info < (3, 7), reason="attr not serializable in py36") - def test_pickle(self): - # 
internal attribute on ContainerClient isn't serializable for py36 and earlier - super().test_pickle() - - -class TestConsolidatedMetadataStore: - - version = 2 - ConsolidatedMetadataClass = ConsolidatedMetadataStore - - @property - def metadata_key(self): - return ".zmetadata" - - def test_bad_format(self): - - # setup store with consolidated metadata - store = dict() - consolidated = { - # bad format version - "zarr_consolidated_format": 0, - } - store[self.metadata_key] = json.dumps(consolidated).encode() - - # check appropriate error is raised - with pytest.raises(MetadataError): - self.ConsolidatedMetadataClass(store) - - def test_bad_store_version(self): - with pytest.raises(ValueError): - self.ConsolidatedMetadataClass(KVStoreV3(dict())) - - def test_read_write(self): - - # setup store with consolidated metadata - store = dict() - consolidated = { - "zarr_consolidated_format": 1, - "metadata": { - "foo": "bar", - "baz": 42, - }, - } - store[self.metadata_key] = json.dumps(consolidated).encode() - - # create consolidated store - cs = self.ConsolidatedMetadataClass(store) - - # test __contains__, __getitem__ - for key, value in consolidated["metadata"].items(): - assert key in cs - assert value == cs[key] - - # test __delitem__, __setitem__ - with pytest.raises(PermissionError): - del cs["foo"] - with pytest.raises(PermissionError): - cs["bar"] = 0 - with pytest.raises(PermissionError): - cs["spam"] = "eggs" - - -# standalone test we do not want to run on each store. - - -def test_fill_value_change(): - a = zarr.create((10, 10), dtype=int) - - assert a[0, 0] == 0 - - a.fill_value = 1 - - assert a[0, 0] == 1 - - assert json.loads(a.store[".zarray"])["fill_value"] == 1 - - -def test_get_hierarchy_metadata_v2(): - # v2 stores do not have hierarchy metadata (i.e. 
zarr.json) - with pytest.raises(ValueError): - _get_hierarchy_metadata(KVStore(dict)) - - -def test_normalize_store_arg(tmpdir): - with pytest.raises(ValueError): - normalize_store_arg(dict(), zarr_version=4) - - for ext, Class in [(".zip", ZipStore), (".n5", N5Store)]: - fn = tmpdir.join("store" + ext) - store = normalize_store_arg(str(fn), zarr_version=2, mode="w") - assert isinstance(store, Class) - - if have_fsspec: - import fsspec - - path = tempfile.mkdtemp() - store = normalize_store_arg("file://" + path, zarr_version=2, mode="w") - assert isinstance(store, FSStore) - - store = normalize_store_arg(fsspec.get_mapper("file://" + path)) - assert isinstance(store, FSStore) - - -def test_meta_prefix_6853(): - - fixture = pathlib.Path(zarr.__file__).resolve().parent.parent / "fixture" - meta = fixture / "meta" - if not meta.exists(): # pragma: no cover - s = DirectoryStore(str(meta), dimension_separator=".") - a = zarr.open(store=s, mode="w", shape=(2, 2), dtype=" str: - assert isinstance(root, str) - assert isinstance(path, str) - root = root.rstrip("/") - path = f"{root}/{path}" if root != "" else path - path = path.rstrip("/") - return path - - -class StorePath: - store: Store - path: str - - def __init__(self, store: Store, path: Optional[str] = None): - self.store = store - self.path = path or "" - - @classmethod - def from_path(cls, pth: Path) -> StorePath: - return cls(Store.from_path(pth)) - - async def get( - self, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - return await self.store.get(self.path, byte_range) - - async def set( - self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - await self.store.set(self.path, value, byte_range) - - async def delete(self) -> None: - await self.store.delete(self.path) - - async def exists(self) -> bool: - return await self.store.exists(self.path) - - def __truediv__(self, other: str) -> StorePath: - return self.__class__(self.store, _dereference_path(self.path, other)) - - def __str__(self) -> str: - return _dereference_path(str(self.store), self.path) - - def __repr__(self) -> str: - return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})" - - def __eq__(self, other: Any) -> bool: - try: - if self.store == other.store and self.path == other.path: - return True - except Exception: - pass - return False - -class BaseStore(Store): - supports_partial_writes = False - - # Does this really need to be on the Store? Could just - # be a convenience function - @classmethod - def from_path(cls, pth: Path) -> Store: - try: - from upath import UPath - from upath.implementations.local import PosixUPath, WindowsUPath - - if isinstance(pth, UPath) and not isinstance(pth, (PosixUPath, WindowsUPath)): - storage_options = pth._kwargs.copy() - storage_options.pop("_url", None) - return RemoteStore(str(pth), **storage_options) - except ImportError: - pass - - return LocalStore(pth) - - # async def set( - # self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - # ) -> None: - # raise NotImplementedError - - # async def delete(self, key: str) -> None: - # raise NotImplementedError - - # async def exists(self, key: str) -> bool: - # raise NotImplementedError - - # def __truediv__(self, other: str) -> StorePath: - # return StorePath(self, other) - -class BaseReadStore(ReadStore) - - async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]) -> List[bytes]: - """Retrieve possibly partial values from given key_ranges. 
- - Parameters - ---------- - key_ranges : list[tuple[str, tuple[int, int]]] - Ordered set of key, range pairs, a key may occur multiple times with different ranges - - Returns - ------- - list[bytes] - list of values, in the order of the key_ranges, may contain null/none for missing keys - """ - # fallback for stores that don't support partial reads - async def _get_then_slice(key: str, key_range: tuple[int, int]) -> bytes: - value = await self.get(key) - return value[key_range[0]:key_range[1]] - - return await concurrent_map( - key_ranges, - _get_then_slice, - limit=None # TODO: wire this to config - ) - - # async def multi_get( - # self, keys: List[Tuple[str, Optional[Tuple[int, int]]]] - # ) -> List[Optional[BytesLike]]: - # return await asyncio.gather(*[self.get(key, byte_range) for key, byte_range in keys]) - - -class BaseWriteStore(WriteStore): - - - - - - - -StoreLike = Union[BaseStore, StorePath, Path, str] - - -def make_store_path(store_like: StoreLike) -> StorePath: - if isinstance(store_like, StorePath): - return store_like - elif isinstance(store_like, BaseStore): - return StorePath(store_like) - elif isinstance(store_like, Path): - return StorePath(Store.from_path(store_like)) - elif isinstance(store_like, str): - try: - from upath import UPath - - return StorePath(Store.from_path(UPath(store_like))) - except ImportError: - return StorePath(LocalStore(Path(store_like))) - raise TypeError diff --git a/zarr/v3/stores/memory.py b/zarr/v3/stores/memory.py deleted file mode 100644 index ba7efe70e..000000000 --- a/zarr/v3/stores/memory.py +++ /dev/null @@ -1,60 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Union, Optional - -from zarr.v3.common import BytesLike -from zarr.v3.abc.store import WriteListStore -from zarr.v3.stores.core import BaseStore - - -# TODO: this store could easily be extended to wrap any MutuableMapping store from v2 -# When that is done, the `MemoryStore` will just be a store that wraps a dict. -class MemoryStore(WriteListStore, BaseStore): - supports_partial_writes = True - store_dict: MutableMapping[str, bytes] - - def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None): - self.store_dict = store_dict or {} - - async def get( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: - assert isinstance(key, str) - try: - value = self.store_dict[key] - if byte_range is not None: - value = value[byte_range[0] : byte_range[1]] - return value - except KeyError: - return None - - async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes: - raise NotImplementedError - - async def set( - self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None - ) -> None: - assert isinstance(key, str) - - if byte_range is not None: - buf = bytearray(self.store_dict[key]) - buf[byte_range[0] : byte_range[1]] = value - self.store_dict[key] = buf - else: - self.store_dict[key] = value - - async def delete(self, key: str) -> None: - try: - del self.store_dict[key] - except KeyError: - pass - - async def exists(self, key: str) -> bool: - return key in self.store_dict - - def __str__(self) -> str: - return f"memory://{id(self.store_dict)}" - - def __repr__(self) -> str: - return f"MemoryStore({repr(str(self))})"
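The removed BaseReadStore.get_partial_values above shows the intended fallback for stores without native partial reads: fetch each whole value, then slice the requested byte range in memory. Below is a minimal self-contained sketch of that idea; the function name, the use of asyncio.gather in place of the original concurrent_map helper, and the assumption that `store` exposes an async get() returning bytes or None are all illustrative, not the final store API.

import asyncio
from typing import List, Optional, Tuple

async def get_partial_values_fallback(
    store, key_ranges: List[Tuple[str, Tuple[int, Optional[int]]]]
) -> List[Optional[bytes]]:
    # Fallback for stores without partial reads: read the whole value,
    # then slice out the requested (start, end) range; end may be None.
    async def _get_then_slice(
        key: str, byte_range: Tuple[int, Optional[int]]
    ) -> Optional[bytes]:
        value = await store.get(key)
        if value is None:
            return None
        return value[byte_range[0] : byte_range[1]]

    # Results come back in the same order as key_ranges, with None for
    # missing keys, matching the docstring of the removed method.
    return await asyncio.gather(*(_get_then_slice(k, r) for k, r in key_ranges))

Keeping a fallback like this on a base class means simple stores (such as the in-memory store above) only need to implement get(), while stores with true range reads can override get_partial_values with something more efficient.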