Skip to content

Commit

Permalink
refactor(object_store): upgrade object_store to 0.7. (#3713)
Browse files Browse the repository at this point in the history
* upgrade object_store to 0.7.

* impl ObjectStore:list_with_offset
  • Loading branch information
youngsofun authored Dec 5, 2023
1 parent e164199 commit ba3fc45
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion integrations/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ version.workspace = true
async-trait = "0.1"
bytes = "1"
futures = "0.3"
object_store = "0.6"
object_store = "0.7"
opendal.workspace = true
tokio = "1"

Expand Down
90 changes: 82 additions & 8 deletions integrations/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::path::Path;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::GetResultPayload;
use object_store::ListResult;
use object_store::MultipartId;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::Result;
use opendal::Entry;
use opendal::Metadata;
use opendal::Metakey;
use opendal::Operator;
Expand Down Expand Up @@ -88,24 +91,39 @@ impl ObjectStore for OpendalStore {
})
}

async fn get_opts(&self, location: &Path, _: GetOptions) -> Result<GetResult> {
let r = self
async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result<GetResult> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"get_opts is not implemented so far",
)),
})
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let meta = self
.inner
.reader(location.as_ref())
.stat(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let meta = ObjectMeta {
location: location.clone(),
last_modified: meta.last_modified().unwrap_or_default(),
size: meta.content_length() as usize,
e_tag: meta.etag().map(|x| x.to_string()),
};
let r = self
.inner
.reader(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(OpendalReader { inner: r })),
range: (0..meta.size),
meta,
})
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
Expand Down Expand Up @@ -165,6 +183,37 @@ impl ObjectStore for OpendalStore {
Ok(stream.boxed())
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let offset = offset.clone();
let stream = if self.inner.info().full_capability().list_with_start_after {
self.inner
.lister_with(&path)
.start_after(offset.as_ref())
.metakey(Metakey::ContentLength | Metakey::LastModified)
.recursive(true)
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(try_format_object_meta)
.boxed()
} else {
self.inner
.lister_with(&path)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.recursive(true)
.await
.map_err(|err| format_object_store_error(err, &path))?
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
.boxed()
};
Ok(stream)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
Expand Down Expand Up @@ -252,6 +301,13 @@ fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
}
}

async fn try_format_object_meta(res: Result<Entry, opendal::Error>) -> Result<ObjectMeta> {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();

Ok(format_object_meta(entry.path(), meta))
}

struct OpendalReader {
inner: Reader,
}
Expand Down Expand Up @@ -377,4 +433,22 @@ mod tests {
assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
}

#[tokio::test]
async fn test_list_with_offset() {
let object_store = create_test_object_store().await;
let path: Path = "data/".try_into().unwrap();
let offset: Path = "data/nested/test.txt".try_into().unwrap();
let result = object_store
.list_with_offset(Some(&path), &offset)
.await
.unwrap()
.collect::<Vec<_>>()
.await;
assert_eq!(result.len(), 1);
assert_eq!(
result[0].as_ref().unwrap().location.as_ref(),
"data/test.txt"
);
}
}

0 comments on commit ba3fc45

Please sign in to comment.