Skip to content

Commit

Permalink
fix: Accessor in layers not set correctly (#840)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Oct 11, 2022
1 parent f86ecdc commit e8a8efb
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/io_util/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use futures::future::BoxFuture;
use futures::ready;
use futures::Future;

use crate::Accessor;
use crate::Object;
use crate::ObjectEntry;
use crate::ObjectMetadata;
use crate::ObjectMode;
use crate::ObjectStreamer;
use crate::{Accessor, Object};

/// TopDownWalker will walk dir in top down way:
///
Expand Down
23 changes: 23 additions & 0 deletions src/layers/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ use crate::Accessor;
/// to maintain the internal mutability. Please also keep in mind that `Accessor`
/// requires `Send` and `Sync`.
///
/// # Notes
///
/// `list` and `blocking_list` operations will set `Arc<dyn Accessor>` for
/// `ObjectEntry`.
///
/// All layers must make sure the `accessor` is set correctly, for example:
///
/// ```no_build
/// impl Stream for ExampleStreamer {
/// type Item = Result<ObjectEntry>;
///
/// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// match Pin::new(&mut (*self.inner)).poll_next(cx) {
/// Poll::Ready(Some(Ok(mut de))) => {
/// de.set_accessor(self.acc.clone());
/// Poll::Ready(Some(Ok(de)))
/// }
/// v => v,
/// }
/// }
/// }
/// ```
///
/// # Examples
///
/// ```
Expand Down
20 changes: 13 additions & 7 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Layer for LoggingLayer {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
struct LoggingAccessor {
scheme: Scheme,
inner: Arc<dyn Accessor>,
Expand Down Expand Up @@ -316,7 +316,7 @@ impl Accessor for LoggingAccessor {
"service={} operation={} path={} -> got dir streamer",
self.scheme, Operation::List, path
);
let streamer = LoggingStreamer::new(self.scheme, path, v);
let streamer = LoggingStreamer::new(Arc::new(self.clone()), self.scheme, path, v);
Box::new(streamer) as ObjectStreamer
})
.map_err(|err| {
Expand Down Expand Up @@ -769,7 +769,7 @@ impl Accessor for LoggingAccessor {
Operation::BlockingList,
path
);
let li = LoggingIterator::new(self.scheme, path, v);
let li = LoggingIterator::new(Arc::new(self.clone()), self.scheme, path, v);
Box::new(li) as ObjectIterator
})
.map_err(|err| {
Expand Down Expand Up @@ -909,14 +909,16 @@ impl Read for BlockingLoggingReader {
}

struct LoggingStreamer {
acc: Arc<LoggingAccessor>,
scheme: Scheme,
path: String,
inner: ObjectStreamer,
}

impl LoggingStreamer {
fn new(scheme: Scheme, path: &str, inner: ObjectStreamer) -> Self {
fn new(acc: Arc<LoggingAccessor>, scheme: Scheme, path: &str, inner: ObjectStreamer) -> Self {
Self {
acc,
scheme,
path: path.to_string(),
inner,
Expand All @@ -930,7 +932,7 @@ impl Stream for LoggingStreamer {
match Pin::new(&mut (*self.inner)).poll_next(cx) {
Poll::Ready(opt) => match opt {
Some(res) => match res {
Ok(de) => {
Ok(mut de) => {
debug!(
target: "opendal::service",
"service={} operation={} path={} -> got entry: mode={} path={}",
Expand All @@ -940,6 +942,7 @@ impl Stream for LoggingStreamer {
de.mode(),
de.path(),
);
de.set_accessor(self.acc.clone());
Poll::Ready(Some(Ok(de)))
}
Err(e) => {
Expand Down Expand Up @@ -991,14 +994,16 @@ impl Stream for LoggingStreamer {
}

struct LoggingIterator {
acc: Arc<LoggingAccessor>,
scheme: Scheme,
path: String,
inner: ObjectIterator,
}

impl LoggingIterator {
fn new(scheme: Scheme, path: &str, inner: ObjectIterator) -> Self {
fn new(acc: Arc<LoggingAccessor>, scheme: Scheme, path: &str, inner: ObjectIterator) -> Self {
Self {
acc,
scheme,
path: path.to_string(),
inner,
Expand All @@ -1012,7 +1017,7 @@ impl Iterator for LoggingIterator {
fn next(&mut self) -> Option<Self::Item> {
match self.inner.next() {
Some(res) => match res {
Ok(de) => {
Ok(mut de) => {
debug!(
target: "opendal::service",
"service={} operation={} path={} -> got entry: mode={} path={}",
Expand All @@ -1022,6 +1027,7 @@ impl Iterator for LoggingIterator {
de.mode(),
de.path(),
);
de.set_accessor(self.acc.clone());
Some(Ok(de))
}
Err(e) => {
Expand Down
13 changes: 10 additions & 3 deletions src/layers/metadata_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use async_trait::async_trait;
use futures::io;
use futures::io::Cursor;

use super::util::set_accessor_for_object_iterator;
use super::util::set_accessor_for_object_steamer;
use crate::error::new_other_object_error;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
Expand Down Expand Up @@ -99,7 +101,7 @@ impl Layer for MetadataCacheLayer {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct MetadataCacheAccessor {
cache: Arc<dyn Accessor>,
inner: Arc<dyn Accessor>,
Expand Down Expand Up @@ -162,7 +164,10 @@ impl Accessor for MetadataCacheAccessor {
}

async fn list(&self, path: &str, args: OpList) -> Result<ObjectStreamer> {
self.inner.list(path, args).await
self.inner
.list(path, args)
.await
.map(|s| set_accessor_for_object_steamer(s, self.clone()))
}

fn presign(&self, path: &str, args: OpPresign) -> Result<PresignedRequest> {
Expand Down Expand Up @@ -235,7 +240,9 @@ impl Accessor for MetadataCacheAccessor {
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<ObjectIterator> {
self.inner.blocking_list(path, args)
self.inner
.blocking_list(path, args)
.map(|s| set_accessor_for_object_iterator(s, self.clone()))
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use metrics::counter;
use metrics::histogram;
use metrics::increment_counter;

use super::util::set_accessor_for_object_iterator;
use super::util::set_accessor_for_object_steamer;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -126,6 +128,7 @@ impl Layer for MetricsLayer {
}
}

#[derive(Clone)]
struct MetricsAccessor {
meta: AccessorMetadata,
inner: Arc<dyn Accessor>,
Expand Down Expand Up @@ -286,7 +289,9 @@ impl Accessor for MetricsAccessor {
LABEL_OPERATION => Operation::List.into_static(),
);

result.map_err(|e| increase_error_counter(e, self.meta.scheme(), Operation::List))
result
.map_err(|e| increase_error_counter(e, self.meta.scheme(), Operation::List))
.map(|s| set_accessor_for_object_steamer(s, self.clone()))
}

fn presign(&self, path: &str, args: OpPresign) -> Result<PresignedRequest> {
Expand Down Expand Up @@ -528,7 +533,9 @@ impl Accessor for MetricsAccessor {
LABEL_OPERATION => Operation::BlockingList.into_static(),
);

result.map_err(|e| increase_error_counter(e, self.meta.scheme(), Operation::BlockingList))
result
.map_err(|e| increase_error_counter(e, self.meta.scheme(), Operation::BlockingList))
.map(|s| set_accessor_for_object_iterator(s, self.clone()))
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ pub use subdir::SubdirLayer;
mod tracing;
#[cfg(feature = "layers-tracing")]
pub use self::tracing::TracingLayer;

mod util;
7 changes: 5 additions & 2 deletions src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use log::warn;
use pin_project::pin_project;
use tokio::time::Sleep;

use super::util::set_accessor_for_object_iterator;
use super::util::set_accessor_for_object_steamer;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -108,7 +110,7 @@ where
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct RetryAccessor<B: Backoff + Debug + Send + Sync> {
inner: Arc<dyn Accessor>,
backoff: B,
Expand Down Expand Up @@ -212,6 +214,7 @@ where
})
.await
.map_err(convert_interrupted_error)
.map(|s| set_accessor_for_object_steamer(s, self.clone()))
}

fn presign(&self, path: &str, args: OpPresign) -> Result<PresignedRequest> {
Expand Down Expand Up @@ -410,7 +413,7 @@ where
let res = self.inner.blocking_list(path, args.clone());

match res {
Ok(v) => return Ok(v),
Ok(v) => return Ok(set_accessor_for_object_iterator(v, self.clone())),
Err(err) => {
let kind = err.kind();
e = Some(err);
Expand Down
6 changes: 5 additions & 1 deletion src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use async_trait::async_trait;
use futures::AsyncRead;
use tracing::Span;

use super::util::set_accessor_for_object_iterator;
use super::util::set_accessor_for_object_steamer;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -69,7 +71,7 @@ impl Layer for TracingLayer {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct TracingAccessor {
inner: Arc<dyn Accessor>,
}
Expand Down Expand Up @@ -116,6 +118,7 @@ impl Accessor for TracingAccessor {
.list(path, args)
.await
.map(|s| Box::new(TracingStreamer::new(Span::current(), s)) as ObjectStreamer)
.map(|s| set_accessor_for_object_steamer(s, self.clone()))
}

#[tracing::instrument(level = "debug", skip(self))]
Expand Down Expand Up @@ -181,6 +184,7 @@ impl Accessor for TracingAccessor {
self.inner
.blocking_list(path, args)
.map(|it| Box::new(TracingInterator::new(Span::current(), it)) as ObjectIterator)
.map(|s| set_accessor_for_object_iterator(s, self.clone()))
}
}

Expand Down
96 changes: 96 additions & 0 deletions src/layers/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Result;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use futures::Stream;

use crate::Accessor;
use crate::ObjectEntry;
use crate::ObjectIterator;
use crate::ObjectStreamer;

/// set_accessor_for_object_steamer will fix the accessor for object entry.
pub fn set_accessor_for_object_steamer(
inner: ObjectStreamer,
acc: impl Accessor,
) -> ObjectStreamer {
Box::new(AccessorSetterObjectStreamer::new(Arc::new(acc), inner)) as ObjectStreamer
}

/// AccessorSetterObjectStreamer will set the inner accessor for object entry.
struct AccessorSetterObjectStreamer {
acc: Arc<dyn Accessor>,
inner: ObjectStreamer,
}

impl AccessorSetterObjectStreamer {
/// Create a new AccessorSetterObjectStreamer.
fn new(acc: Arc<dyn Accessor>, inner: ObjectStreamer) -> Self {
Self { acc, inner }
}
}

impl Stream for AccessorSetterObjectStreamer {
type Item = Result<ObjectEntry>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(mut de))) => {
de.set_accessor(self.acc.clone());
Poll::Ready(Some(Ok(de)))
}
v => v,
}
}
}

/// set_accessor_for_object_iterator will fix the accessor for object entry.
pub fn set_accessor_for_object_iterator(
inner: ObjectIterator,
acc: impl Accessor,
) -> ObjectIterator {
Box::new(AccessorSetterObjectIterator::new(Arc::new(acc), inner)) as ObjectIterator
}

/// AccessorSetterObjectIterator that set accessor for entry.
struct AccessorSetterObjectIterator {
acc: Arc<dyn Accessor>,
inner: ObjectIterator,
}

impl AccessorSetterObjectIterator {
/// Create a new AccessorSetterObjectIterator.
fn new(acc: Arc<dyn Accessor>, inner: ObjectIterator) -> Self {
Self { acc, inner }
}
}

impl Iterator for AccessorSetterObjectIterator {
type Item = Result<ObjectEntry>;

fn next(&mut self) -> Option<Self::Item> {
match self.inner.next() {
Some(Ok(mut de)) => {
de.set_accessor(self.acc.clone());
Some(Ok(de))
}
v => v,
}
}
}

1 comment on commit e8a8efb

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-3wmmkml1d-databend.vercel.app

Built with commit e8a8efb.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.