diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index bd3525bfcc02..ee05ee2cf457 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -24,11 +24,10 @@ use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use opendal::raw::oio::BlockingRead; -use opendal::raw::oio::ReadExt; - use futures::TryStreamExt; use napi::bindgen_prelude::*; +use opendal::raw::oio::BlockingRead; +use opendal::raw::oio::ReadExt; #[napi] pub struct Operator(opendal::Operator); diff --git a/core/src/docs/upgrade.md b/core/src/docs/upgrade.md index f769cca2faa4..d5d8c9d7de8d 100644 --- a/core/src/docs/upgrade.md +++ b/core/src/docs/upgrade.md @@ -6,6 +6,10 @@ - The `thread_pool_enabled` option has been removed. +### List Prefix Supported + +After [RFC: List Prefix](crate::docs::rfcs::rfc_3243_list_prefix) landed, we have changed the behavior of `list` on a path that does not end with `/`. OpenDAL used to return a `NotADirectory` error, but now it returns the list of entries that start with the given prefix instead. + # Upgrade to v0.43 ## Public API diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 278a9e827eb2..d41a4d6e0e31 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -30,6 +30,7 @@ use bytes::Bytes; use crate::raw::oio::FileReader; use crate::raw::oio::FlatLister; use crate::raw::oio::LazyReader; +use crate::raw::oio::PrefixLister; use crate::raw::oio::RangeReader; use crate::raw::oio::StreamableReader; use crate::raw::*; @@ -105,10 +106,8 @@ use crate::*; /// Underlying services will return [`Capability`] to indicate the /// features that returning listers support. /// -/// - If both `list_without_recursive` and `list_with_recursive`, return directly. -/// - If only `list_with_recursive`, with [`oio::to_flat_lister`]. -/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`]. -/// - If neither not supported, something must be wrong for `list` is true. 
+/// - If support `list_with_recursive`, return directly. +/// - if not, wrap with [`FlatLister`]. /// /// ## Capability Check /// @@ -366,9 +365,8 @@ impl CompleteAccessor { let recursive = args.recursive(); match (recursive, cap.list_with_recursive) { - // - If service can list_with_recursive - // - If recursive is false - (_, true) | (false, _) => { + // - If service can list_with_recursive, we can forward list to it directly. + (_, true) => { let (rp, p) = self.inner.list(path, args).await?; Ok((rp, CompleteLister::AlreadyComplete(p))) } @@ -377,6 +375,20 @@ impl CompleteAccessor { let p = FlatLister::new(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } + // If not recursive and service doesn't support list_with_recursive, we need to handle + // list prefix by ourselves. + (false, false) => { + // Forward path that ends with / + if path.ends_with('/') { + let (rp, p) = self.inner.list(path, args).await?; + Ok((rp, CompleteLister::AlreadyComplete(p))) + } else { + let parent = get_parent(path); + let (rp, p) = self.inner.list(parent, args).await?; + let p = PrefixLister::new(p, path); + Ok((rp, CompleteLister::NeedPrefix(p))) + } + } } } @@ -393,9 +405,8 @@ impl CompleteAccessor { let recursive = args.recursive(); match (recursive, cap.list_with_recursive) { - // - If service can both list_with_recursive - // - If recursive is false - (_, true) | (false, _) => { + // - If service can list_with_recursive, we can forward list to it directly. + (_, true) => { let (rp, p) = self.inner.blocking_list(path, args)?; Ok((rp, CompleteLister::AlreadyComplete(p))) } @@ -404,6 +415,20 @@ impl CompleteAccessor { let p = FlatLister::new(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } + // If not recursive and service doesn't support list_with_recursive, we need to handle + // list prefix by ourselves. 
+ (false, false) => { + // Forward path that ends with / + if path.ends_with('/') { + let (rp, p) = self.inner.blocking_list(path, args)?; + Ok((rp, CompleteLister::AlreadyComplete(p))) + } else { + let parent = get_parent(path); + let (rp, p) = self.inner.blocking_list(parent, args)?; + let p = PrefixLister::new(p, path); + Ok((rp, CompleteLister::NeedPrefix(p))) + } + } } } } @@ -706,6 +731,7 @@ where pub enum CompleteLister { AlreadyComplete(P), NeedFlat(FlatLister, P>), + NeedPrefix(PrefixLister

), } #[async_trait] @@ -720,6 +746,7 @@ where match self { AlreadyComplete(p) => p.poll_next(cx), NeedFlat(p) => p.poll_next(cx), + NeedPrefix(p) => p.poll_next(cx), } } } @@ -735,6 +762,7 @@ where match self { AlreadyComplete(p) => p.next(), NeedFlat(p) => p.next(), + NeedPrefix(p) => p.next(), } } } diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs index 9e8e2a5ac58c..c574d064e30f 100644 --- a/core/src/raw/oio/list/flat_list.rs +++ b/core/src/raw/oio/list/flat_list.rs @@ -28,7 +28,7 @@ use crate::*; /// ListFuture is the future returned while calling async list. type ListFuture = BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>; -/// ToFlatLister will walk dir in bottom up way: +/// FlatLister will walk dir in bottom up way: /// /// - List nested dir first /// - Go back into parent dirs one by one diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs index 62c52de3cb8d..31f70d6f2328 100644 --- a/core/src/raw/oio/list/mod.rs +++ b/core/src/raw/oio/list/mod.rs @@ -33,3 +33,6 @@ pub use flat_list::FlatLister; mod hierarchy_list; pub use hierarchy_list::HierarchyLister; + +mod prefix_list; +pub use prefix_list::PrefixLister; diff --git a/core/src/raw/oio/list/prefix_list.rs b/core/src/raw/oio/list/prefix_list.rs new file mode 100644 index 000000000000..f176e9bb9f1c --- /dev/null +++ b/core/src/raw/oio/list/prefix_list.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use crate::raw::*; +use crate::*; + +/// PrefixLister is used to filter entries by prefix. +/// +/// For example, if we have a lister that returns entries: +/// +/// ```txt +/// . +/// ├── file_a +/// └── file_b +/// ``` +/// +/// We can use `PrefixLister` to filter entries with prefix `file_`. +pub struct PrefixLister { + lister: L, + prefix: String, +} + +/// # Safety +/// +/// We will only take `&mut Self` reference for PrefixLister. +unsafe impl Sync for PrefixLister {} + +impl PrefixLister { + /// Create a new prefix lister + pub fn new(lister: L, prefix: &str) -> PrefixLister { + PrefixLister { + lister, + prefix: prefix.to_string(), + } + } +} + +impl oio::List for PrefixLister +where + L: oio::List, +{ + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + loop { + match ready!(self.lister.poll_next(cx)) { + Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue, + v => return Poll::Ready(v), + } + } + } +} + +impl oio::BlockingList for PrefixLister +where + L: oio::BlockingList, +{ + fn next(&mut self) -> Result> { + loop { + match self.lister.next() { + Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue, + v => return v, + } + } + } +} diff --git a/core/src/services/dashmap/backend.rs b/core/src/services/dashmap/backend.rs index b85b528e2949..f626bcae28ff 100644 --- a/core/src/services/dashmap/backend.rs +++ b/core/src/services/dashmap/backend.rs @@ -16,7 +16,8 @@ // under the License. 
use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; +use std::fmt::Formatter; use async_trait::async_trait; use dashmap::DashMap; diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index dbc53f69de81..3fff5115decb 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use backon::ExponentialBuilder; use std::default::Default; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; use std::time::Duration; +use backon::ExponentialBuilder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -36,7 +36,8 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::Mutex; -use super::error::{parse_error, DropboxErrorResponse}; +use super::error::parse_error; +use super::error::DropboxErrorResponse; use crate::raw::*; use crate::*; diff --git a/core/src/services/huggingface/core.rs b/core/src/services/huggingface/core.rs index 67963082e81a..768b416006d6 100644 --- a/core/src/services/huggingface/core.rs +++ b/core/src/services/huggingface/core.rs @@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport { #[cfg(test)] mod tests { + use bytes::Bytes; + use super::*; use crate::raw::new_json_deserialize_error; use crate::types::Result; - use bytes::Bytes; #[test] fn parse_list_response_test() -> Result<()> { diff --git a/core/src/types/list.rs b/core/src/types/list.rs index bc2e751205b1..d9140c9dad61 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -40,7 +40,6 @@ use crate::*; /// /// - Lister implements `Stream>`. /// - Lister will return `None` if there is no more entries or error has been returned. 
-/// pub struct Lister { acc: FusedAccessor, lister: Option, @@ -85,8 +84,9 @@ pub struct Lister { /// /// ```rust /// use std::mem::size_of; -/// use opendal::Result; +/// /// use opendal::Entry; +/// use opendal::Result; /// /// assert_eq!(264, size_of::<(String, Result)>()); /// assert_eq!(264, size_of::>()); diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index f811eeedb9a6..d3efad854d4e 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -1024,16 +1024,6 @@ impl BlockingOperator { path, OpList::default(), |inner, path, args| { - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying to list should end with `/`", - ) - .with_operation("BlockingOperator::list") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - let lister = BlockingLister::create(inner, &path, args)?; lister.collect() @@ -1197,19 +1187,7 @@ impl BlockingOperator { self.inner().clone(), path, OpList::default(), - |inner, path, args| { - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying to list should end with `/`", - ) - .with_operation("BlockingOperator::list") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - - BlockingLister::create(inner, &path, args) - }, + |inner, path, args| BlockingLister::create(inner, &path, args), )) } } diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 63557797689e..3e961fdaece0 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1153,16 +1153,6 @@ impl Operator { OpList::default(), |inner, path, args| { let fut = async move { - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying 
to list should end with `/`", - ) - .with_operation("Operator::list") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - let lister = Lister::create(inner, &path, args).await?; lister.try_collect().await @@ -1330,19 +1320,7 @@ impl Operator { path, OpList::default(), |inner, path, args| { - let fut = async move { - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying to list should end with `/`", - ) - .with_operation("Operator::list") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - - Lister::create(inner, &path, args).await - }; + let fut = async move { Lister::create(inner, &path, args).await }; Box::pin(fut) }, )); diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs index 17f4a695a273..d9da4909030e 100644 --- a/core/tests/behavior/list.rs +++ b/core/tests/behavior/list.rs @@ -39,6 +39,7 @@ pub fn behavior_list_tests(op: &Operator) -> Vec { test_list_dir, test_list_dir_with_metakey, test_list_dir_with_metakey_complete, + test_list_prefix, test_list_rich_dir, test_list_empty_dir, test_list_non_exist_dir, @@ -46,8 +47,8 @@ pub fn behavior_list_tests(op: &Operator) -> Vec { test_list_nested_dir, test_list_dir_with_file_path, test_list_with_start_after, - test_scan, - test_scan_root, + test_list_with_recursive, + test_list_root_with_recursive, test_remove_all ) } @@ -175,6 +176,23 @@ pub async fn test_list_dir_with_metakey_complete(op: Operator) -> Result<()> { Ok(()) } +/// List prefix should return newly created file. 
+pub async fn test_list_prefix(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + debug!("Generate a random file: {}", &path); + let (content, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content).await.expect("write must succeed"); + + let obs = op.list(&path[..path.len() - 1]).await?; + assert_eq!(obs.len(), 1); + assert_eq!(obs[0].path(), path); + assert_eq!(obs[0].metadata().mode(), EntryMode::FILE); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) +} + /// listing a directory, which contains more objects than a single page can take. pub async fn test_list_rich_dir(op: Operator) -> Result<()> { op.create_dir("test_list_rich_dir/").await?; @@ -322,10 +340,17 @@ pub async fn test_list_nested_dir(op: Operator) -> Result<()> { /// List with path file should auto add / suffix. pub async fn test_list_dir_with_file_path(op: Operator) -> Result<()> { let parent = uuid::Uuid::new_v4().to_string(); + let file = format!("{parent}/{}", uuid::Uuid::new_v4()); + + let (content, _) = gen_bytes(op.info().full_capability()); + op.write(&file, content).await?; + + let obs = op.list(&parent).await?; + assert_eq!(obs.len(), 1); + assert_eq!(obs[0].path(), format!("{parent}/")); + assert_eq!(obs[0].metadata().mode(), EntryMode::DIR); - let obs = op.lister(&parent).await.map(|_| ()); - assert!(obs.is_err()); - assert_eq!(obs.unwrap_err().kind(), ErrorKind::NotADirectory); + op.delete(&file).await?; Ok(()) } @@ -371,7 +396,7 @@ pub async fn test_list_with_start_after(op: Operator) -> Result<()> { Ok(()) } -pub async fn test_scan_root(op: Operator) -> Result<()> { +pub async fn test_list_root_with_recursive(op: Operator) -> Result<()> { let w = op.lister_with("").recursive(true).await?; let actual = w .try_collect::>() @@ -386,7 +411,7 @@ pub async fn test_scan_root(op: Operator) -> Result<()> { } // Walk top down should output as expected -pub async fn test_scan(op: Operator) -> Result<()> { +pub async fn 
test_list_with_recursive(op: Operator) -> Result<()> { let parent = uuid::Uuid::new_v4().to_string(); let expected = [