Skip to content

Commit

Permalink
feat: Move Buffer as public API (#4450)
Browse files Browse the repository at this point in the history
* feat: Move Buffer as public API

Signed-off-by: Xuanwo <[email protected]>

* FIx build

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 9, 2024
1 parent 9c480de commit a5f313d
Show file tree
Hide file tree
Showing 155 changed files with 567 additions and 671 deletions.
3 changes: 2 additions & 1 deletion core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

use bytes::Bytes;
use opendal::raw::oio;
use opendal::*;
use rand::prelude::ThreadRng;
use rand::RngCore;

/// BlackHoleWriter will discard all data written to it so we can measure the buffer's cost.
pub struct BlackHoleWriter;

impl oio::Write for BlackHoleWriter {
async fn write(&mut self, bs: oio::Buffer) -> opendal::Result<usize> {
async fn write(&mut self, bs: Buffer) -> opendal::Result<usize> {
Ok(bs.len())
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ impl<I> BlockingWrapper<I> {
}

impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.handle.block_on(self.inner.read_at(offset, limit))
}
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.handle.block_on(self.inner.write(bs))
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<R> ChaosReader<R> {
}

impl<R: oio::Read> oio::Read for ChaosReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, limit).await
} else {
Expand All @@ -184,7 +184,7 @@ impl<R: oio::Read> oio::Read for ChaosReader<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, limit)
} else {
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,19 +593,19 @@ pub type CompleteLister<A, P> =
pub struct CompleteReader<R>(R);

impl<R: oio::Read> oio::Read for CompleteReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if limit == 0 {
return Ok(oio::Buffer::new());
return Ok(Buffer::new());
}

self.0.read_at(offset, limit).await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if limit == 0 {
return Ok(oio::Buffer::new());
return Ok(Buffer::new());
}

self.0.read_at(offset, limit)
Expand Down Expand Up @@ -638,7 +638,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down Expand Up @@ -673,7 +673,7 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use async_trait::async_trait;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;

use crate::raw::oio::Buffer;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -260,13 +259,13 @@ impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit)
}
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs).await
}

Expand All @@ -280,7 +279,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl<R> DtraceLayerWrapper<R> {
}

impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
match self.inner.read_at(offset, limit).await {
Expand All @@ -359,7 +359,7 @@ impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
self.inner
Expand All @@ -381,7 +381,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
Expand Down Expand Up @@ -429,7 +429,7 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub struct ErrorContextWrapper<T> {
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
Expand All @@ -354,7 +354,7 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
Expand All @@ -366,7 +366,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}

impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs.clone()).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
Expand All @@ -393,7 +393,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs.clone()).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ impl<R> Drop for LoggingReader<R> {
}

impl<R: oio::Read> oio::Read for LoggingReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
match self.inner.read_at(offset, limit).await {
Ok(bs) => {
self.read
Expand Down Expand Up @@ -1017,7 +1017,7 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
match self.inner.read_at(offset, limit) {
Ok(bs) => {
self.read
Expand Down Expand Up @@ -1075,7 +1075,7 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
Expand Down Expand Up @@ -1173,7 +1173,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
match self.inner.write(bs.clone()) {
Ok(n) => {
self.written += n as u64;
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ pub struct MadsimReader {
}

impl oio::Read for MadsimReader {
async fn read_at(&self, offset: u64, limit: usize) -> crate::Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> crate::Result<Buffer> {
if let Some(ref data) = self.data {
let size = min(limit, data.len());
Ok(data.clone().split_to(size).into())
} else {
Ok(oio::Buffer::new())
Ok(Buffer::new())
}
}
}
Expand All @@ -284,7 +284,7 @@ pub struct MadsimWriter {
}

impl oio::Write for MadsimWriter {
async fn write(&mut self, bs: oio::Buffer) -> crate::Result<usize> {
async fn write(&mut self, bs: Buffer) -> crate::Result<usize> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ impl<R> MetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for MetricWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let start = Instant::now();

match self.inner.read_at(offset, limit).await {
Expand All @@ -769,7 +769,7 @@ impl<R: oio::Read> oio::Read for MetricWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let start = Instant::now();

self.inner
Expand All @@ -788,7 +788,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for MetricWrapper<R> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let start = Instant::now();

self.inner
Expand Down Expand Up @@ -822,7 +822,7 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,21 +295,21 @@ impl<R> MinitraceWrapper<R> {

impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
#[trace(enter_on_poll = true)]
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
self.inner.read_at(offset, limit)
}
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> impl Future<Output = Result<usize>> + Send {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
Expand All @@ -329,7 +329,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,19 +275,19 @@ impl<R> OtelTraceWrapper<R> {
}

impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit)
}
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> impl Future<Output = Result<usize>> + Send {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + Send {
self.inner.write(bs)
}

Expand All @@ -301,7 +301,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs)
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Read.into_static(),
Expand All @@ -706,7 +706,7 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingRead.into_static(),
Expand All @@ -729,7 +729,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Write.into_static(),
Expand Down Expand Up @@ -767,7 +767,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: oio::Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
Expand Down
Loading

0 comments on commit a5f313d

Please sign in to comment.