Skip to content

Commit

Permalink
Remove Nested async and Fallibility from ObjectStore::list (#4930)
Browse files Browse the repository at this point in the history
* Remove nested async and fallibility from ObjectStore::list

* Clippy

* Update limit test

* Update docs
  • Loading branch information
tustvold authored Oct 17, 2023
1 parent d4d11fe commit fa7a61a
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 279 deletions.
13 changes: 5 additions & 8 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,16 @@ impl ObjectStore for AmazonS3 {
.boxed()
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.client.list(prefix).await
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list(prefix)
}

async fn list_with_offset(
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.client.list_with_offset(prefix, offset).await
) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list_with_offset(prefix, offset)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
7 changes: 2 additions & 5 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,8 @@ impl ObjectStore for MicrosoftAzure {
self.client.delete_request(location, &()).await
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.client.list(prefix).await
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
13 changes: 5 additions & 8 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,16 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.inner.list(prefix).await
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.inner.list(prefix)
}

async fn list_with_offset(
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.inner.list_with_offset(prefix, offset).await
) -> BoxStream<'_, Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
32 changes: 10 additions & 22 deletions object_store/src/client/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,13 @@ pub trait ListClientExt {
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>>;

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;

async fn list_with_offset(
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
) -> BoxStream<'_, Result<ObjectMeta>>;

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
}
Expand Down Expand Up @@ -90,31 +87,22 @@ impl<T: ListClient> ListClientExt for T {
.boxed()
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.list_paginated(prefix, false, None)
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();

Ok(stream)
.boxed()
}

async fn list_with_offset(
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.list_paginated(prefix, false, Some(offset))
) -> BoxStream<'_, Result<ObjectMeta>> {
self.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();

Ok(stream)
.boxed()
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
7 changes: 2 additions & 5 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,8 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.client.list(prefix).await
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
24 changes: 13 additions & 11 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -122,14 +122,13 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let status = self.client.list(prefix, "infinity").await?;
Ok(futures::stream::iter(
status
let prefix = prefix.cloned();
futures::stream::once(async move {
let status = self.client.list(prefix.as_ref(), "infinity").await?;

let iter = status
.response
.into_iter()
.filter(|r| !r.is_dir())
Expand All @@ -138,9 +137,12 @@ impl ObjectStore for HttpStore {
response.object_meta(self.client.base_url())
})
// Filter out exact prefix matches
.filter_ok(move |r| r.location.as_ref().len() > prefix_len),
)
.boxed())
.filter_ok(move |r| r.location.as_ref().len() > prefix_len);

Ok::<_, crate::Error>(futures::stream::iter(iter))
})
.try_flatten()
.boxed()
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down
Loading

0 comments on commit fa7a61a

Please sign in to comment.