From 4f75135bebcabe8641f18536fcbefa289bc98035 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Fri, 22 Nov 2024 21:09:40 +0800 Subject: [PATCH 1/5] feat(core/layers): add capability_check layer to check whether the operation is supported by the underlying service --- core/src/layers/capability_check.rs | 965 ++++++++++++++++++++++++++++ core/src/layers/complete.rs | 369 +---------- core/src/layers/mod.rs | 3 + core/src/layers/timeout.rs | 4 +- core/src/raw/accessor.rs | 2 +- core/src/types/operator/builder.rs | 1 + 6 files changed, 976 insertions(+), 368 deletions(-) create mode 100644 core/src/layers/capability_check.rs diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs new file mode 100644 index 000000000000..257ecbf0bf87 --- /dev/null +++ b/core/src/layers/capability_check.rs @@ -0,0 +1,965 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::raw::{ + Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, + OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, + RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, TwoWays, +}; +use crate::{Error, ErrorKind}; + +/// Add a capability check layer for every operation +/// +/// Before performing any operations, we will first check +/// the operation against capability of the underlying service. If the +/// operation is not supported, an error will be returned directly. +/// +/// # Notes +/// +/// Currently, we have two types of capability checkers: `DefaultCapabilityChecker` and `CorrectnessCapabilityChecker` +/// +/// ## DefaultCapabilityChecker +/// +/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. +/// in `DefaultCapabilityChecker`, we'll verify whether the operation itself is supported by underlying service. +/// +/// for example, when calling `list()`, if `list` is not supported by the underlying service, an `Unsupported` error +/// is returned. +/// +/// ## CorrectnessCapabilityChecker +/// +/// this checker ensures that critical arguments, which might affect the correctness of the call, are +/// supported by the underlying service. +/// +/// for example, when calling `read()` with a specified version, but `read_with_version` is not supported by +/// the underlying service, an `Unsupported` error is returned. without this check, incorrect or undesired data +/// may be retrieved. +/// +/// # examples +/// +/// ```no_run +/// # use opendal::layers::CapabilityCheckLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// # use opendal::Scheme; +/// +/// # fn main() -> Result<()> { +/// use opendal::layers::CapabilityCheckLayer; +/// let _ = Operator::new(services::Memory::default())? +/// .layer(CapabilityCheckLayer::with_correctness_checker()) +/// .finish(); +/// Ok(()) +/// # } +/// ``` +#[derive(Default)] +pub struct CapabilityCheckLayer { + check_correctness: bool, +} + +impl CapabilityCheckLayer { + /// Create a `CapabilityLayer` with default settings + pub fn new() -> Self { + Self::default() + } + + /// Create a `CapabilityLayer` with correctness checker + pub fn with_correctness_checker() -> Self { + CapabilityCheckLayer { + check_correctness: true, + } + } + + fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op}"), + ) + .with_operation(op) + } + + fn new_unsupported_args_error( + info: &AccessorInfo, + op: impl Into<&'static str>, + args: &str, + ) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op} with args {args}"), + ) + .with_operation(op) + } +} + +impl Layer for CapabilityCheckLayer { + type LayeredAccess = + TwoWays, CorrectnessCapabilityCheckAccessor>; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + if !self.check_correctness { + TwoWays::One(DefaultCapabilityCheckAccessor { + info: inner.info(), + inner, + }) + } else { + TwoWays::Two(CorrectnessCapabilityCheckAccessor { + info: inner.info(), + inner, + }) + } + } +} + +impl Debug + for TwoWays, CorrectnessCapabilityCheckAccessor> +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.inner().fmt(f) + } +} + +impl LayeredAccess + for TwoWays, CorrectnessCapabilityCheckAccessor> +{ + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + match self { + TwoWays::One(v) => v.inner(), + TwoWays::Two(v) => v.inner(), + } + } + + fn info(&self) -> Arc { + match self { + TwoWays::One(v) => LayeredAccess::info(v), + TwoWays::Two(v) => LayeredAccess::info(v), + } + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::create_dir(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::create_dir(v, path, args).await, + } + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + match self { + TwoWays::One(v) => LayeredAccess::read(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::read(v, path, args).await, + } + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + match self { + TwoWays::One(v) => LayeredAccess::write(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::write(v, path, args).await, + } + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::copy(v, from, to, args).await, + TwoWays::Two(v) => LayeredAccess::copy(v, from, to, args).await, + } + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::rename(v, from, to, args).await, + TwoWays::Two(v) => LayeredAccess::rename(v, from, to, args).await, + } + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::stat(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::stat(v, path, args).await, + } + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::delete(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::delete(v, path, args).await, + } + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + match self { + TwoWays::One(v) => LayeredAccess::list(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::list(v, path, args).await, + } + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::batch(v, args).await, + TwoWays::Two(v) => LayeredAccess::batch(v, args).await, + } + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::presign(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::presign(v, path, args).await, + } + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_create_dir(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_create_dir(v, path, args), + } + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_read(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_read(v, path, args), + } + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_write(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_write(v, path, args), + } + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_copy(v, from, to, args), + TwoWays::Two(v) => LayeredAccess::blocking_copy(v, from, to, args), + } + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_rename(v, from, to, args), + TwoWays::Two(v) => LayeredAccess::blocking_rename(v, from, to, args), + } + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_stat(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_stat(v, path, args), + } + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_delete(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_delete(v, path, args), + } + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_list(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_list(v, path, args), + } + } +} + +pub struct DefaultCapabilityCheckAccessor { + info: Arc, + inner: A, +} + +impl Debug for DefaultCapabilityCheckAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultCapabilityCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for DefaultCapabilityCheckAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn info(&self) -> Arc { + self.info.clone() + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::CreateDir, + )); + } + + self.inner.create_dir(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Read, + )); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write { + return Err(CapabilityCheckLayer::new_unsupported_error( + &self.info, + Operation::Write, + )); + } + if args.append() && !capability.write_can_append { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + &self.info, + Operation::Write, + "append", + )); + } + + self.inner.write(path, args).await + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + )); + } + + self.inner.copy(from, to, args).await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Rename, + )); + } + + self.inner.rename(from, to, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Stat, + )); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Delete, + )); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::List, + )); + } + + self.inner.list(path, args).await + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.batch { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Batch, + )); + } + + self.inner.batch(args).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.presign { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Presign, + )); + } + + self.inner.presign(path, args).await + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCreateDir, + )); + } + + self.inner.blocking_create_dir(path, args) + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRead, + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + &self.info, + Operation::BlockingWrite, + )); + } + + if args.append() && !capability.write_can_append { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "append", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCopy, + )); + } + + self.inner().blocking_copy(from, to, args) + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRename, + )); + } + + self.inner().blocking_rename(from, to, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingStat, + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingDelete, + )); + } + + self.inner().blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingList, + )); + } + + self.inner.blocking_list(path, args) + } +} + +pub struct CorrectnessCapabilityCheckAccessor { + info: Arc, + inner: A, +} + +impl Debug for CorrectnessCapabilityCheckAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CorrectnessCapabilityCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for CorrectnessCapabilityCheckAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Read, + "version", + )); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write_with_if_none_match && args.if_none_match().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Write, + "if_none_match", + )); + } + + self.inner.write(path, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Stat, + "version", + )); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Delete, + "version", + )); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list_with_version && args.version() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::List, + "version", + )); + } + + self.inner.list(path, args).await + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingRead, + "version", + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write_with_if_none_match && args.if_none_match().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingWrite, + "if_none_match", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingStat, + "version", + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingDelete, + "version", + )); + } + + self.inner.blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list_with_version && args.version() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingList, + "version", + )); + } + + self.inner.blocking_list(path, args) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::raw::{oio, PresignedRequest}; + use crate::{Capability, EntryMode, Metadata, Operator}; + use http::HeaderMap; + use http::Method as HttpMethod; + + #[derive(Debug)] + struct MockService { + capability: Capability, + } + + impl Access for MockService { + type Reader = oio::Reader; + type Writer = oio::Writer; + type Lister = oio::Lister; + type BlockingReader = oio::BlockingReader; + type BlockingWriter = oio::BlockingWriter; + type BlockingLister = oio::BlockingLister; + + fn info(&self) -> Arc { + let mut info = AccessorInfo::default(); + info.set_native_capability(self.capability); + + info.into() + } + + async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { + Ok(RpCreateDir {}) + } + + async fn stat(&self, _: &str, _: OpStat) -> crate::Result { + Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) + } + + async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) + } + + async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + Ok((RpWrite::new(), Box::new(()))) + } + + async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { + Ok(RpDelete {}) + } + + async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { + Ok((RpList {}, Box::new(()))) + } + + async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { + Ok(RpCopy {}) + } + + async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { + Ok(RpRename {}) + } + + async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { + Ok(RpPresign::new(PresignedRequest::new( + HttpMethod::POST, + "https://example.com/presign".parse().expect("should parse"), + HeaderMap::new(), + ))) + } + } + + fn new_test_operator(capability: Capability) -> Operator { + let srv = MockService { capability }; + + Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer::new()) + } + + #[tokio::test] + async fn test_read() { + let op = new_test_operator(Capability::default()); + let res = op.read("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + read: true, + stat: true, + ..Default::default() + }); + let res = op.read("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_stat() { + let op = new_test_operator(Capability::default()); + let res = op.stat("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + stat: true, + ..Default::default() + }); + let res = op.stat("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_writer() { + let op = new_test_operator(Capability::default()); + let bs: Vec = vec![]; + let res = op.write("path", bs).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.writer("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_create_dir() { + let op = new_test_operator(Capability::default()); + let res = op.create_dir("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + create_dir: true, + ..Default::default() + }); + let res = op.create_dir("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_delete() { + let op = new_test_operator(Capability::default()); + let res = op.delete("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + delete: true, + ..Default::default() + }); + let res = op.delete("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_copy() { + let op = new_test_operator(Capability::default()); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + copy: true, + ..Default::default() + }); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_rename() { + let op = new_test_operator(Capability::default()); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + rename: true, + ..Default::default() + }); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_list() { + let op = new_test_operator(Capability::default()); + let res = op.list("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + list: true, + list_with_recursive: true, + ..Default::default() + }); + let res = op.list("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_presign() { + let op = new_test_operator(Capability::default()); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + presign: true, + ..Default::default() + }); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_ok()) + } +} diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 46f9e4ccc0c1..1136a7eaef96 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -98,11 +98,6 @@ use crate::*; /// - If support `list_with_recursive`, return directly. /// - if not, wrap with [`FlatLister`]. /// -/// ## Capability Check -/// -/// Before performing any operations, `CompleteLayer` will first check -/// the operation against capability of the underlying service. If the -/// operation is not supported, an error will be returned directly. pub struct CompleteLayer; impl Layer for CompleteLayer { @@ -129,28 +124,19 @@ impl Debug for CompleteAccessor { } impl CompleteAccessor { - fn new_unsupported_error(&self, op: impl Into<&'static str>) -> Error { - let scheme = self.info.scheme(); - let op = op.into(); - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op}"), - ) - .with_operation(op) - } - async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result { let capability = self.info.full_capability(); if capability.create_dir { return self.inner().create_dir(path, args).await; } + if capability.write_can_empty && capability.list { let (_, mut w) = self.inner.write(path, OpWrite::default()).await?; oio::Write::close(&mut w).await?; return Ok(RpCreateDir::default()); } - Err(self.new_unsupported_error(Operation::CreateDir)) + unreachable!("with capability check, we cannot reach here") } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -158,20 +144,18 @@ impl CompleteAccessor { if capability.create_dir && capability.blocking { return self.inner().blocking_create_dir(path, args); } + if capability.write_can_empty && capability.list && capability.blocking { let (_, mut w) = self.inner.blocking_write(path, OpWrite::default())?; oio::BlockingWrite::close(&mut w)?; return Ok(RpCreateDir::default()); } - Err(self.new_unsupported_error(Operation::BlockingCreateDir)) + unreachable!("with capability check, we cannot reach here") } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.stat { - return Err(self.new_unsupported_error(Operation::Stat)); - } if path == "/" { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); @@ -214,9 +198,6 @@ impl CompleteAccessor { fn complete_blocking_stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.stat { - return Err(self.new_unsupported_error(Operation::Stat)); - } if path == "/" { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); @@ -262,9 +243,6 @@ impl CompleteAccessor { args: OpList, ) -> Result<(RpList, CompleteLister)> { let cap = self.info.full_capability(); - if !cap.list { - return Err(self.new_unsupported_error(Operation::List)); - } let recursive = args.recursive(); @@ -310,9 +288,6 @@ impl CompleteAccessor { args: OpList, ) -> Result<(RpList, CompleteLister)> { let cap = self.info.full_capability(); - if !cap.list { - return Err(self.new_unsupported_error(Operation::BlockingList)); - } let recursive = args.recursive(); @@ -381,11 +356,6 @@ impl LayeredAccess for CompleteAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read { - return Err(self.new_unsupported_error(Operation::Read)); - } - let size = args.range().size(); self.inner .read(path, args) @@ -394,93 +364,24 @@ impl LayeredAccess for CompleteAccessor { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let capability = self.info.full_capability(); - if !capability.write { - return Err(self.new_unsupported_error(Operation::Write)); - } - if args.append() && !capability.write_can_append { - return Err(Error::new( - ErrorKind::Unsupported, - format!( - "service {} doesn't support operation write with append", - self.info.scheme() - ), - )); - } - let (rp, w) = self.inner.write(path, args.clone()).await?; let w = CompleteWriter::new(w); Ok((rp, w)) } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - let capability = self.info.full_capability(); - if !capability.copy { - return Err(self.new_unsupported_error(Operation::Copy)); - } - - self.inner().copy(from, to, args).await - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { - let capability = self.info.full_capability(); - if !capability.rename { - return Err(self.new_unsupported_error(Operation::Rename)); - } - - self.inner().rename(from, to, args).await - } - async fn stat(&self, path: &str, args: OpStat) -> Result { self.complete_stat(path, args).await } - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let capability = self.info.full_capability(); - if !capability.delete { - return Err(self.new_unsupported_error(Operation::Delete)); - } - - self.inner().delete(path, args).await - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let capability = self.info.full_capability(); - if !capability.list { - return Err(self.new_unsupported_error(Operation::List)); - } - self.complete_list(path, args).await } - async fn batch(&self, args: OpBatch) -> Result { - let capability = self.info.full_capability(); - if !capability.batch { - return Err(self.new_unsupported_error(Operation::Batch)); - } - - self.inner().batch(args).await - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - let capability = self.info.full_capability(); - if !capability.presign { - return Err(self.new_unsupported_error(Operation::Presign)); - } - - self.inner.presign(path, args).await - } - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.complete_blocking_create_dir(path, args) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let capability = self.info.full_capability(); - if !capability.read || !capability.blocking { - return Err(self.new_unsupported_error(Operation::Read)); - } - let size = args.range().size(); self.inner .blocking_read(path, args) @@ -488,63 +389,16 @@ impl LayeredAccess for CompleteAccessor { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let capability = self.info.full_capability(); - if !capability.write || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingWrite)); - } - - if args.append() && !capability.write_can_append { - return Err(Error::new( - ErrorKind::Unsupported, - format!( - "service {} doesn't support operation write with append", - self.info.scheme() - ), - )); - } - self.inner .blocking_write(path, args) .map(|(rp, w)| (rp, CompleteWriter::new(w))) } - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - let capability = self.info.full_capability(); - if !capability.copy || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingCopy)); - } - - self.inner().blocking_copy(from, to, args) - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { - let capability = self.info.full_capability(); - if !capability.rename || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingRename)); - } - - self.inner().blocking_rename(from, to, args) - } - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { self.complete_blocking_stat(path, args) } - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let capability = self.info.full_capability(); - if !capability.delete || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingDelete)); - } - - self.inner().blocking_delete(path, args) - } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let capability = self.info.full_capability(); - if !capability.list || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingList)); - } - self.complete_blocking_list(path, args) } } @@ -694,218 +548,3 @@ where Ok(()) } } - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use http::HeaderMap; - use http::Method as HttpMethod; - - use super::*; - - #[derive(Debug)] - struct MockService { - capability: Capability, - } - - impl Access for MockService { - type Reader = oio::Reader; - type Writer = oio::Writer; - type Lister = oio::Lister; - type BlockingReader = oio::BlockingReader; - type BlockingWriter = oio::BlockingWriter; - type BlockingLister = oio::BlockingLister; - - fn info(&self) -> Arc { - let mut info = AccessorInfo::default(); - info.set_native_capability(self.capability); - - info.into() - } - - async fn create_dir(&self, _: &str, _: OpCreateDir) -> Result { - Ok(RpCreateDir {}) - } - - async fn stat(&self, _: &str, _: OpStat) -> Result { - Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) - } - - async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { - Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) - } - - async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok((RpWrite::new(), Box::new(()))) - } - - async fn delete(&self, _: &str, _: OpDelete) -> Result { - Ok(RpDelete {}) - } - - async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - Ok((RpList {}, Box::new(()))) - } - - async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result { - Ok(RpCopy {}) - } - - async fn rename(&self, _: &str, _: &str, _: OpRename) -> Result { - Ok(RpRename {}) - } - - async fn presign(&self, _: &str, _: OpPresign) -> Result { - Ok(RpPresign::new(PresignedRequest::new( - HttpMethod::POST, - "https://example.com/presign".parse().expect("should parse"), - HeaderMap::new(), - ))) - } - } - - fn new_test_operator(capability: Capability) -> Operator { - let srv = MockService { capability }; - - Operator::from_inner(Arc::new(srv)).layer(CompleteLayer) - } - - #[tokio::test] - async fn test_read() { - let op = new_test_operator(Capability::default()); - let res = op.read("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - read: true, - stat: true, - ..Default::default() - }); - let res = op.read("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_stat() { - let op = new_test_operator(Capability::default()); - let res = op.stat("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - stat: true, - ..Default::default() - }); - let res = op.stat("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer() { - let op = new_test_operator(Capability::default()); - let bs: Vec = vec![]; - let res = op.write("path", bs).await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - write: true, - ..Default::default() - }); - let res = op.writer("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_create_dir() { - let op = new_test_operator(Capability::default()); - let res = op.create_dir("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - create_dir: true, - ..Default::default() - }); - let res = op.create_dir("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_delete() { - let op = new_test_operator(Capability::default()); - let res = op.delete("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - delete: true, - ..Default::default() - }); - let res = op.delete("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_copy() { - let op = new_test_operator(Capability::default()); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - copy: true, - ..Default::default() - }); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_rename() { - let op = new_test_operator(Capability::default()); - let res = op.rename("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - rename: true, - ..Default::default() - }); - let res = op.rename("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_list() { - let op = new_test_operator(Capability::default()); - let res = op.list("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - list: true, - list_with_recursive: true, - ..Default::default() - }); - let res = op.list("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_presign() { - let op = new_test_operator(Capability::default()); - let res = op.presign_read("path", Duration::from_secs(1)).await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - presign: true, - ..Default::default() - }); - let res = op.presign_read("path", Duration::from_secs(1)).await; - assert!(res.is_ok()) - } -} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index c43db2331aff..cf672d59032a 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -113,3 +113,6 @@ mod dtrace; pub use self::dtrace::DtraceLayer; pub mod observe; + +mod capability_check; +pub use capability_check::CapabilityCheckLayer; diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 2bda1292b03b..41f6e0ab5c35 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -107,9 +107,9 @@ use crate::*; /// This might introduce a bit overhead for IO operations, but it's the only way to implement /// timeout correctly. We used to implement timeout layer in zero cost way that only stores /// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. -/// However, it doesn't works for all cases. +/// However, it doesn't work for all cases. /// -/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emit. The runtime +/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime /// will never poll our future again. From the application side, this future is hanging forever /// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. #[derive(Clone)] diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index 888dc4fa18ed..e9b6e1f6e22a 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -76,7 +76,7 @@ pub trait Access: Send + Sync + Debug + Unpin + 'static { /// This function is required to be implemented. /// /// By returning AccessorInfo, underlying services can declare - /// some useful information about it self. + /// some useful information about itself. /// /// - scheme: declare the scheme of backend. /// - capabilities: declare the capabilities of current backend. diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 95dfa1c17a1e..2c6c33fc0b77 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -472,6 +472,7 @@ impl OperatorBuilder { OperatorBuilder { accessor } .layer(ErrorContextLayer) .layer(CompleteLayer) + .layer(CapabilityCheckLayer::default()) } /// Create a new layer with static dispatch. From df6a2fbcce67399739633e59fac8635c80aaa1f0 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Mon, 25 Nov 2024 21:38:25 +0800 Subject: [PATCH 2/5] fix typo --- core/src/layers/blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index a68538ec6dec..6972849799d5 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -59,7 +59,7 @@ use crate::*; /// ## In async context with blocking functions /// /// If `BlockingLayer` is called in blocking function, please fetch a [`tokio::runtime::EnterGuard`] -/// first. You can use [`Handle::try_current`] first to get the handle and than call [`Handle::enter`]. +/// first. You can use [`Handle::try_current`] first to get the handle and then call [`Handle::enter`]. /// This often happens in the case that async function calls blocking function. /// /// ```rust,no_run From 477525ff21ff6e74e95821572a5547b3245cb50b Mon Sep 17 00:00:00 2001 From: meteorgan Date: Tue, 3 Dec 2024 18:27:49 +0800 Subject: [PATCH 3/5] add CapabilityChecker and CorrectnessChecker --- core/src/layers/capability_check.rs | 713 +++------------------------ core/src/layers/complete.rs | 4 +- core/src/layers/correctness_check.rs | 602 ++++++++++++++++++++++ core/src/layers/mod.rs | 4 +- core/src/types/operator/builder.rs | 2 +- 5 files changed, 688 insertions(+), 637 deletions(-) create mode 100644 core/src/layers/correctness_check.rs diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs index 257ecbf0bf87..40ae15d1bf88 100644 --- a/core/src/layers/capability_check.rs +++ b/core/src/layers/capability_check.rs @@ -15,584 +15,74 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; - +use crate::layers::correctness_check::new_unsupported_args_error; use crate::raw::{ - Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, - OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, - RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, TwoWays, + Access, AccessorInfo, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpStat, OpWrite, + Operation, RpDelete, RpList, RpRead, RpStat, RpWrite, }; -use crate::{Error, ErrorKind}; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; -/// Add a capability check layer for every operation -/// -/// Before performing any operations, we will first check -/// the operation against capability of the underlying service. If the -/// operation is not supported, an error will be returned directly. +/// Add an extra capability check layer for every operation /// -/// # Notes +/// Similar to `CorrectnessChecker`, Before performing any operations, this layer will first verify +/// its arguments against the capability of the underlying service. If the arguments is not supported, +/// an error will be returned directly. /// -/// Currently, we have two types of capability checkers: `DefaultCapabilityChecker` and `CorrectnessCapabilityChecker` +/// Notes /// -/// ## DefaultCapabilityChecker +/// There are two main differences between this checker with the `CorrectnessChecker`: +/// 1. This checker provides additional checks for capabilities like write_with_content_type and +/// list_with_version, among others. These capabilities do not affect data integrity, even if +/// the underlying storage services do not support them. /// -/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. -/// in `DefaultCapabilityChecker`, we'll verify whether the operation itself is supported by underlying service. -/// -/// for example, when calling `list()`, if `list` is not supported by the underlying service, an `Unsupported` error -/// is returned. -/// -/// ## CorrectnessCapabilityChecker -/// -/// this checker ensures that critical arguments, which might affect the correctness of the call, are -/// supported by the underlying service. -/// -/// for example, when calling `read()` with a specified version, but `read_with_version` is not supported by -/// the underlying service, an `Unsupported` error is returned. without this check, incorrect or undesired data -/// may be retrieved. +/// 2. OpenDAL doesn't apply this checker by default. Users can enable this layer if they want to +/// enforce stricter requirements. /// /// # examples /// /// ```no_run -/// # use opendal::layers::CapabilityCheckLayer; +/// # use opendal::layers::CapabilityChecker; /// # use opendal::services; /// # use opendal::Operator; /// # use opendal::Result; /// # use opendal::Scheme; /// /// # fn main() -> Result<()> { -/// use opendal::layers::CapabilityCheckLayer; +/// use opendal::layers::CapabilityChecker; /// let _ = Operator::new(services::Memory::default())? -/// .layer(CapabilityCheckLayer::with_correctness_checker()) +/// .layer(CapabilityChecker) /// .finish(); /// Ok(()) /// # } /// ``` #[derive(Default)] -pub struct CapabilityCheckLayer { - check_correctness: bool, -} +pub struct CapabilityChecker; -impl CapabilityCheckLayer { - /// Create a `CapabilityLayer` with default settings - pub fn new() -> Self { - Self::default() - } - - /// Create a `CapabilityLayer` with correctness checker - pub fn with_correctness_checker() -> Self { - CapabilityCheckLayer { - check_correctness: true, - } - } - - fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { - let scheme = info.scheme(); - let op = op.into(); - - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op}"), - ) - .with_operation(op) - } - - fn new_unsupported_args_error( - info: &AccessorInfo, - op: impl Into<&'static str>, - args: &str, - ) -> Error { - let scheme = info.scheme(); - let op = op.into(); - - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op} with args {args}"), - ) - .with_operation(op) - } -} - -impl Layer for CapabilityCheckLayer { - type LayeredAccess = - TwoWays, CorrectnessCapabilityCheckAccessor>; +impl Layer for CapabilityChecker { + type LayeredAccess = CapabilityAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - if !self.check_correctness { - TwoWays::One(DefaultCapabilityCheckAccessor { - info: inner.info(), - inner, - }) - } else { - TwoWays::Two(CorrectnessCapabilityCheckAccessor { - info: inner.info(), - inner, - }) - } - } -} - -impl Debug - for TwoWays, CorrectnessCapabilityCheckAccessor> -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.inner().fmt(f) - } -} - -impl LayeredAccess - for TwoWays, CorrectnessCapabilityCheckAccessor> -{ - type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - match self { - TwoWays::One(v) => v.inner(), - TwoWays::Two(v) => v.inner(), - } - } - - fn info(&self) -> Arc { - match self { - TwoWays::One(v) => LayeredAccess::info(v), - TwoWays::Two(v) => LayeredAccess::info(v), - } - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::create_dir(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::create_dir(v, path, args).await, - } - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - match self { - TwoWays::One(v) => LayeredAccess::read(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::read(v, path, args).await, - } - } - - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - match self { - TwoWays::One(v) => LayeredAccess::write(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::write(v, path, args).await, - } - } - - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::copy(v, from, to, args).await, - TwoWays::Two(v) => LayeredAccess::copy(v, from, to, args).await, - } - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::rename(v, from, to, args).await, - TwoWays::Two(v) => LayeredAccess::rename(v, from, to, args).await, - } - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::stat(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::stat(v, path, args).await, - } - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::delete(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::delete(v, path, args).await, - } - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { - match self { - TwoWays::One(v) => LayeredAccess::list(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::list(v, path, args).await, - } - } - - async fn batch(&self, args: OpBatch) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::batch(v, args).await, - TwoWays::Two(v) => LayeredAccess::batch(v, args).await, - } - } - - async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::presign(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::presign(v, path, args).await, - } - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_create_dir(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_create_dir(v, path, args), - } - } - - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_read(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_read(v, path, args), - } - } - - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_write(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_write(v, path, args), - } - } - - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_copy(v, from, to, args), - TwoWays::Two(v) => LayeredAccess::blocking_copy(v, from, to, args), - } - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_rename(v, from, to, args), - TwoWays::Two(v) => LayeredAccess::blocking_rename(v, from, to, args), - } - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_stat(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_stat(v, path, args), - } - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_delete(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_delete(v, path, args), - } - } - - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingLister)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_list(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_list(v, path, args), - } - } -} - -pub struct DefaultCapabilityCheckAccessor { - info: Arc, - inner: A, -} - -impl Debug for DefaultCapabilityCheckAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DefaultCapabilityCheckAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl LayeredAccess for DefaultCapabilityCheckAccessor { - type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - fn info(&self) -> Arc { - self.info.clone() - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.create_dir { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::CreateDir, - )); - } - - self.inner.create_dir(path, args).await - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Read, - )); - } - - self.inner.read(path, args).await - } - - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - let capability = self.info.full_capability(); - if !capability.write { - return Err(CapabilityCheckLayer::new_unsupported_error( - &self.info, - Operation::Write, - )); - } - if args.append() && !capability.write_can_append { - return Err(CapabilityCheckLayer::new_unsupported_args_error( - &self.info, - Operation::Write, - "append", - )); - } - - self.inner.write(path, args).await - } - - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Copy, - )); - } - - self.inner.copy(from, to, args).await - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Rename, - )); - } - - self.inner.rename(from, to, args).await - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Stat, - )); - } - - self.inner.stat(path, args).await - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Delete, - )); - } - - self.inner.delete(path, args).await - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { - let capability = self.info.full_capability(); - if !capability.list { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::List, - )); - } - - self.inner.list(path, args).await - } - - async fn batch(&self, args: OpBatch) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.batch { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Batch, - )); - } - - self.inner.batch(args).await - } - - async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.presign { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Presign, - )); - } - - self.inner.presign(path, args).await - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.create_dir || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingCreateDir, - )); - } - - self.inner.blocking_create_dir(path, args) - } - - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { - let capability = self.info.full_capability(); - if !capability.read || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingRead, - )); - } - - self.inner.blocking_read(path, args) - } - - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - let capability = self.info.full_capability(); - if !capability.write || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - &self.info, - Operation::BlockingWrite, - )); - } - - if args.append() && !capability.write_can_append { - return Err(CapabilityCheckLayer::new_unsupported_args_error( - &self.info, - Operation::BlockingWrite, - "append", - )); - } - - self.inner.blocking_write(path, args) - } - - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingCopy, - )); - } - - self.inner().blocking_copy(from, to, args) - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingRename, - )); + CapabilityAccessor { + info: inner.info(), + inner, } - - self.inner().blocking_rename(from, to, args) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingStat, - )); - } - - self.inner.blocking_stat(path, args) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingDelete, - )); - } - - self.inner().blocking_delete(path, args) - } - - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingLister)> { - let capability = self.info.full_capability(); - if !capability.list || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingList, - )); - } - - self.inner.blocking_list(path, args) } } - -pub struct CorrectnessCapabilityCheckAccessor { +pub struct CapabilityAccessor { info: Arc, inner: A, } -impl Debug for CorrectnessCapabilityCheckAccessor { +impl Debug for CapabilityAccessor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CorrectnessCapabilityCheckAccessor") + f.debug_struct("CapabilityCheckAccessor") .field("inner", &self.inner) .finish_non_exhaustive() } } -impl LayeredAccess for CorrectnessCapabilityCheckAccessor { +impl LayeredAccess for CapabilityAccessor { type Inner = A; type Reader = A::Reader; type BlockingReader = A::BlockingReader; @@ -608,7 +98,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { let capability = self.info.full_capability(); if !capability.read_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Read, "version", @@ -620,11 +110,11 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { let capability = self.info.full_capability(); - if !capability.write_with_if_none_match && args.if_none_match().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + if !capability.write_with_content_type && args.content_type().is_some() { + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Write, - "if_none_match", + "content_type", )); } @@ -634,7 +124,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn stat(&self, path: &str, args: OpStat) -> crate::Result { let capability = self.info.full_capability(); if !capability.stat_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Stat, "version", @@ -647,7 +137,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { let capability = self.info.full_capability(); if !capability.delete_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Delete, "version", @@ -660,7 +150,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::List, "version", @@ -677,7 +167,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { ) -> crate::Result<(RpRead, Self::BlockingReader)> { let capability = self.info.full_capability(); if !capability.read_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingRead, "version", @@ -693,11 +183,11 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { args: OpWrite, ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { let capability = self.info.full_capability(); - if !capability.write_with_if_none_match && args.if_none_match().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + if !capability.write_with_content_type && args.content_type().is_some() { + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingWrite, - "if_none_match", + "content_type", )); } @@ -707,7 +197,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { let capability = self.info.full_capability(); if !capability.stat_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingStat, "version", @@ -720,7 +210,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { let capability = self.info.full_capability(); if !capability.delete_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingDelete, "version", @@ -737,7 +227,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { ) -> crate::Result<(RpList, Self::BlockingLister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingList, "version", @@ -750,11 +240,12 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { #[cfg(test)] mod tests { - use std::time::Duration; - use super::*; - use crate::raw::{oio, PresignedRequest}; - use crate::{Capability, EntryMode, Metadata, Operator}; + use crate::raw::{ + oio, OpCopy, OpCreateDir, OpPresign, OpRename, PresignedRequest, RpCopy, RpCreateDir, + RpPresign, RpRename, + }; + use crate::{Capability, EntryMode, ErrorKind, Metadata, Operator}; use http::HeaderMap; use http::Method as HttpMethod; @@ -822,144 +313,100 @@ mod tests { fn new_test_operator(capability: Capability) -> Operator { let srv = MockService { capability }; - Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer::new()) + Operator::from_inner(Arc::new(srv)).layer(CapabilityChecker) } #[tokio::test] - async fn test_read() { - let op = new_test_operator(Capability::default()); - let res = op.read("path").await; + async fn test_read_with() { + let op = new_test_operator(Capability { + read: true, + ..Default::default() + }); + let res = op.read_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { read: true, - stat: true, + read_with_version: true, ..Default::default() }); - let res = op.read("path").await; + let res = op.read_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_stat() { - let op = new_test_operator(Capability::default()); - let res = op.stat("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_stat_with() { let op = new_test_operator(Capability { stat: true, ..Default::default() }); - let res = op.stat("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer() { - let op = new_test_operator(Capability::default()); - let bs: Vec = vec![]; - let res = op.write("path", bs).await; + let res = op.stat_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - write: true, + stat: true, + stat_with_version: true, ..Default::default() }); - let res = op.writer("path").await; + let res = op.stat_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_create_dir() { - let op = new_test_operator(Capability::default()); - let res = op.create_dir("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_writer_with() { let op = new_test_operator(Capability { - create_dir: true, + write: true, ..Default::default() }); - let res = op.create_dir("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_delete() { - let op = new_test_operator(Capability::default()); - let res = op.delete("path").await; + let res = op.writer_with("path").content_type("type").await; assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - delete: true, + write: true, + write_with_content_type: true, ..Default::default() }); - let res = op.delete("path").await; - assert!(res.is_ok()) + let res = op.writer_with("path").content_type("type").await; + assert!(res.is_ok()); } #[tokio::test] - async fn test_copy() { - let op = new_test_operator(Capability::default()); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_delete_with() { let op = new_test_operator(Capability { - copy: true, + delete: true, ..Default::default() }); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_rename() { - let op = new_test_operator(Capability::default()); - let res = op.rename("path_a", "path_b").await; + let res = op.delete_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - rename: true, + delete: true, + delete_with_version: true, ..Default::default() }); - let res = op.rename("path_a", "path_b").await; + let res = op.delete_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_list() { - let op = new_test_operator(Capability::default()); - let res = op.list("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_list_with() { let op = new_test_operator(Capability { list: true, - list_with_recursive: true, ..Default::default() }); - let res = op.list("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_presign() { - let op = new_test_operator(Capability::default()); - let res = op.presign_read("path", Duration::from_secs(1)).await; + let res = op.list_with("path/").version(true).await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - presign: true, + list: true, + list_with_version: true, ..Default::default() }); - let res = op.presign_read("path", Duration::from_secs(1)).await; + let res = op.lister_with("path/").version(true).await; assert!(res.is_ok()) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 1136a7eaef96..98290ba8678e 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -136,7 +136,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with capability check, we cannot reach here") + unreachable!("with correctness check, we cannot reach here") } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -151,7 +151,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with capability check, we cannot reach here") + unreachable!("with correctness check, we cannot reach here") } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs new file mode 100644 index 000000000000..15eea6022deb --- /dev/null +++ b/core/src/layers/correctness_check.rs @@ -0,0 +1,602 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::raw::{ + Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, + OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, + RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, +}; +use crate::{Error, ErrorKind}; + +/// Add a correctness capability check layer for every operation +/// +/// Before performing any operations, we will first verify the operation and its critical arguments +/// against the capability of the underlying service. If the operation or arguments is not supported, +/// an error will be returned directly. +/// +/// # Notes +/// +/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. +/// this checker ensures the operation and its critical arguments, which might affect the correctness of +/// the call, are supported by the underlying service. +/// +/// for example, when calling `write_with_append`, but `append` is not supported by the underlying +/// service, an `Unsupported` error is returned. without this check, undesired data may be written. +#[derive(Default)] +pub struct CorrectnessChecker; + +impl Layer for CorrectnessChecker { + type LayeredAccess = CorrectnessAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + CorrectnessAccessor { + info: inner.info(), + inner, + } + } +} + +pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op}"), + ) + .with_operation(op) +} + +pub(crate) fn new_unsupported_args_error( + info: &AccessorInfo, + op: impl Into<&'static str>, + args: &str, +) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op} with args {args}"), + ) + .with_operation(op) +} + +pub struct CorrectnessAccessor { + info: Arc, + inner: A, +} + +impl Debug for CorrectnessAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CorrectnessCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for CorrectnessAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn info(&self) -> Arc { + self.info.clone() + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::CreateDir, + )); + } + + self.inner.create_dir(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Read)); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write { + return Err(new_unsupported_error(&self.info, Operation::Write)); + } + if args.append() && !capability.write_can_append { + return Err(new_unsupported_args_error( + &self.info, + Operation::Write, + "append", + )); + } + if args.if_not_exists() && !capability.write_with_if_not_exists { + return Err(new_unsupported_args_error( + &self.info, + Operation::Write, + "if_not_exists", + )); + } + if args.if_none_match().is_some() && !capability.write_with_if_none_match { + return Err(new_unsupported_args_error( + self.info.as_ref(), + Operation::Write, + "if_none_match", + )); + } + + self.inner.write(path, args).await + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Copy)); + } + + self.inner.copy(from, to, args).await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Rename)); + } + + self.inner.rename(from, to, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Stat)); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Delete)); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list { + return Err(new_unsupported_error(self.info.as_ref(), Operation::List)); + } + + self.inner.list(path, args).await + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.batch { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Batch)); + } + + self.inner.batch(args).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.presign { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Presign, + )); + } + + self.inner.presign(path, args).await + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCreateDir, + )); + } + + self.inner.blocking_create_dir(path, args) + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRead, + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write || !capability.blocking { + return Err(new_unsupported_error(&self.info, Operation::BlockingWrite)); + } + if args.append() && !capability.write_can_append { + return Err(new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "append", + )); + } + if args.if_not_exists() && !capability.write_with_if_not_exists { + return Err(new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "if_not_exists", + )); + } + if args.if_none_match().is_some() && !capability.write_with_if_none_match { + return Err(new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingWrite, + "if_none_match", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCopy, + )); + } + + self.inner().blocking_copy(from, to, args) + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRename, + )); + } + + self.inner().blocking_rename(from, to, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingStat, + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingDelete, + )); + } + + self.inner().blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingList, + )); + } + + self.inner.blocking_list(path, args) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::raw::{oio, PresignedRequest}; + use crate::{Capability, EntryMode, Metadata, Operator}; + use http::HeaderMap; + use http::Method as HttpMethod; + + #[derive(Debug)] + struct MockService { + capability: Capability, + } + + impl Access for MockService { + type Reader = oio::Reader; + type Writer = oio::Writer; + type Lister = oio::Lister; + type BlockingReader = oio::BlockingReader; + type BlockingWriter = oio::BlockingWriter; + type BlockingLister = oio::BlockingLister; + + fn info(&self) -> Arc { + let mut info = AccessorInfo::default(); + info.set_native_capability(self.capability); + + info.into() + } + + async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { + Ok(RpCreateDir {}) + } + + async fn stat(&self, _: &str, _: OpStat) -> crate::Result { + Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) + } + + async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) + } + + async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + Ok((RpWrite::new(), Box::new(()))) + } + + async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { + Ok(RpDelete {}) + } + + async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { + Ok((RpList {}, Box::new(()))) + } + + async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { + Ok(RpCopy {}) + } + + async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { + Ok(RpRename {}) + } + + async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { + Ok(RpPresign::new(PresignedRequest::new( + HttpMethod::POST, + "https://example.com/presign".parse().expect("should parse"), + HeaderMap::new(), + ))) + } + } + + fn new_test_operator(capability: Capability) -> Operator { + let srv = MockService { capability }; + + Operator::from_inner(Arc::new(srv)).layer(CorrectnessChecker) + } + + #[tokio::test] + async fn test_read() { + let op = new_test_operator(Capability::default()); + let res = op.read("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + read: true, + stat: true, + ..Default::default() + }); + let res = op.read("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_stat() { + let op = new_test_operator(Capability::default()); + let res = op.stat("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + stat: true, + ..Default::default() + }); + let res = op.stat("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_writer() { + let op = new_test_operator(Capability::default()); + let bs: Vec = vec![]; + let res = op.write("path", bs).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.writer("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_write_with() { + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.write_with("path", "".as_bytes()).append(true).await; + assert!(res.is_err()); + + let res = op + .write_with("path", "".as_bytes()) + .if_not_exists(true) + .await; + assert!(res.is_err()); + + let res = op + .write_with("path", "".as_bytes()) + .if_none_match("etag") + .await; + assert!(res.is_err()); + + let op = new_test_operator(Capability { + write: true, + write_can_append: true, + write_with_if_not_exists: true, + write_with_if_none_match: true, + ..Default::default() + }); + let res = op.writer_with("path").append(true).await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_create_dir() { + let op = new_test_operator(Capability::default()); + let res = op.create_dir("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + create_dir: true, + ..Default::default() + }); + let res = op.create_dir("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_delete() { + let op = new_test_operator(Capability::default()); + let res = op.delete("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + delete: true, + ..Default::default() + }); + let res = op.delete("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_copy() { + let op = new_test_operator(Capability::default()); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + copy: true, + ..Default::default() + }); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_rename() { + let op = new_test_operator(Capability::default()); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + rename: true, + ..Default::default() + }); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_list() { + let op = new_test_operator(Capability::default()); + let res = op.list("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + list: true, + list_with_recursive: true, + ..Default::default() + }); + let res = op.list("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_presign() { + let op = new_test_operator(Capability::default()); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + presign: true, + ..Default::default() + }); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_ok()) + } +} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index cf672d59032a..c46b4ec7dfbf 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -114,5 +114,7 @@ pub use self::dtrace::DtraceLayer; pub mod observe; +mod correctness_check; +pub(crate) use correctness_check::CorrectnessChecker; mod capability_check; -pub use capability_check::CapabilityCheckLayer; +pub use capability_check::CapabilityChecker; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 2c6c33fc0b77..f8349ae2c26c 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -472,7 +472,7 @@ impl OperatorBuilder { OperatorBuilder { accessor } .layer(ErrorContextLayer) .layer(CompleteLayer) - .layer(CapabilityCheckLayer::default()) + .layer(CorrectnessChecker) } /// Create a new layer with static dispatch. From 3e2bc37d0eaa2ba8a08db150990abbb50c93f9ff Mon Sep 17 00:00:00 2001 From: meteorgan Date: Wed, 4 Dec 2024 17:19:56 +0800 Subject: [PATCH 4/5] fix some CR comments --- core/src/layers/capability_check.rs | 17 +++++++---------- core/src/layers/complete.rs | 4 ++-- core/src/layers/correctness_check.rs | 12 ++++-------- core/src/layers/mod.rs | 4 ++-- core/src/types/operator/builder.rs | 2 +- 5 files changed, 16 insertions(+), 23 deletions(-) diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs index 40ae15d1bf88..1ca1ffa994be 100644 --- a/core/src/layers/capability_check.rs +++ b/core/src/layers/capability_check.rs @@ -16,10 +16,7 @@ // under the License. use crate::layers::correctness_check::new_unsupported_args_error; -use crate::raw::{ - Access, AccessorInfo, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpStat, OpWrite, - Operation, RpDelete, RpList, RpRead, RpStat, RpWrite, -}; +use crate::raw::*; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -42,24 +39,24 @@ use std::sync::Arc; /// # examples /// /// ```no_run -/// # use opendal::layers::CapabilityChecker; +/// # use opendal::layers::CapabilityCheckLayer; /// # use opendal::services; /// # use opendal::Operator; /// # use opendal::Result; /// # use opendal::Scheme; /// /// # fn main() -> Result<()> { -/// use opendal::layers::CapabilityChecker; +/// use opendal::layers::CapabilityCheckLayer; /// let _ = Operator::new(services::Memory::default())? -/// .layer(CapabilityChecker) +/// .layer(CapabilityCheckLayer) /// .finish(); /// Ok(()) /// # } /// ``` #[derive(Default)] -pub struct CapabilityChecker; +pub struct CapabilityCheckLayer; -impl Layer for CapabilityChecker { +impl Layer for CapabilityCheckLayer { type LayeredAccess = CapabilityAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { @@ -313,7 +310,7 @@ mod tests { fn new_test_operator(capability: Capability) -> Operator { let srv = MockService { capability }; - Operator::from_inner(Arc::new(srv)).layer(CapabilityChecker) + Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer) } #[tokio::test] diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 98290ba8678e..bc06549014c4 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -136,7 +136,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with correctness check, we cannot reach here") + return self.inner.create_dir(path, args).await; } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -151,7 +151,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with correctness check, we cannot reach here") + return self.inner.blocking_create_dir(path, args); } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs index 15eea6022deb..0de038bab0d9 100644 --- a/core/src/layers/correctness_check.rs +++ b/core/src/layers/correctness_check.rs @@ -18,11 +18,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use crate::raw::{ - Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, - OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, - RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, -}; +use crate::raw::*; use crate::{Error, ErrorKind}; /// Add a correctness capability check layer for every operation @@ -40,9 +36,9 @@ use crate::{Error, ErrorKind}; /// for example, when calling `write_with_append`, but `append` is not supported by the underlying /// service, an `Unsupported` error is returned. without this check, undesired data may be written. #[derive(Default)] -pub struct CorrectnessChecker; +pub struct CorrectnessCheckLayer; -impl Layer for CorrectnessChecker { +impl Layer for CorrectnessCheckLayer { type LayeredAccess = CorrectnessAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { @@ -427,7 +423,7 @@ mod tests { fn new_test_operator(capability: Capability) -> Operator { let srv = MockService { capability }; - Operator::from_inner(Arc::new(srv)).layer(CorrectnessChecker) + Operator::from_inner(Arc::new(srv)).layer(CorrectnessCheckLayer) } #[tokio::test] diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index c46b4ec7dfbf..0ac8f5f67abe 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -115,6 +115,6 @@ pub use self::dtrace::DtraceLayer; pub mod observe; mod correctness_check; -pub(crate) use correctness_check::CorrectnessChecker; +pub(crate) use correctness_check::CorrectnessCheckLayer; mod capability_check; -pub use capability_check::CapabilityChecker; +pub use capability_check::CapabilityCheckLayer; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index f8349ae2c26c..4393cd5e0206 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -472,7 +472,7 @@ impl OperatorBuilder { OperatorBuilder { accessor } .layer(ErrorContextLayer) .layer(CompleteLayer) - .layer(CorrectnessChecker) + .layer(CorrectnessCheckLayer) } /// Create a new layer with static dispatch. From e169e41c35de3b6a5010827483d45aba189fdb7c Mon Sep 17 00:00:00 2001 From: meteorgan Date: Wed, 4 Dec 2024 22:44:14 +0800 Subject: [PATCH 5/5] remove useless checks --- core/src/layers/capability_check.rs | 201 ++++----------- core/src/layers/complete.rs | 4 +- core/src/layers/correctness_check.rs | 350 +++++---------------------- 3 files changed, 109 insertions(+), 446 deletions(-) diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs index 1ca1ffa994be..0066e031b3f1 100644 --- a/core/src/layers/capability_check.rs +++ b/core/src/layers/capability_check.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::layers::correctness_check::new_unsupported_args_error; +use crate::layers::correctness_check::new_unsupported_error; use crate::raw::*; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -93,61 +93,40 @@ impl LayeredAccess for CapabilityAccessor { } async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( - self.info.as_ref(), - Operation::Read, - "version", - )); - } - self.inner.read(path, args).await } async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { let capability = self.info.full_capability(); if !capability.write_with_content_type && args.content_type().is_some() { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::Write, "content_type", )); } - - self.inner.write(path, args).await - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( + if !capability.write_with_cache_control && args.cache_control().is_some() { + return Err(new_unsupported_error( self.info.as_ref(), - Operation::Stat, - "version", + Operation::Write, + "cache_control", )); } - - self.inner.stat(path, args).await - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( + if !capability.write_with_content_disposition && args.content_disposition().is_some() { + return Err(new_unsupported_error( self.info.as_ref(), - Operation::Delete, - "version", + Operation::Write, + "content_disposition", )); } - self.inner.delete(path, args).await + self.inner.write(path, args).await } async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::List, "version", @@ -162,16 +141,7 @@ impl LayeredAccess for CapabilityAccessor { path: &str, args: OpRead, ) -> crate::Result<(RpRead, Self::BlockingReader)> { - let capability = self.info.full_capability(); - if !capability.read_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( - self.info.as_ref(), - Operation::BlockingRead, - "version", - )); - } - - self.inner.blocking_read(path, args) + self.inner().blocking_read(path, args) } fn blocking_write( @@ -181,40 +151,28 @@ impl LayeredAccess for CapabilityAccessor { ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { let capability = self.info.full_capability(); if !capability.write_with_content_type && args.content_type().is_some() { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingWrite, "content_type", )); } - - self.inner.blocking_write(path, args) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( + if !capability.write_with_cache_control && args.cache_control().is_some() { + return Err(new_unsupported_error( self.info.as_ref(), - Operation::BlockingStat, - "version", + Operation::BlockingWrite, + "cache_control", )); } - - self.inner.blocking_stat(path, args) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete_with_version && args.version().is_some() { - return Err(new_unsupported_args_error( + if !capability.write_with_content_disposition && args.content_disposition().is_some() { + return Err(new_unsupported_error( self.info.as_ref(), - Operation::BlockingDelete, - "version", + Operation::BlockingWrite, + "content_disposition", )); } - self.inner.blocking_delete(path, args) + self.inner.blocking_write(path, args) } fn blocking_list( @@ -224,7 +182,7 @@ impl LayeredAccess for CapabilityAccessor { ) -> crate::Result<(RpList, Self::BlockingLister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingList, "version", @@ -238,13 +196,7 @@ impl LayeredAccess for CapabilityAccessor { #[cfg(test)] mod tests { use super::*; - use crate::raw::{ - oio, OpCopy, OpCreateDir, OpPresign, OpRename, PresignedRequest, RpCopy, RpCreateDir, - RpPresign, RpRename, - }; - use crate::{Capability, EntryMode, ErrorKind, Metadata, Operator}; - use http::HeaderMap; - use http::Method as HttpMethod; + use crate::{Capability, ErrorKind, Operator}; #[derive(Debug)] struct MockService { @@ -266,45 +218,13 @@ mod tests { info.into() } - async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { - Ok(RpCreateDir {}) - } - - async fn stat(&self, _: &str, _: OpStat) -> crate::Result { - Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) - } - - async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) - } - async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { Ok((RpWrite::new(), Box::new(()))) } - async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { - Ok(RpDelete {}) - } - async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { Ok((RpList {}, Box::new(()))) } - - async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { - Ok(RpCopy {}) - } - - async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { - Ok(RpRename {}) - } - - async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { - Ok(RpPresign::new(PresignedRequest::new( - HttpMethod::POST, - "https://example.com/presign".parse().expect("should parse"), - HeaderMap::new(), - ))) - } } fn new_test_operator(capability: Capability) -> Operator { @@ -314,78 +234,41 @@ mod tests { } #[tokio::test] - async fn test_read_with() { + async fn test_writer_with() { let op = new_test_operator(Capability { - read: true, + write: true, ..Default::default() }); - let res = op.read_with("path").version("version").await; + let res = op.writer_with("path").content_type("type").await; assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - read: true, - read_with_version: true, - ..Default::default() - }); - let res = op.read_with("path").version("version").await; - assert!(res.is_ok()) - } - #[tokio::test] - async fn test_stat_with() { - let op = new_test_operator(Capability { - stat: true, - ..Default::default() - }); - let res = op.stat_with("path").version("version").await; + let res = op.writer_with("path").cache_control("cache").await; assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - let op = new_test_operator(Capability { - stat: true, - stat_with_version: true, - ..Default::default() - }); - let res = op.stat_with("path").version("version").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer_with() { - let op = new_test_operator(Capability { - write: true, - ..Default::default() - }); - let res = op.writer_with("path").content_type("type").await; + let res = op + .writer_with("path") + .content_disposition("disposition") + .await; assert!(res.is_err()); let op = new_test_operator(Capability { write: true, write_with_content_type: true, + write_with_cache_control: true, + write_with_content_disposition: true, ..Default::default() }); let res = op.writer_with("path").content_type("type").await; assert!(res.is_ok()); - } - #[tokio::test] - async fn test_delete_with() { - let op = new_test_operator(Capability { - delete: true, - ..Default::default() - }); - let res = op.delete_with("path").version("version").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + let res = op.writer_with("path").cache_control("cache").await; + assert!(res.is_ok()); - let op = new_test_operator(Capability { - delete: true, - delete_with_version: true, - ..Default::default() - }); - let res = op.delete_with("path").version("version").await; - assert!(res.is_ok()) + let res = op + .writer_with("path") + .content_disposition("disposition") + .await; + assert!(res.is_ok()); } #[tokio::test] diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index bc06549014c4..31a3fb03e273 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -136,7 +136,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - return self.inner.create_dir(path, args).await; + self.inner.create_dir(path, args).await } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -151,7 +151,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - return self.inner.blocking_create_dir(path, args); + self.inner.blocking_create_dir(path, args) } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs index 0de038bab0d9..a2368d54d154 100644 --- a/core/src/layers/correctness_check.rs +++ b/core/src/layers/correctness_check.rs @@ -19,7 +19,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use crate::raw::*; -use crate::{Error, ErrorKind}; +use crate::*; /// Add a correctness capability check layer for every operation /// @@ -49,24 +49,9 @@ impl Layer for CorrectnessCheckLayer { } } -pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { +pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error { let scheme = info.scheme(); - let op = op.into(); - - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op}"), - ) - .with_operation(op) -} - -pub(crate) fn new_unsupported_args_error( - info: &AccessorInfo, - op: impl Into<&'static str>, - args: &str, -) -> Error { - let scheme = info.scheme(); - let op = op.into(); + let op = op.into_static(); Error::new( ErrorKind::Unsupported, @@ -105,48 +90,37 @@ impl LayeredAccess for CorrectnessAccessor { self.info.clone() } - async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let capability = self.info.full_capability(); - if !capability.create_dir { + if !capability.read_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), - Operation::CreateDir, + Operation::Read, + "version", )); } - self.inner.create_dir(path, args).await - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Read)); - } - self.inner.read(path, args).await } - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let capability = self.info.full_capability(); - if !capability.write { - return Err(new_unsupported_error(&self.info, Operation::Write)); - } if args.append() && !capability.write_can_append { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( &self.info, Operation::Write, "append", )); } if args.if_not_exists() && !capability.write_with_if_not_exists { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( &self.info, Operation::Write, "if_not_exists", )); } if args.if_none_match().is_some() && !capability.write_with_if_none_match { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::Write, "if_none_match", @@ -156,125 +130,67 @@ impl LayeredAccess for CorrectnessAccessor { self.inner.write(path, args).await } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Copy)); - } - - self.inner.copy(from, to, args).await - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Rename)); - } - - self.inner.rename(from, to, args).await - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Stat)); - } - - self.inner.stat(path, args).await - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Delete)); - } - - self.inner.delete(path, args).await - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { - let capability = self.info.full_capability(); - if !capability.list { - return Err(new_unsupported_error(self.info.as_ref(), Operation::List)); - } - - self.inner.list(path, args).await - } - - async fn batch(&self, args: OpBatch) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.batch { - return Err(new_unsupported_error(self.info.as_ref(), Operation::Batch)); - } - - self.inner.batch(args).await - } - - async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + async fn stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.presign { + if !capability.stat_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), - Operation::Presign, + Operation::Stat, + "version", )); } - self.inner.presign(path, args).await + self.inner.stat(path, args).await } - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + async fn delete(&self, path: &str, args: OpDelete) -> Result { let capability = self.info.full_capability(); - if !capability.create_dir || !capability.blocking { + if !capability.delete_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), - Operation::BlockingCreateDir, + Operation::Delete, + "version", )); } - self.inner.blocking_create_dir(path, args) + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.inner.list(path, args).await } - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { let capability = self.info.full_capability(); - if !capability.read || !capability.blocking { + if !capability.read_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingRead, + "version", )); } self.inner.blocking_read(path, args) } - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let capability = self.info.full_capability(); - if !capability.write || !capability.blocking { - return Err(new_unsupported_error(&self.info, Operation::BlockingWrite)); - } if args.append() && !capability.write_can_append { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( &self.info, Operation::BlockingWrite, "append", )); } if args.if_not_exists() && !capability.write_with_if_not_exists { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( &self.info, Operation::BlockingWrite, "if_not_exists", )); } if args.if_none_match().is_some() && !capability.write_with_if_none_match { - return Err(new_unsupported_args_error( + return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingWrite, "if_none_match", @@ -284,80 +200,42 @@ impl LayeredAccess for CorrectnessAccessor { self.inner.blocking_write(path, args) } - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy || !capability.blocking { - return Err(new_unsupported_error( - self.info.as_ref(), - Operation::BlockingCopy, - )); - } - - self.inner().blocking_copy(from, to, args) - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename || !capability.blocking { - return Err(new_unsupported_error( - self.info.as_ref(), - Operation::BlockingRename, - )); - } - - self.inner().blocking_rename(from, to, args) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + fn blocking_stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.stat || !capability.blocking { + if !capability.stat_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingStat, + "version", )); } self.inner.blocking_stat(path, args) } - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { let capability = self.info.full_capability(); - if !capability.delete || !capability.blocking { + if !capability.delete_with_version && args.version().is_some() { return Err(new_unsupported_error( self.info.as_ref(), Operation::BlockingDelete, + "version", )); } self.inner().blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingLister)> { - let capability = self.info.full_capability(); - if !capability.list || !capability.blocking { - return Err(new_unsupported_error( - self.info.as_ref(), - Operation::BlockingList, - )); - } - + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { self.inner.blocking_list(path, args) } } #[cfg(test)] mod tests { - use std::time::Duration; - use super::*; - use crate::raw::{oio, PresignedRequest}; + use crate::raw::oio; use crate::{Capability, EntryMode, Metadata, Operator}; - use http::HeaderMap; - use http::Method as HttpMethod; #[derive(Debug)] struct MockService { @@ -379,45 +257,25 @@ mod tests { info.into() } - async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { - Ok(RpCreateDir {}) - } - - async fn stat(&self, _: &str, _: OpStat) -> crate::Result { + async fn stat(&self, _: &str, _: OpStat) -> Result { Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) } - async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) } - async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok((RpWrite::new(), Box::new(()))) } - async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { + async fn delete(&self, _: &str, _: OpDelete) -> Result { Ok(RpDelete {}) } - async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { + async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { Ok((RpList {}, Box::new(()))) } - - async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { - Ok(RpCopy {}) - } - - async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { - Ok(RpRename {}) - } - - async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { - Ok(RpPresign::new(PresignedRequest::new( - HttpMethod::POST, - "https://example.com/presign".parse().expect("should parse"), - HeaderMap::new(), - ))) - } } fn new_test_operator(capability: Capability) -> Operator { @@ -428,49 +286,40 @@ mod tests { #[tokio::test] async fn test_read() { - let op = new_test_operator(Capability::default()); - let res = op.read("path").await; + let op = new_test_operator(Capability { + read: true, + ..Default::default() + }); + let res = op.read_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { read: true, - stat: true, + read_with_version: true, ..Default::default() }); - let res = op.read("path").await; - assert!(res.is_ok()) + let res = op.read_with("path").version("version").await; + assert!(res.is_ok()); } #[tokio::test] async fn test_stat() { - let op = new_test_operator(Capability::default()); - let res = op.stat("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - let op = new_test_operator(Capability { stat: true, ..Default::default() }); - let res = op.stat("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer() { - let op = new_test_operator(Capability::default()); - let bs: Vec = vec![]; - let res = op.write("path", bs).await; + let res = op.stat_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - write: true, + stat: true, + stat_with_version: true, ..Default::default() }); - let res = op.writer("path").await; - assert!(res.is_ok()) + let res = op.stat_with("path").version("version").await; + assert!(res.is_ok()); } #[tokio::test] @@ -481,18 +330,21 @@ mod tests { }); let res = op.write_with("path", "".as_bytes()).append(true).await; assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let res = op .write_with("path", "".as_bytes()) .if_not_exists(true) .await; assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let res = op .write_with("path", "".as_bytes()) .if_none_match("etag") .await; assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { write: true, @@ -505,94 +357,22 @@ mod tests { assert!(res.is_ok()); } - #[tokio::test] - async fn test_create_dir() { - let op = new_test_operator(Capability::default()); - let res = op.create_dir("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - create_dir: true, - ..Default::default() - }); - let res = op.create_dir("path/").await; - assert!(res.is_ok()) - } - #[tokio::test] async fn test_delete() { - let op = new_test_operator(Capability::default()); - let res = op.delete("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - let op = new_test_operator(Capability { delete: true, ..Default::default() }); - let res = op.delete("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_copy() { - let op = new_test_operator(Capability::default()); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - copy: true, - ..Default::default() - }); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_rename() { - let op = new_test_operator(Capability::default()); - let res = op.rename("path_a", "path_b").await; + let res = op.delete_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - rename: true, - ..Default::default() - }); - let res = op.rename("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_list() { - let op = new_test_operator(Capability::default()); - let res = op.list("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - list: true, - list_with_recursive: true, - ..Default::default() - }); - let res = op.list("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_presign() { - let op = new_test_operator(Capability::default()); - let res = op.presign_read("path", Duration::from_secs(1)).await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - presign: true, + delete: true, + delete_with_version: true, ..Default::default() }); - let res = op.presign_read("path", Duration::from_secs(1)).await; + let res = op.delete_with("path").version("version").await; assert!(res.is_ok()) } }