Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core/raw): Migrate oio::Write to async in trait #4358

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use opendal::raw::oio;
use rand::prelude::ThreadRng;
Expand All @@ -27,16 +24,16 @@ use rand::RngCore;
pub struct BlackHoleWriter;

impl oio::Write for BlackHoleWriter {
fn poll_write(&mut self, _: &mut Context<'_>, bs: Bytes) -> Poll<opendal::Result<usize>> {
Poll::Ready(Ok(bs.len()))
async fn write(&mut self, bs: Bytes) -> opendal::Result<usize> {
Ok(bs.len())
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<opendal::Result<()>> {
Poll::Ready(Ok(()))
async fn abort(&mut self) -> opendal::Result<()> {
Ok(())
}

fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<opendal::Result<()>> {
Poll::Ready(Ok(()))
async fn close(&mut self) -> opendal::Result<()> {
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::Buf;
use criterion::Criterion;
use once_cell::sync::Lazy;
use opendal::raw::oio::ExactBufWriter;
use opendal::raw::oio::WriteExt;
use opendal::raw::oio::Write;
use rand::thread_rng;
use size::Size;

Expand Down
8 changes: 3 additions & 5 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use async_trait::async_trait;
use bytes;
use bytes::Bytes;
use futures::future::poll_fn;

use tokio::runtime::Handle;

use crate::raw::*;
Expand Down Expand Up @@ -299,13 +299,11 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: Bytes) -> Result<usize> {
self.handle
.block_on(poll_fn(|cx| self.inner.poll_write(cx, bs.clone())))
self.handle.block_on(self.inner.write(bs))
}

fn close(&mut self) -> Result<()> {
self.handle
.block_on(poll_fn(|cx| self.inner.poll_close(cx)))
self.handle.block_on(self.inner.close())
}
}

Expand Down
23 changes: 10 additions & 13 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;

use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -158,7 +156,7 @@ impl<A: Accessor> CompleteAccessor<A> {
}
if capability.write_can_empty && capability.list {
let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
oio::WriteExt::close(&mut w).await?;
oio::Write::close(&mut w).await?;
return Ok(RpCreateDir::default());
}

Expand Down Expand Up @@ -712,35 +710,34 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
let n = ready!(w.poll_write(cx, bs))?;

Poll::Ready(Ok(n))
w.write(bs).await
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn close(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

ready!(w.poll_close(cx))?;
w.close().await?;
self.inner = None;

Poll::Ready(Ok(()))
Ok(())
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn abort(&mut self) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

ready!(w.poll_abort(cx))?;
w.abort().await?;
self.inner = None;

Poll::Ready(Ok(()))
Ok(())
}
}

Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use std::fmt::Debug;

use std::io::SeekFrom;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -278,16 +276,16 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs)
async fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs).await
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx)
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_abort(cx)
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}

Expand Down
24 changes: 13 additions & 11 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
use std::ffi::CString;
use std::fmt::Debug;
use std::fmt::Formatter;

use std::io;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -408,12 +407,13 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
.write(bs)
.await
.map(|n| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
n
})
Expand All @@ -423,12 +423,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
})
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn abort(&mut self) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_poll_abort_start, c_path.as_ptr());
self.inner
.poll_abort(cx)
.map_ok(|_| {
.abort()
.await
.map(|_| {
probe_lazy!(opendal, writer_poll_abort_ok, c_path.as_ptr());
})
.map_err(|err| {
Expand All @@ -437,12 +438,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
})
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
async fn close(&mut self) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_close_start, c_path.as_ptr());
self.inner
.poll_close(cx)
.map_ok(|_| {
.close()
.await
.map(|_| {
probe_lazy!(opendal, writer_close_ok, c_path.as_ptr());
})
.map_err(|err| {
Expand Down
15 changes: 6 additions & 9 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use std::fmt::Debug;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -387,27 +385,26 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
self.inner.poll_write(cx, bs.clone()).map_err(|err| {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
self.inner.write(bs.clone()).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.len().to_string())
})
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx).map_err(|err| {
async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_abort(cx).map_err(|err| {
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
err.with_operation(WriteOperation::Abort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
Expand Down
27 changes: 12 additions & 15 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
use std::fmt::Debug;

use std::io;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -1147,8 +1144,8 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<Result<usize>> {
match ready!(self.inner.poll_write(cx, bs.clone())) {
async fn write(&mut self, bs: Bytes) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
trace!(
Expand All @@ -1161,7 +1158,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
bs.len(),
n,
);
Poll::Ready(Ok(n))
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1176,13 +1173,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match ready!(self.inner.poll_abort(cx)) {
async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
trace!(
target: LOGGING_TARGET,
Expand All @@ -1192,7 +1189,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.path,
self.written,
);
Poll::Ready(Ok(()))
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1207,13 +1204,13 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match ready!(self.inner.poll_close(cx)) {
async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
Ok(_) => {
debug!(
target: LOGGING_TARGET,
Expand All @@ -1223,7 +1220,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.path,
self.written
);
Poll::Ready(Ok(()))
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand All @@ -1238,7 +1235,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
self.ctx.error_print(&err),
)
}
Poll::Ready(Err(err))
Err(err)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub struct MadsimWriter {
}

impl oio::Write for MadsimWriter {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: Bytes) -> Poll<crate::Result<usize>> {
async fn write(&mut self, bs: Bytes) -> crate::Result<usize> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand All @@ -307,15 +307,15 @@ impl oio::Write for MadsimWriter {
}
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Poll::Ready(Err(Error::new(
async fn abort(&mut self) -> crate::Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
)))
))
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Poll::Ready(Ok(()))
async fn close(&mut self) -> crate::Result<()> {
Ok(())
}
}

Expand Down
Loading
Loading