Skip to content

Commit

Permalink
refactor(core/oio): Migrate oio::List to async fn in trait (#4352)
Browse files Browse the repository at this point in the history
* Build passed

Signed-off-by: Xuanwo <[email protected]>

* Fix check

Signed-off-by: Xuanwo <[email protected]>

* Fix check

Signed-off-by: Xuanwo <[email protected]>

* Fix

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* Fix wasm

Signed-off-by: Xuanwo <[email protected]>

* FIx

Signed-off-by: Xuanwo <[email protected]>

* Fix list

Signed-off-by: Xuanwo <[email protected]>

* Fix tests

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 13, 2024
1 parent 02437f8 commit 99941ec
Show file tree
Hide file tree
Showing 54 changed files with 234 additions and 435 deletions.
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {

impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
self.handle.block_on(poll_fn(|cx| self.inner.poll_next(cx)))
self.handle.block_on(self.inner.next())
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl<A: Accessor> CompleteAccessor<A> {
)
.await?;

return if oio::ListExt::next(&mut l).await?.is_some() {
return if oio::List::next(&mut l).await?.is_some() {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
} else {
Err(Error::new(
Expand Down
5 changes: 3 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::fmt::Debug;

use std::io::SeekFrom;
use std::sync::Arc;
use std::task::Context;
Expand Down Expand Up @@ -301,8 +302,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
}

impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
self.inner.poll_next(cx)
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::task::Context;
use std::task::Poll;
Expand Down Expand Up @@ -433,10 +434,9 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
self.inner.poll_next(cx).map_err(|err| {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await.map_err(|err| {
err.with_operation(ListOperation::Next)
.with_context("service", self.scheme)
.with_context("path", &self.path)
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

use std::collections::HashSet;
use std::fmt::Debug;
use std::task::Context;
use std::task::Poll;

use std::vec::IntoIter;

use async_trait::async_trait;
Expand Down Expand Up @@ -233,8 +232,8 @@ impl ImmutableDir {
}

impl oio::List for ImmutableDir {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
Poll::Ready(Ok(self.inner_next()))
async fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::fmt::Debug;

use std::io;
use std::task::ready;
use std::task::Context;
Expand Down Expand Up @@ -1353,11 +1354,9 @@ impl<P> Drop for LoggingLister<P> {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<P: oio::List> oio::List for LoggingLister<P> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
let res = ready!(self.inner.poll_next(cx));
async fn next(&mut self) -> Result<Option<oio::Entry>> {
let res = self.inner.next().await;

match &res {
Ok(Some(de)) => {
Expand Down Expand Up @@ -1395,7 +1394,7 @@ impl<P: oio::List> oio::List for LoggingLister<P> {
}
};

Poll::Ready(res)
res
}
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::any::Any;
use std::cmp::min;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::io::Result;
use std::io::SeekFrom;
use std::net::SocketAddr;
Expand Down Expand Up @@ -325,11 +326,11 @@ impl oio::Write for MadsimWriter {
pub struct MadsimLister {}

impl oio::List for MadsimLister {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Option<oio::Entry>>> {
Poll::Ready(Err(Error::new(
async fn next(&mut self) -> crate::Result<Option<oio::Entry>> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
)))
))
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::fmt::Debug;

use std::io;
use std::task::Context;
use std::task::Poll;
Expand Down Expand Up @@ -357,10 +358,9 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
}

impl<R: oio::List> oio::List for MinitraceWrapper<R> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ListOperation::Next.into_static());
self.inner.poll_next(cx)
#[trace(enter_on_poll = true)]
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}

Expand Down
6 changes: 2 additions & 4 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,9 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<R: oio::List> oio::List for OtelTraceWrapper<R> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
self.inner.poll_next(cx)
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}

Expand Down
79 changes: 30 additions & 49 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;

use std::io;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -952,53 +953,37 @@ impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWra
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
if let Some(sleep) = self.sleep.as_mut() {
ready!(sleep.poll_unpin(cx));
self.sleep = None;
}
async fn next(&mut self) -> Result<Option<oio::Entry>> {
use backon::RetryableWithContext;

match ready!(self.inner.as_mut().unwrap().poll_next(cx)) {
Ok(v) => {
self.current_backoff = None;
Poll::Ready(Ok(v))
}
Err(err) if !err.is_temporary() => {
self.current_backoff = None;
Poll::Ready(Err(err))
}
Err(err) => {
let backoff = match self.current_backoff.as_mut() {
Some(backoff) => backoff,
None => {
self.current_backoff = Some(self.builder.build());
self.current_backoff.as_mut().unwrap()
}
};
let inner = self.inner.take().expect("inner must be valid");

match backoff.next() {
None => {
self.current_backoff = None;
Poll::Ready(Err(err))
}
Some(dur) => {
self.notify.intercept(
&err,
dur,
&[
("operation", ListOperation::Next.into_static()),
("path", &self.path),
],
);
self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
self.poll_next(cx)
}
}
let (inner, res) = {
|mut p: P| async move {
let res = p.next().await;

(p, res)
}
}
.retry(&self.builder)
.when(|e| e.is_temporary())
.context(inner)
.notify(|err, dur| {
self.notify.intercept(
err,
dur,
&[
("operation", ListOperation::Next.into_static()),
("path", &self.path),
],
)
})
.map(|(r, res)| (r, res.map_err(|err| err.set_persistent())))
.await;

self.inner = Some(inner);
res
}
}

Expand Down Expand Up @@ -1028,8 +1013,6 @@ mod tests {
use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -1208,9 +1191,9 @@ mod tests {
}

impl oio::List for MockLister {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.attempt += 1;
let result = match self.attempt {
match self.attempt {
1 => Err(Error::new(
ErrorKind::RateLimited,
"retryable rate limited error from lister",
Expand Down Expand Up @@ -1240,9 +1223,7 @@ mod tests {
_ => {
unreachable!()
}
};

Poll::Ready(result)
}
}
}

Expand Down
19 changes: 8 additions & 11 deletions core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,9 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
}

impl<R: oio::List> oio::List for TimeoutWrapper<R> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
self.poll_timeout(cx, ListOperation::Next.into_static())?;

let v = ready!(self.inner.poll_next(cx));
self.sleep = None;
Poll::Ready(v)
async fn next(&mut self) -> Result<Option<oio::Entry>> {
let fut = self.inner.next();
Self::io_timeout(self.timeout, ListOperation::Next.into_static(), fut).await
}
}

Expand All @@ -374,8 +371,7 @@ mod tests {
use std::future::{pending, Future};
use std::io::SeekFrom;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use std::time::Duration;

use async_trait::async_trait;
Expand Down Expand Up @@ -447,8 +443,8 @@ mod tests {
struct MockLister;

impl oio::List for MockLister {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
Poll::Pending
fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
pending()
}
}

Expand Down Expand Up @@ -506,6 +502,8 @@ mod tests {

#[tokio::test]
async fn test_list_timeout_raw() {
use oio::List;

let acc = MockService;
let timeout_layer = TimeoutLayer::new()
.with_timeout(Duration::from_secs(1))
Expand All @@ -516,7 +514,6 @@ mod tests {
.await
.unwrap();

use oio::ListExt;
let res = lister.next().await;
assert!(res.is_err());
let err = res.unwrap_err();
Expand Down
5 changes: 3 additions & 2 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::fmt::Debug;

use std::io;
use std::task::Context;
use std::task::Poll;
Expand Down Expand Up @@ -349,8 +350,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> {

impl<R: oio::List> oio::List for TracingWrapper<R> {
#[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
self.inner.poll_next(cx)
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ impl KvLister {
}

impl oio::List for KvLister {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
Poll::Ready(Ok(self.inner_next()))
async fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ impl KvLister {
}

impl oio::List for KvLister {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
Poll::Ready(Ok(self.inner_next()))
async fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
}

Expand Down
Loading

0 comments on commit 99941ec

Please sign in to comment.