Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return BoxStream with 'static lifetime from ObjectStore::list #6619

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ impl GetClient for S3Client {
}

#[async_trait]
impl ListClient for S3Client {
impl ListClient for Arc<S3Client> {
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
async fn list_request(
&self,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ impl ObjectStore for AmazonS3 {
.boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
if self.client.config.is_s3_express() {
let offset = offset.clone();
// S3 Express does not support start-after
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl GetClient for AzureClient {
}

#[async_trait]
impl ListClient for AzureClient {
impl ListClient for Arc<AzureClient> {
/// Make an Azure List request <https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs>
async fn list_request(
&self,
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl ObjectStore for MicrosoftAzure {
self.client.delete_request(location, &()).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

Expand Down
4 changes: 2 additions & 2 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}

Expand Down
19 changes: 10 additions & 9 deletions object_store/src/client/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,38 @@ pub(crate) trait ListClientExt {
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>>;
) -> BoxStream<'static, Result<ListResult>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the implications of this stream having static lifetime? I don't understand the implications on downstream crates

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my use case, Python bindings with pyo3 don't allow you to have any lifetime parameters shorter than 'static: https://pyo3.rs/v0.22.5/class.html#no-lifetime-parameters

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right -- I guess I am wondering does this effectively mean that the returned streams can't be borrowed (aka they effectively have to be Arc'd or something like that where the stream is owned 🤔 )

Basically I am trying to think about what usecases, if any, that this additional constrait would be break (I realize some people may need to update the type signatures, I am thinking of more fundamental changes)

One thing maybe we could do is try to update DataFusion (and our internal INfluxDB code) with this change and see what, if anything, breaks for us 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would only effect implementers of the trait, its a stronger contract on what the implementation chooses to return. The result is consumers of object_store shouldn't notice much


fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;

#[allow(unused)]
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>>;
) -> BoxStream<'static, Result<ObjectMeta>>;

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
}

#[async_trait]
impl<T: ListClient> ListClientExt for T {
impl<T: ListClient + Clone> ListClientExt for T {
fn list_paginated(
&self,
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>> {
) -> BoxStream<'static, Result<ListResult>> {
let offset = offset.map(|x| x.to_string());
let prefix = prefix
.filter(|x| !x.as_ref().is_empty())
.map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));

stream_paginated(
self.clone(),
(prefix, offset),
move |(prefix, offset), token| async move {
let (r, next_token) = self
move |client, (prefix, offset), token| async move {
let (r, next_token) = client
.list_request(
prefix.as_deref(),
delimiter,
Expand All @@ -88,7 +89,7 @@ impl<T: ListClient> ListClientExt for T {
.boxed()
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
Expand All @@ -99,7 +100,7 @@ impl<T: ListClient> ListClientExt for T {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
Expand Down
50 changes: 29 additions & 21 deletions object_store/src/client/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ use std::future::Future;
/// finish, otherwise it will continue to call `op(state, token)` with the values returned by the
/// previous call to `op`, until a continuation token of `None` is returned
///
pub(crate) fn stream_paginated<F, Fut, S, T>(state: S, op: F) -> impl Stream<Item = Result<T>>
pub(crate) fn stream_paginated<F, Fut, S, T, C>(
client: C,
state: S,
op: F,
) -> impl Stream<Item = Result<T>>
where
F: Fn(S, Option<String>) -> Fut + Copy,
C: Clone,
F: Fn(C, S, Option<String>) -> Fut + Copy,
Fut: Future<Output = Result<(T, S, Option<String>)>>,
{
enum PaginationState<T> {
Expand All @@ -46,27 +51,30 @@ where
Done,
}

futures::stream::unfold(PaginationState::Start(state), move |state| async move {
let (s, page_token) = match state {
PaginationState::Start(s) => (s, None),
PaginationState::HasMore(s, page_token) if !page_token.is_empty() => {
(s, Some(page_token))
}
_ => {
return None;
}
};
futures::stream::unfold(PaginationState::Start(state), move |state| {
let client = client.clone();
async move {
let (s, page_token) = match state {
PaginationState::Start(s) => (s, None),
PaginationState::HasMore(s, page_token) if !page_token.is_empty() => {
(s, Some(page_token))
}
_ => {
return None;
}
};

let (resp, s, continuation) = match op(s, page_token).await {
Ok(resp) => resp,
Err(e) => return Some((Err(e), PaginationState::Done)),
};
let (resp, s, continuation) = match op(client, s, page_token).await {
Ok(resp) => resp,
Err(e) => return Some((Err(e), PaginationState::Done)),
};

let next_state = match continuation {
Some(token) => PaginationState::HasMore(s, token),
None => PaginationState::Done,
};
let next_state = match continuation {
Some(token) => PaginationState::HasMore(s, token),
None => PaginationState::Done,
};

Some((Ok(resp), next_state))
Some((Ok(resp), next_state))
}
})
}
2 changes: 1 addition & 1 deletion object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ impl GetClient for GoogleCloudStorageClient {
}

#[async_trait]
impl ListClient for GoogleCloudStorageClient {
impl ListClient for Arc<GoogleCloudStorageClient> {
/// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
async fn list_request(
&self,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list_with_offset(prefix, offset)
}

Expand Down
15 changes: 9 additions & 6 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV

use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -80,7 +82,7 @@ impl From<Error> for crate::Error {
/// See [`crate::http`] for more information
#[derive(Debug)]
pub struct HttpStore {
client: Client,
client: Arc<Client>,
}

impl std::fmt::Display for HttpStore {
Expand Down Expand Up @@ -131,19 +133,20 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
let client = Arc::clone(&self.client);
futures::stream::once(async move {
let status = self.client.list(prefix.as_ref(), "infinity").await?;
let status = client.list(prefix.as_ref(), "infinity").await?;

let iter = status
.response
.into_iter()
.filter(|r| !r.is_dir())
.map(|response| {
.map(move |response| {
response.check_ok()?;
response.object_meta(self.client.base_url())
response.object_meta(client.base_url())
})
// Filter out exact prefix matches
.filter_ok(move |r| r.location.as_ref().len() > prefix_len);
Expand Down Expand Up @@ -239,7 +242,7 @@ impl HttpBuilder {
let parsed = Url::parse(&url).context(UnableToParseUrlSnafu { url })?;

Ok(HttpStore {
client: Client::new(parsed, self.client_options, self.retry_config)?,
client: Arc::new(Client::new(parsed, self.client_options, self.retry_config)?),
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;

/// List all the objects with the given prefix and a location greater than `offset`
///
Expand All @@ -732,7 +732,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
let offset = offset.clone();
self.list(prefix)
.try_filter(move |f| futures::future::ready(f.location > offset))
Expand Down Expand Up @@ -845,15 +845,15 @@ macro_rules! as_ref_impl {
self.as_ref().delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list_with_offset(prefix, offset)
}

Expand Down
14 changes: 8 additions & 6 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
///
#[derive(Debug)]
pub struct LimitStore<T: ObjectStore> {
inner: T,
inner: Arc<T>,
max_requests: usize,
semaphore: Arc<Semaphore>,
}
Expand All @@ -56,7 +56,7 @@ impl<T: ObjectStore> LimitStore<T> {
/// `max_requests`
pub fn new(inner: T, max_requests: usize) -> Self {
Self {
inner,
inner: Arc::new(inner),
max_requests,
semaphore: Arc::new(Semaphore::new(max_requests)),
}
Expand Down Expand Up @@ -144,12 +144,13 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.delete_stream(locations)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = prefix.cloned();
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = self.inner.list(prefix.as_ref());
let s = inner.list(prefix.as_ref());
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
Expand All @@ -159,13 +160,14 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix = prefix.cloned();
let offset = offset.clone();
let inner = Arc::clone(&self.inner);
let fut = Arc::clone(&self.semaphore)
.acquire_owned()
.map(move |permit| {
let s = self.inner.list_with_offset(prefix.as_ref(), &offset);
let s = inner.list_with_offset(prefix.as_ref(), &offset);
PermitWrapper::new(s, permit.unwrap())
});
fut.into_stream().flatten().boxed()
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl ObjectStore for LocalFileSystem {
.await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let config = Arc::clone(&self.config);

let root_path = match prefix {
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl ObjectStore for InMemory {
Ok(())
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);

Expand Down
Loading
Loading