Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming list results #35

Merged
merged 6 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ thiserror = "1"
tokio = "1.40"
url = "2"

[patch.crates-io]
object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }

[profile.release]
lto = true
codegen-units = 1
2 changes: 1 addition & 1 deletion docs/api/list.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# List

::: object_store_rs.list
::: object_store_rs.list_async
::: object_store_rs.list_with_delimiter
::: object_store_rs.list_with_delimiter_async
::: object_store_rs.ObjectMeta
::: object_store_rs.ListResult
::: object_store_rs.ListStream
56 changes: 34 additions & 22 deletions object-store-rs/python/object_store_rs/_list.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,63 @@ class ListResult(TypedDict):
objects: List[ObjectMeta]
"""Object metadata for the listing"""

class ListStream:
    """
    A stream of [ObjectMeta][object_store_rs.ObjectMeta] that can be polled in a sync or
    async fashion.

    Iterating over the stream (with either `for` or `async for`) yields *chunks*,
    i.e. lists of `ObjectMeta`, rather than individual items.
    """
    def __aiter__(self) -> ListStream:
        """Return `Self` as an async iterator."""

    def __iter__(self) -> ListStream:
        """Return `Self` as a sync iterator."""

    async def collect_async(self) -> List[ObjectMeta]:
        """Collect all remaining ObjectMeta objects in the stream.

        This is the awaitable counterpart of `collect`.
        """

    def collect(self) -> List[ObjectMeta]:
        """Collect all remaining ObjectMeta objects in the stream."""

    async def __anext__(self) -> List[ObjectMeta]:
        """Return the next chunk of ObjectMeta in the stream."""

    def __next__(self) -> List[ObjectMeta]:
        """Return the next chunk of ObjectMeta in the stream."""

def list(
store: ObjectStore,
prefix: str | None = None,
*,
offset: str | None = None,
max_items: int | None = 2000,
) -> List[ObjectMeta]:
chunk_size: int = 50,
) -> ListStream:
"""
List all the objects with the given prefix.

Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of
`foo/bar/x` but not of `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x`
will be included.

Note: the order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not
guaranteed
!!! note
The order of returned [`ObjectMeta`][object_store_rs.ObjectMeta] is not
guaranteed

!!! note
In the future, we'd like to have `list` return an async iterable, just like
`get`, so that we can stream the result of `list`, but we need [some
changes](https://github.com/apache/arrow-rs/issues/6587) in the upstream
object-store repo first.
There is no async version of this method, because `list` is not async under the
hood, rather it only instantiates a stream, which can be polled in synchronous
or asynchronous fashion. See [`ListStream`][object_store_rs.ListStream].

Args:
store: The ObjectStore instance to use.
prefix: The prefix within ObjectStore to use for listing. Defaults to None.

Keyword Args:
offset: If provided, list all the objects with the given prefix and a location greater than `offset`. Defaults to `None`.
max_items: The maximum number of items to return. Defaults to 2000.
chunk_size: The number of items to collect per chunk in the returned
(async) iterator.

Returns:
A list of `ObjectMeta`.
"""

async def list_async(
store: ObjectStore,
prefix: str | None = None,
*,
offset: str | None = None,
max_items: int | None = 2000,
) -> List[ObjectMeta]:
"""Call `list` asynchronously.

Refer to the documentation for [list][object_store_rs.list].
A ListStream, which you can iterate through to access list results.
"""

def list_with_delimiter(store: ObjectStore, prefix: str | None = None) -> ListResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ from ._get import get_ranges_async as get_ranges_async
from ._head import head as head
from ._head import head_async as head_async
from ._list import ListResult as ListResult
from ._list import ListStream as ListStream
from ._list import ObjectMeta as ObjectMeta
from ._list import list as list
from ._list import list_async as list_async
from ._list import list_with_delimiter as list_with_delimiter
from ._list import list_with_delimiter_async as list_with_delimiter_async
from ._put import PutResult as PutResult
Expand Down
10 changes: 6 additions & 4 deletions object-store-rs/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use object_store::{GetOptions, GetResult, ObjectStore};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
Expand Down Expand Up @@ -108,23 +108,25 @@ impl PyGetResult {
}
}

// Note: we fuse the underlying stream so that we can get `None` multiple times.
// See the note on PyListStream for more background.
#[pyclass(name = "BytesStream")]
pub struct PyBytesStream {
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<Bytes>>>>>,
min_chunk_size: usize,
}

impl PyBytesStream {
fn new(stream: BoxStream<'static, object_store::Result<Bytes>>, min_chunk_size: usize) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
stream: Arc::new(Mutex::new(stream.fuse())),
min_chunk_size,
}
}
}

async fn next_stream(
stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<Bytes>>>>>,
min_chunk_size: usize,
sync: bool,
) -> PyResult<PyBytesWrapper> {
Expand Down
1 change: 0 additions & 1 deletion object-store-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ fn _object_store_rs(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(get::get))?;
m.add_wrapped(wrap_pyfunction!(head::head_async))?;
m.add_wrapped(wrap_pyfunction!(head::head))?;
m.add_wrapped(wrap_pyfunction!(list::list_async))?;
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter_async))?;
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?;
m.add_wrapped(wrap_pyfunction!(list::list))?;
Expand Down
Loading