diff --git a/Cargo.lock b/Cargo.lock index 3f15fc3..47bbca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,15 +105,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "cc" -version = "1.1.30" +version = "1.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" dependencies = [ "shlex", ] @@ -650,9 +650,8 @@ dependencies = [ [[package]] name = "object_store" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" +version = "0.11.1" +source = "git+https://github.com/kylebarron/arrow-rs?branch=kyle/list-returns-static-stream#33062b14efc66ff35353bf785cbd444056b09b66" dependencies = [ "async-trait", "base64", @@ -749,9 +748,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.88" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" dependencies = [ "unicode-ident", ] @@ -1139,18 +1138,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.210" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" dependencies = [ "serde_derive", ] [[package]] name = 
"serde_derive" -version = "1.0.210" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" dependencies = [ "proc-macro2", "quote", @@ -1159,9 +1158,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.129" +version = "1.0.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbcf9b78a125ee667ae19388837dd12294b858d101fdd393cb9d5501ef09eb2" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "itoa", "memchr", @@ -1247,9 +1246,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.79" +version = "2.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021" dependencies = [ "proc-macro2", "quote", @@ -1273,18 +1272,18 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "thiserror" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", @@ -1308,9 +1307,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" 
[[package]] name = "tokio" -version = "1.40.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 4e6e7c8..a10ef2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,9 @@ thiserror = "1" tokio = "1.40" url = "2" +[patch.crates-io] +object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" } + [profile.release] lto = true codegen-units = 1 diff --git a/docs/api/list.md b/docs/api/list.md index 931febb..acd37f1 100644 --- a/docs/api/list.md +++ b/docs/api/list.md @@ -1,8 +1,8 @@ # List ::: object_store_rs.list -::: object_store_rs.list_async ::: object_store_rs.list_with_delimiter ::: object_store_rs.list_with_delimiter_async ::: object_store_rs.ObjectMeta ::: object_store_rs.ListResult +::: object_store_rs.ListStream diff --git a/object-store-rs/python/object_store_rs/_list.pyi b/object-store-rs/python/object_store_rs/_list.pyi index 2be0f82..dea1c86 100644 --- a/object-store-rs/python/object_store_rs/_list.pyi +++ b/object-store-rs/python/object_store_rs/_list.pyi @@ -37,13 +37,36 @@ class ListResult(TypedDict): objects: List[ObjectMeta] """Object metadata for the listing""" +class ListStream: + """ + A stream of [ObjectMeta][object_store_rs.ObjectMeta] that can be polled in a sync or + async fashion. 
+ """ + def __aiter__(self) -> ListStream: + """Return `Self` as an async iterator.""" + + def __iter__(self) -> ListStream: + """Return `Self` as an iterator.""" + + async def collect_async(self) -> List[ObjectMeta]: + """Collect all remaining ObjectMeta objects in the stream.""" + + def collect(self) -> List[ObjectMeta]: + """Collect all remaining ObjectMeta objects in the stream.""" + + async def __anext__(self) -> List[ObjectMeta]: + """Return the next chunk of ObjectMeta in the stream.""" + + def __next__(self) -> List[ObjectMeta]: + """Return the next chunk of ObjectMeta in the stream.""" + def list( store: ObjectStore, prefix: str | None = None, *, offset: str | None = None, - max_items: int | None = 2000, -) -> List[ObjectMeta]: + chunk_size: int = 50, +) -> ListStream: """ List all the objects with the given prefix. @@ -51,14 +74,14 @@ def list( `foo/bar/x` but not of `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included. - Note: the order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not - guaranteed + !!! note + The order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not + guaranteed !!! note - In the future, we'd like to have `list` return an async iterable, just like - `get`, so that we can stream the result of `list`, but we need [some - changes](https://github.com/apache/arrow-rs/issues/6587) in the upstream - object-store repo first. + There is no async version of this method, because `list` is not async under the + hood; rather, it only instantiates a stream, which can be polled in synchronous + or asynchronous fashion. See [`ListStream`][object_store_rs.ListStream]. Args: store: The ObjectStore instance to use. @@ -66,22 +89,11 @@ def list( Keyword Args: offset: If provided, list all the objects with the given prefix and a location greater than `offset`. Defaults to `None`. - max_items: The maximum number of items to return. Defaults to 2000. 
+ chunk_size: The number of items to collect per chunk in the returned + (async) iterator. Returns: - A list of `ObjectMeta`. - """ - -async def list_async( - store: ObjectStore, - prefix: str | None = None, - *, - offset: str | None = None, - max_items: int | None = 2000, -) -> List[ObjectMeta]: - """Call `list` asynchronously. - - Refer to the documentation for [list][object_store_rs.list]. + A ListStream, which you can iterate through to access list results. """ def list_with_delimiter(store: ObjectStore, prefix: str | None = None) -> ListResult: diff --git a/object-store-rs/python/object_store_rs/_object_store_rs.pyi b/object-store-rs/python/object_store_rs/_object_store_rs.pyi index 5dc83ef..012af98 100644 --- a/object-store-rs/python/object_store_rs/_object_store_rs.pyi +++ b/object-store-rs/python/object_store_rs/_object_store_rs.pyi @@ -13,9 +13,9 @@ from ._get import get_ranges_async as get_ranges_async from ._head import head as head from ._head import head_async as head_async from ._list import ListResult as ListResult +from ._list import ListStream as ListStream from ._list import ObjectMeta as ObjectMeta from ._list import list as list -from ._list import list_async as list_async from ._list import list_with_delimiter as list_with_delimiter from ._list import list_with_delimiter_async as list_with_delimiter_async from ._put import PutResult as PutResult diff --git a/object-store-rs/src/get.rs b/object-store-rs/src/get.rs index d162b53..61cb18c 100644 --- a/object-store-rs/src/get.rs +++ b/object-store-rs/src/get.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use bytes::Bytes; use chrono::{DateTime, Utc}; -use futures::stream::BoxStream; +use futures::stream::{BoxStream, Fuse}; use futures::StreamExt; use object_store::{GetOptions, GetResult, ObjectStore}; use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError}; @@ -108,23 +108,25 @@ impl PyGetResult { } } +// Note: we fuse the underlying stream so that we can get `None` multiple times. 
+// See the note on PyListStream for more background. #[pyclass(name = "BytesStream")] pub struct PyBytesStream { - stream: Arc>>>, + stream: Arc>>>>, min_chunk_size: usize, } impl PyBytesStream { fn new(stream: BoxStream<'static, object_store::Result>, min_chunk_size: usize) -> Self { Self { - stream: Arc::new(Mutex::new(stream)), + stream: Arc::new(Mutex::new(stream.fuse())), min_chunk_size, } } } async fn next_stream( - stream: Arc>>>, + stream: Arc>>>>, min_chunk_size: usize, sync: bool, ) -> PyResult { diff --git a/object-store-rs/src/lib.rs b/object-store-rs/src/lib.rs index cd75d5b..9420962 100644 --- a/object-store-rs/src/lib.rs +++ b/object-store-rs/src/lib.rs @@ -37,7 +37,6 @@ fn _object_store_rs(py: Python, m: &Bound) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(get::get))?; m.add_wrapped(wrap_pyfunction!(head::head_async))?; m.add_wrapped(wrap_pyfunction!(head::head))?; - m.add_wrapped(wrap_pyfunction!(list::list_async))?; m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter_async))?; m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?; m.add_wrapped(wrap_pyfunction!(list::list))?; diff --git a/object-store-rs/src/list.rs b/object-store-rs/src/list.rs index b9fcf25..531400b 100644 --- a/object-store-rs/src/list.rs +++ b/object-store-rs/src/list.rs @@ -1,13 +1,15 @@ use std::sync::Arc; -use futures::stream::BoxStream; +use futures::stream::{BoxStream, Fuse}; use futures::StreamExt; use indexmap::IndexMap; use object_store::path::Path; use object_store::{ListResult, ObjectMeta, ObjectStore}; +use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration}; use pyo3::prelude::*; use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult}; use pyo3_object_store::PyObjectStore; +use tokio::sync::Mutex; use crate::runtime::get_runtime; @@ -33,6 +35,124 @@ impl IntoPy for PyObjectMeta { } } +// Note: we fuse the underlying stream so that we can get `None` multiple times. 
+// +// In general, you can't poll an iterator after it's already emitted None. But the issue here is +// that we need _two_ states for the Python async iterator. It needs to first get all returned +// results, and then it needs its **own** PyStopAsyncIteration/PyStopIteration. But these are _two_ +// results to be returned from the Rust call, and we can't return them both at the same time. The +// easiest way to fix this is to safely return `None` from the stream multiple times. The first +// time we see `None` we return any batched results, the second time we see `None`, there are no +// batched results and we return PyStopAsyncIteration/PyStopIteration. +// +// Note: another way we could solve this is by removing any batching from the stream, but batching +// should improve the performance of the Rust/Python bridge. +// +// Ref: +// - https://stackoverflow.com/a/66964599 +// - https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.fuse +#[pyclass(name = "ListStream")] +pub(crate) struct PyListStream { + stream: Arc>>>>, + chunk_size: usize, +} + +impl PyListStream { + fn new( + stream: BoxStream<'static, object_store::Result>, + chunk_size: usize, + ) -> Self { + Self { + stream: Arc::new(Mutex::new(stream.fuse())), + chunk_size, + } + } +} + +#[pymethods] +impl PyListStream { + fn __aiter__(slf: Py) -> Py { + slf + } + + fn __iter__(slf: Py) -> Py { + slf + } + + fn collect(&self, py: Python) -> PyResult> { + let runtime = get_runtime(py)?; + let stream = self.stream.clone(); + runtime.block_on(collect_stream(stream)) + } + + fn collect_async<'py>(&'py self, py: Python<'py>) -> PyResult> { + let stream = self.stream.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, collect_stream(stream)) + } + + fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult> { + let stream = self.stream.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, next_stream(stream, self.chunk_size, false)) + } + + fn __next__<'py>(&'py self, py: 
Python<'py>) -> PyResult> { + let runtime = get_runtime(py)?; + let stream = self.stream.clone(); + runtime.block_on(next_stream(stream, self.chunk_size, true)) + } +} + +async fn next_stream( + stream: Arc>>>>, + chunk_size: usize, + sync: bool, +) -> PyResult> { + let mut stream = stream.lock().await; + let mut metas: Vec = vec![]; + loop { + match stream.next().await { + Some(Ok(meta)) => { + metas.push(PyObjectMeta(meta)); + if metas.len() >= chunk_size { + return Ok(metas); + } + } + Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), + None => { + if metas.is_empty() { + // Depending on whether the iteration is sync or not, we raise either a + // StopIteration or a StopAsyncIteration + if sync { + return Err(PyStopIteration::new_err("stream exhausted")); + } else { + return Err(PyStopAsyncIteration::new_err("stream exhausted")); + } + } else { + return Ok(metas); + } + } + }; + } +} + +async fn collect_stream( + stream: Arc>>>>, +) -> PyResult> { + let mut stream = stream.lock().await; + let mut metas: Vec = vec![]; + loop { + match stream.next().await { + Some(Ok(meta)) => { + metas.push(PyObjectMeta(meta)); + } + Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()), + None => { + return Ok(metas); + } + }; + } +} + pub(crate) struct PyListResult(ListResult); impl IntoPy for PyListResult { @@ -61,65 +181,21 @@ impl IntoPy for PyListResult { } #[pyfunction] -#[pyo3(signature = (store, prefix = None, *, offset = None, max_items = 2000))] +#[pyo3(signature = (store, prefix = None, *, offset = None, chunk_size = 50))] pub(crate) fn list( - py: Python, - store: PyObjectStore, - prefix: Option, - offset: Option, - max_items: Option, -) -> PyObjectStoreResult> { - let store = store.into_inner(); - let prefix = prefix.map(|s| s.into()); - let runtime = get_runtime(py)?; - py.allow_threads(|| { - let stream = if let Some(offset) = offset { - store.list_with_offset(prefix.as_ref(), &offset.into()) - } else { - store.list(prefix.as_ref()) - }; - 
let out = runtime.block_on(materialize_list_stream(stream, max_items))?; - Ok::<_, PyObjectStoreError>(out) - }) -} - -#[pyfunction] -#[pyo3(signature = (store, prefix = None, *, offset = None, max_items = 2000))] -pub(crate) fn list_async( - py: Python, store: PyObjectStore, prefix: Option, offset: Option, - max_items: Option, -) -> PyResult> { - let store = store.into_inner(); + chunk_size: usize, +) -> PyObjectStoreResult { + let store = store.into_inner().clone(); let prefix = prefix.map(|s| s.into()); - - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let stream = if let Some(offset) = offset { - store.list_with_offset(prefix.as_ref(), &offset.into()) - } else { - store.list(prefix.as_ref()) - }; - Ok(materialize_list_stream(stream, max_items).await?) - }) -} - -async fn materialize_list_stream( - mut stream: BoxStream<'_, object_store::Result>, - max_items: Option, -) -> PyObjectStoreResult> { - let mut result = vec![]; - while let Some(object) = stream.next().await { - result.push(PyObjectMeta(object?)); - if let Some(max_items) = max_items { - if result.len() >= max_items { - return Ok(result); - } - } - } - - Ok(result) + let stream = if let Some(offset) = offset { + store.list_with_offset(prefix.as_ref(), &offset.into()) + } else { + store.list(prefix.as_ref()) + }; + Ok(PyListStream::new(stream, chunk_size)) } #[pyfunction] diff --git a/tests/test_delete.py b/tests/test_delete.py index f16ce1f..ca92156 100644 --- a/tests/test_delete.py +++ b/tests/test_delete.py @@ -12,11 +12,11 @@ def test_delete_one(): obs.put(store, "file2.txt", b"bar") obs.put(store, "file3.txt", b"baz") - assert len(obs.list(store)) == 3 + assert len(obs.list(store).collect()) == 3 obs.delete(store, "file1.txt") obs.delete(store, "file2.txt") obs.delete(store, "file3.txt") - assert len(obs.list(store)) == 0 + assert len(obs.list(store).collect()) == 0 def test_delete_many(): @@ -26,12 +26,12 @@ def test_delete_many(): obs.put(store, "file2.txt", b"bar") obs.put(store, 
"file3.txt", b"baz") - assert len(obs.list(store)) == 3 + assert len(obs.list(store).collect()) == 3 obs.delete( store, ["file1.txt", "file2.txt", "file3.txt"], ) - assert len(obs.list(store)) == 0 + assert len(obs.list(store).collect()) == 0 # Local filesystem errors if the file does not exist. @@ -43,11 +43,11 @@ def test_delete_one_local_fs(): obs.put(store, "file2.txt", b"bar") obs.put(store, "file3.txt", b"baz") - assert len(obs.list(store)) == 3 + assert len(obs.list(store).collect()) == 3 obs.delete(store, "file1.txt") obs.delete(store, "file2.txt") obs.delete(store, "file3.txt") - assert len(obs.list(store)) == 0 + assert len(obs.list(store).collect()) == 0 with pytest.raises(Exception, match="No such file"): obs.delete(store, "file1.txt") @@ -61,7 +61,7 @@ def test_delete_many_local_fs(): obs.put(store, "file2.txt", b"bar") obs.put(store, "file3.txt", b"baz") - assert len(obs.list(store)) == 3 + assert len(obs.list(store).collect()) == 3 obs.delete( store, ["file1.txt", "file2.txt", "file3.txt"], diff --git a/tests/test_list.py b/tests/test_list.py index 37686ea..fae76a1 100644 --- a/tests/test_list.py +++ b/tests/test_list.py @@ -2,14 +2,13 @@ from object_store_rs.store import MemoryStore -def test_list_max_items(): +def test_list(): store = MemoryStore() obs.put(store, "file1.txt", b"foo") obs.put(store, "file2.txt", b"bar") obs.put(store, "file3.txt", b"baz") - assert len(obs.list(store)) == 3 - assert len(obs.list(store, max_items=2)) == 2 - assert len(obs.list(store, max_items=1)) == 1 - assert len(obs.list(store, max_items=0)) == 1 + stream = obs.list(store) + result = stream.collect() + assert len(result) == 3