From 94b00c7af97d56456b36260e87f3f1a50dec6c4c Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 9 Apr 2024 22:08:17 -0600 Subject: [PATCH] poc: concurrently stream from list_dir -> getitem --- src/zarr/v3/group.py | 31 ++++++++++++++++--------------- src/zarr/v3/store/local.py | 5 +---- src/zarr/v3/store/memory.py | 2 -- tests/v3/test_group.py | 4 +--- 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/src/zarr/v3/group.py b/src/zarr/v3/group.py index 031d9a0ad..1ef322266 100644 --- a/src/zarr/v3/group.py +++ b/src/zarr/v3/group.py @@ -161,7 +161,6 @@ async def getitem( ) -> AsyncArray | AsyncGroup: store_path = self.store_path / key - logger.warning("key=%s, store_path=%s", key, store_path) # Note: # in zarr-python v2, we first check if `key` references an Array, else if `key` references @@ -305,20 +304,22 @@ async def children(self) -> AsyncGenerator[AsyncArray | AsyncGroup, None]: raise ValueError(msg) - async for key in self.store_path.store.list_dir(self.store_path.path): - # these keys are not valid child names so we make sure to skip them - # TODO: it would be nice to make these special keys accessible programmatically, - # and scoped to specific zarr versions - if key not in ("zarr.json", ".zgroup", ".zattrs"): - try: - # TODO: performance optimization -- batch - print(key) - child = await self.getitem(key) - # keyerror is raised when `subkey``names an object in the store - # in which case `subkey` cannot be the name of a sub-array or sub-group. - yield child - except KeyError: - pass + # leaving these imports here for demo purposes + from aiostream import stream, async_, pipe + from aiostream.aiter_utils import aitercontext + + children = ( + stream.iterate(self.store_path.store.list_dir(self.store_path.path)) + | pipe.filter(lambda x: x not in ("zarr.json", ".zgroup", ".zattrs")) + | + # TODO: need to handle directories without a metadata doc + # previously, we gracefully ignored them by catching the KeyError here. + pipe.map(async_(self.getitem)) + ) + + async with aitercontext(children) as safe_children: + async for child in safe_children: + yield child async def contains(self, child: str) -> bool: raise NotImplementedError diff --git a/src/zarr/v3/store/local.py b/src/zarr/v3/store/local.py index 1a87c450a..a6ed37bc8 100644 --- a/src/zarr/v3/store/local.py +++ b/src/zarr/v3/store/local.py @@ -154,7 +154,6 @@ async def list(self) -> AsyncGenerator[str, None]: if p.is_file(): yield str(p) - async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: """Retrieve all keys in the store with a given prefix. @@ -170,7 +169,6 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: if p.is_file(): yield str(p) - async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: """ Retrieve all keys and prefixes with a given prefix and which do not contain the character @@ -186,7 +184,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: """ base = self.root / prefix to_strip = str(base) + "/" - + try: key_iter = base.iterdir() except (FileNotFoundError, NotADirectoryError): @@ -194,4 +192,3 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: for key in key_iter: yield str(key).replace(to_strip, "") - diff --git a/src/zarr/v3/store/memory.py b/src/zarr/v3/store/memory.py index dbfa537ed..485ab81ed 100644 --- a/src/zarr/v3/store/memory.py +++ b/src/zarr/v3/store/memory.py @@ -78,8 +78,6 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: yield key async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: - print('prefix', prefix) - print('keys in list_dir', list(self._store_dict)) for key in self._store_dict: if key.startswith(prefix + "/") and key != prefix: yield key.strip(prefix + "/").rsplit("/", maxsplit=1)[0] diff --git a/tests/v3/test_group.py b/tests/v3/test_group.py index 136a7917a..0c02f8baa 100644 --- a/tests/v3/test_group.py +++ b/tests/v3/test_group.py @@ -56,7 +56,7 @@ def test_group_children(store: MemoryStore | LocalStore): # if group.children guarantees a particular order for the children. # If order is not guaranteed, then the better version of this test is # to compare two sets, but presently neither the group nor array classes are hashable. - print('getting children') + print("getting children") observed = group.children print(observed) print(list([subgroup, subarray, implicit_subgroup])) @@ -66,8 +66,6 @@ def test_group_children(store: MemoryStore | LocalStore): assert subgroup in observed - - @pytest.mark.parametrize("store", (("local", "memory")), indirect=["store"]) def test_group(store: MemoryStore | LocalStore) -> None: store_path = StorePath(store)