Skip to content

Commit

Permalink
feat: Add list prefix support (#3728)
Browse files Browse the repository at this point in the history
* Remove limit on list prefix

Signed-off-by: Xuanwo <[email protected]>

* Cover more test

Signed-off-by: Xuanwo <[email protected]>

* feat: Add list prefix support

Signed-off-by: Xuanwo <[email protected]>

* Add upgrade

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 8, 2023
1 parent a8d5848 commit 9a222e4
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 73 deletions.
5 changes: 2 additions & 3 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use opendal::raw::oio::BlockingRead;
use opendal::raw::oio::ReadExt;

use futures::TryStreamExt;
use napi::bindgen_prelude::*;
use opendal::raw::oio::BlockingRead;
use opendal::raw::oio::ReadExt;

// Newtype around `opendal::Operator`; the `#[napi]` attribute marks it for the Node.js binding.
#[napi]
pub struct Operator(opendal::Operator);
Expand Down
4 changes: 4 additions & 0 deletions core/src/docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- The `thread_pool_enabled` option has been removed.

### List Prefix Supported

After [RFC: List Prefix](crate::docs::rfcs::rfc_3243_list_prefix) landed, we changed the behavior of `list` for a path that does not end with `/`. OpenDAL used to return a `NotADirectory` error, but now it returns the entries whose paths start with the given prefix instead.

# Upgrade to v0.43

## Public API
Expand Down
48 changes: 38 additions & 10 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use bytes::Bytes;
use crate::raw::oio::FileReader;
use crate::raw::oio::FlatLister;
use crate::raw::oio::LazyReader;
use crate::raw::oio::PrefixLister;
use crate::raw::oio::RangeReader;
use crate::raw::oio::StreamableReader;
use crate::raw::*;
Expand Down Expand Up @@ -105,10 +106,8 @@ use crate::*;
/// Underlying services will return [`Capability`] to indicate the
/// features that returning listers support.
///
/// - If both `list_without_recursive` and `list_with_recursive`, return directly.
/// - If only `list_with_recursive`, with [`oio::to_flat_lister`].
/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`].
/// - If neither not supported, something must be wrong for `list` is true.
/// - If the service supports `list_with_recursive`, return its lister directly.
/// - If not, wrap it with [`FlatLister`].
///
/// ## Capability Check
///
Expand Down Expand Up @@ -366,9 +365,8 @@ impl<A: Accessor> CompleteAccessor<A> {
let recursive = args.recursive();

match (recursive, cap.list_with_recursive) {
// - If service can list_with_recursive
// - If recursive is false
(_, true) | (false, _) => {
// - If service can list_with_recursive, we can forward list to it directly.
(_, true) => {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
}
Expand All @@ -377,6 +375,20 @@ impl<A: Accessor> CompleteAccessor<A> {
let p = FlatLister::new(self.inner.clone(), path);
Ok((RpList::default(), CompleteLister::NeedFlat(p)))
}
// If not recursive and service doesn't support list_with_recursive, we need to handle
// list prefix by ourselves.
(false, false) => {
// Forward path that ends with /
if path.ends_with('/') {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
} else {
let parent = get_parent(path);
let (rp, p) = self.inner.list(parent, args).await?;
let p = PrefixLister::new(p, path);
Ok((rp, CompleteLister::NeedPrefix(p)))
}
}
}
}

Expand All @@ -393,9 +405,8 @@ impl<A: Accessor> CompleteAccessor<A> {
let recursive = args.recursive();

match (recursive, cap.list_with_recursive) {
// - If service can both list_with_recursive
// - If recursive is false
(_, true) | (false, _) => {
// - If service can list_with_recursive, we can forward list to it directly.
(_, true) => {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
}
Expand All @@ -404,6 +415,20 @@ impl<A: Accessor> CompleteAccessor<A> {
let p = FlatLister::new(self.inner.clone(), path);
Ok((RpList::default(), CompleteLister::NeedFlat(p)))
}
// If not recursive and service doesn't support list_with_recursive, we need to handle
// list prefix by ourselves.
(false, false) => {
// Forward path that ends with /
if path.ends_with('/') {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
} else {
let parent = get_parent(path);
let (rp, p) = self.inner.blocking_list(parent, args)?;
let p = PrefixLister::new(p, path);
Ok((rp, CompleteLister::NeedPrefix(p)))
}
}
}
}
}
Expand Down Expand Up @@ -706,6 +731,7 @@ where
pub enum CompleteLister<A: Accessor, P> {
/// The lister returned by the inner accessor is used as-is, with no extra wrapping.
AlreadyComplete(P),
/// Listing goes through a [`FlatLister`] built from the inner accessor and the path.
NeedFlat(FlatLister<Arc<A>, P>),
/// The inner lister is wrapped in a [`PrefixLister`] that filters entries by path prefix.
NeedPrefix(PrefixLister<P>),
}

#[async_trait]
Expand All @@ -720,6 +746,7 @@ where
match self {
AlreadyComplete(p) => p.poll_next(cx),
NeedFlat(p) => p.poll_next(cx),
NeedPrefix(p) => p.poll_next(cx),
}
}
}
Expand All @@ -735,6 +762,7 @@ where
match self {
AlreadyComplete(p) => p.next(),
NeedFlat(p) => p.next(),
NeedPrefix(p) => p.next(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/list/flat_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::*;
/// ListFuture is the future returned while calling async list.
type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>;

/// ToFlatLister will walk dir in bottom up way:
/// FlatLister will walk dir in bottom up way:
///
/// - List nested dir first
/// - Go back into parent dirs one by one
Expand Down
3 changes: 3 additions & 0 deletions core/src/raw/oio/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ pub use flat_list::FlatLister;

mod hierarchy_list;
pub use hierarchy_list::HierarchyLister;

mod prefix_list;
pub use prefix_list::PrefixLister;
82 changes: 82 additions & 0 deletions core/src/raw/oio/list/prefix_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::task::ready;
use std::task::Context;
use std::task::Poll;

use crate::raw::*;
use crate::*;

/// A lister adapter that only yields entries whose path begins with a given prefix.
///
/// Suppose the wrapped lister produces the following entries:
///
/// ```txt
/// .
/// ├── file_a
/// └── file_b
/// ```
///
/// Wrapping it in a `PrefixLister` built with the prefix `file_` keeps the
/// entries whose path starts with `file_` and drops everything else.
pub struct PrefixLister<L> {
    /// The wrapped lister that produces the unfiltered entries.
    lister: L,
    /// Owned copy of the prefix that every yielded entry path must start with.
    prefix: String,
}

/// # Safety
///
/// We will only take `&mut Self` reference for PrefixLister, so sharing a
/// `&PrefixLister` across threads never gives access to the inner lister.
unsafe impl<L> Sync for PrefixLister<L> {}

impl<L> PrefixLister<L> {
    /// Create a new prefix lister.
    ///
    /// `lister` is the underlying lister to filter; `prefix` is copied into an
    /// owned `String` and compared against each entry path.
    pub fn new(lister: L, prefix: &str) -> Self {
        let prefix = String::from(prefix);
        Self { lister, prefix }
    }
}

impl<L> oio::List for PrefixLister<L>
where
    L: oio::List,
{
    /// Poll the wrapped lister until it produces something worth returning.
    ///
    /// Entries whose path does not start with the stored prefix are skipped.
    /// Matching entries, `Ok(None)` and errors are returned unchanged, and a
    /// pending inner lister makes this return `Poll::Pending` via `ready!`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
        loop {
            let polled = ready!(self.lister.poll_next(cx));

            // Only a successfully yielded entry can be filtered out.
            let filtered_out = matches!(
                &polled,
                Ok(Some(entry)) if !entry.path().starts_with(&self.prefix)
            );

            if !filtered_out {
                return Poll::Ready(polled);
            }
        }
    }
}

impl<L> oio::BlockingList for PrefixLister<L>
where
    L: oio::BlockingList,
{
    /// Fetch the next entry from the wrapped lister that matches the prefix.
    ///
    /// Entries whose path does not start with the stored prefix are skipped.
    /// Matching entries, `Ok(None)` and errors are returned unchanged.
    fn next(&mut self) -> Result<Option<oio::Entry>> {
        loop {
            let fetched = self.lister.next();

            if let Ok(Some(entry)) = &fetched {
                if !entry.path().starts_with(&self.prefix) {
                    // Not under the requested prefix: move on to the next entry.
                    continue;
                }
            }

            return fetched;
        }
    }
}
3 changes: 2 additions & 1 deletion core/src/services/dashmap/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
use std::fmt::Formatter;

use async_trait::async_trait;
use dashmap::DashMap;
Expand Down
5 changes: 3 additions & 2 deletions core/src/services/dropbox/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use backon::ExponentialBuilder;
use std::default::Default;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;

use backon::ExponentialBuilder;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
Expand All @@ -36,7 +36,8 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;

use super::error::{parse_error, DropboxErrorResponse};
use super::error::parse_error;
use super::error::DropboxErrorResponse;
use crate::raw::*;
use crate::*;

Expand Down
3 changes: 2 additions & 1 deletion core/src/services/huggingface/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport {

#[cfg(test)]
mod tests {
use bytes::Bytes;

use super::*;
use crate::raw::new_json_deserialize_error;
use crate::types::Result;
use bytes::Bytes;

#[test]
fn parse_list_response_test() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::*;
///
/// - Lister implements `Stream<Item = Result<Entry>>`.
/// - Lister will return `None` if there are no more entries or an error has been returned.
///
pub struct Lister {
acc: FusedAccessor,
lister: Option<oio::Lister>,
Expand Down Expand Up @@ -85,8 +84,9 @@ pub struct Lister {
///
/// ```rust
/// use std::mem::size_of;
/// use opendal::Result;
///
/// use opendal::Entry;
/// use opendal::Result;
///
/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(264, size_of::<Option<Entry>>());
Expand Down
24 changes: 1 addition & 23 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,16 +1024,6 @@ impl BlockingOperator {
path,
OpList::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("BlockingOperator::list")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path));
}

let lister = BlockingLister::create(inner, &path, args)?;

lister.collect()
Expand Down Expand Up @@ -1197,19 +1187,7 @@ impl BlockingOperator {
self.inner().clone(),
path,
OpList::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("BlockingOperator::list")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path));
}

BlockingLister::create(inner, &path, args)
},
|inner, path, args| BlockingLister::create(inner, &path, args),
))
}
}
Expand Down
24 changes: 1 addition & 23 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,16 +1153,6 @@ impl Operator {
OpList::default(),
|inner, path, args| {
let fut = async move {
if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("Operator::list")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path));
}

let lister = Lister::create(inner, &path, args).await?;

lister.try_collect().await
Expand Down Expand Up @@ -1330,19 +1320,7 @@ impl Operator {
path,
OpList::default(),
|inner, path, args| {
let fut = async move {
if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("Operator::list")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path));
}

Lister::create(inner, &path, args).await
};
let fut = async move { Lister::create(inner, &path, args).await };
Box::pin(fut)
},
));
Expand Down
Loading

0 comments on commit 9a222e4

Please sign in to comment.