diff --git a/bindings/c/include/opendal.h b/bindings/c/include/opendal.h index 1efc5c7bf06f..899babef772b 100644 --- a/bindings/c/include/opendal.h +++ b/bindings/c/include/opendal.h @@ -71,6 +71,14 @@ typedef enum opendal_code { * The given file paths are same. */ OPENDAL_IS_SAME_FILE, + /** + * The condition of this operation is not match. + */ + OPENDAL_CONDITION_NOT_MATCH, + /** + * The range of the content is not satisfied. + */ + OPENDAL_RANGE_NOT_SATISFIED, } opendal_code; /** diff --git a/bindings/c/src/error.rs b/bindings/c/src/error.rs index 31696a29d1e1..ac6cb6b5525e 100644 --- a/bindings/c/src/error.rs +++ b/bindings/c/src/error.rs @@ -45,6 +45,10 @@ pub enum opendal_code { OPENDAL_RATE_LIMITED, /// The given file paths are same. OPENDAL_IS_SAME_FILE, + /// The condition of this operation is not match. + OPENDAL_CONDITION_NOT_MATCH, + /// The range of the content is not satisfied. + OPENDAL_RANGE_NOT_SATISFIED, } impl From for opendal_code { @@ -60,6 +64,8 @@ impl From for opendal_code { core::ErrorKind::AlreadyExists => opendal_code::OPENDAL_ALREADY_EXISTS, core::ErrorKind::RateLimited => opendal_code::OPENDAL_RATE_LIMITED, core::ErrorKind::IsSameFile => opendal_code::OPENDAL_IS_SAME_FILE, + core::ErrorKind::ConditionNotMatch => opendal_code::OPENDAL_CONDITION_NOT_MATCH, + core::ErrorKind::RangeNotSatisfied => opendal_code::OPENDAL_RANGE_NOT_SATISFIED, // if this is triggered, check the [`core`] crate and add a // new error code accordingly _ => panic!("The newly added ErrorKind in core crate is not handled in C bindings"), diff --git a/bindings/c/src/operator.rs b/bindings/c/src/operator.rs index 021769bcc00f..a8d926602433 100644 --- a/bindings/c/src/operator.rs +++ b/bindings/c/src/operator.rs @@ -369,8 +369,8 @@ pub unsafe extern "C" fn opendal_operator_reader( let op = (*op).as_ref(); let path = unsafe { std::ffi::CStr::from_ptr(path).to_str().unwrap() }; - let meta = match op.stat(path) { - Ok(meta) => meta, + let reader = match op.reader(path) { + Ok(reader) => reader, Err(err) => { return opendal_result_operator_reader { reader: std::ptr::null_mut(), @@ -379,9 +379,9 @@ pub unsafe extern "C" fn opendal_operator_reader( } }; - match op.reader(path) { + match reader.into_std_read(..) { Ok(reader) => opendal_result_operator_reader { - reader: Box::into_raw(Box::new(opendal_reader::new(reader, meta.content_length()))), + reader: Box::into_raw(Box::new(opendal_reader::new(reader))), error: std::ptr::null_mut(), }, Err(e) => opendal_result_operator_reader { diff --git a/bindings/c/src/reader.rs b/bindings/c/src/reader.rs index 7e4d9098dcb8..398365ffff02 100644 --- a/bindings/c/src/reader.rs +++ b/bindings/c/src/reader.rs @@ -30,9 +30,9 @@ pub struct opendal_reader { } impl opendal_reader { - pub(crate) fn new(reader: core::BlockingReader, size: u64) -> Self { + pub(crate) fn new(reader: core::StdReader) -> Self { Self { - inner: Box::into_raw(Box::new(reader.into_std_read(0..size))), + inner: Box::into_raw(Box::new(reader)), } } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 720bf0bff0b6..8d8694782168 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -156,7 +156,9 @@ impl Operator { fn reader(&self, path: &str) -> Result> { let meta = self.0.stat(path)?; Ok(Box::new(Reader( - self.0.reader(path)?.into_std_read(0..meta.content_length()), + self.0 + .reader(path)? + .into_std_read(0..meta.content_length())?, ))) } diff --git a/bindings/java/.cargo/config b/bindings/java/.cargo/config deleted file mode 100644 index fc0bf0e2815f..000000000000 --- a/bindings/java/.cargo/config +++ /dev/null @@ -1,11 +0,0 @@ -# See also https://github.com/rust-lang/rust/issues/44991 - -[target.x86_64-unknown-linux-musl] -rustflags = [ - "-C", "target-feature=-crt-static", -] - -[target.aarch64-unknown-linux-musl] -rustflags = [ - "-C", "target-feature=-crt-static", -] diff --git a/bindings/java/.cargo/config.toml b/bindings/java/.cargo/config.toml new file mode 100644 index 000000000000..e733b105035b --- /dev/null +++ b/bindings/java/.cargo/config.toml @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# See also https://github.com/rust-lang/rust/issues/44991 + +[target.x86_64-unknown-linux-musl] +rustflags = [ + "-C", "target-feature=-crt-static", +] + +[target.aarch64-unknown-linux-musl] +rustflags = [ + "-C", "target-feature=-crt-static", +] diff --git a/bindings/java/src/error.rs b/bindings/java/src/error.rs index 61c8b62eff99..598050463a95 100644 --- a/bindings/java/src/error.rs +++ b/bindings/java/src/error.rs @@ -57,6 +57,7 @@ impl Error { ErrorKind::RateLimited => "RateLimited", ErrorKind::IsSameFile => "IsSameFile", ErrorKind::ConditionNotMatch => "ConditionNotMatch", + ErrorKind::RangeNotSatisfied => "RangeNotSatisfied", _ => "Unexpected", })?; let message = env.new_string(format!("{:?}", self.inner))?; diff --git a/bindings/java/src/main/java/org/apache/opendal/OpenDALException.java b/bindings/java/src/main/java/org/apache/opendal/OpenDALException.java index 5ee7a46cab2d..3df42e445609 100644 --- a/bindings/java/src/main/java/org/apache/opendal/OpenDALException.java +++ b/bindings/java/src/main/java/org/apache/opendal/OpenDALException.java @@ -71,7 +71,6 @@ public enum Code { RateLimited, IsSameFile, ConditionNotMatch, - ContentTruncated, - ContentIncomplete, + RangeNotSatisfied, } } diff --git a/bindings/java/src/operator_input_stream.rs b/bindings/java/src/operator_input_stream.rs index fd53237bf360..0e07d2382ceb 100644 --- a/bindings/java/src/operator_input_stream.rs +++ b/bindings/java/src/operator_input_stream.rs @@ -43,7 +43,7 @@ fn intern_construct_reader( path: JString, ) -> crate::Result { let path = jstring_to_string(env, &path)?; - let reader = op.reader(&path)?.into_bytes_iterator(..); + let reader = op.reader(&path)?.into_bytes_iterator(..)?; Ok(Box::into_raw(Box::new(reader)) as jlong) } diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 966d47100d43..cc85c14d1140 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -219,10 +219,9 @@ impl Operator { /// It could be used to read large file in a streaming way. #[napi] pub fn reader_sync(&self, path: String) -> Result { - let meta = self.0.blocking().stat(&path).map_err(format_napi_error)?; let r = self.0.blocking().reader(&path).map_err(format_napi_error)?; Ok(BlockingReader { - inner: r.into_std_read(0..meta.content_length()), + inner: r.into_std_read(..).map_err(format_napi_error)?, }) } diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index 849a058a06d8..327d2055eeac 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -46,8 +46,8 @@ enum FileState { } impl File { - pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self { - Self(FileState::Reader(reader.into_std_read(0..size)), capability) + pub fn new_reader(reader: ocore::StdReader, capability: Capability) -> Self { + Self(FileState::Reader(reader), capability) } pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self { diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs index bf1cc90bc7c6..8a6d592312e1 100644 --- a/bindings/python/src/operator.rs +++ b/bindings/python/src/operator.rs @@ -80,9 +80,12 @@ impl Operator { let this = self.0.clone(); let capability = self.capability()?; if mode == "rb" { - let meta = this.stat(&path).map_err(format_pyerr)?; - let r = this.reader(&path).map_err(format_pyerr)?; - Ok(File::new_reader(r, meta.content_length(), capability)) + let r = this + .reader(&path) + .map_err(format_pyerr)? + .into_std_read(..) + .map_err(format_pyerr)?; + Ok(File::new_reader(r, capability)) } else if mode == "wb" { let w = this.writer(&path).map_err(format_pyerr)?; Ok(File::new_writer(w, capability)) diff --git a/core/fuzz/fuzz_reader.rs b/core/fuzz/fuzz_reader.rs index 3c91d01e8aaf..3b713bb24c63 100644 --- a/core/fuzz/fuzz_reader.rs +++ b/core/fuzz/fuzz_reader.rs @@ -63,7 +63,7 @@ impl Arbitrary<'_> for FuzzInput { for _ in 0..count { let offset = u.int_in_range(0..=total_size)?; - let size = u.int_in_range(0..=total_size * 2)?; + let size = u.int_in_range(0..=total_size - offset)?; actions.push(ReadAction::Read(offset, size)); } diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 543c059d142c..fa9f97cd4924 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -282,8 +282,8 @@ impl BlockingWrapper { } impl oio::BlockingRead for BlockingWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - self.handle.block_on(self.inner.read_at(offset, limit)) + fn read_at(&self, offset: u64, size: usize) -> Result { + self.handle.block_on(self.inner.read_at(offset, size)) } } diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs index a525beefeb3f..98855dc157bd 100644 --- a/core/src/layers/chaos.rs +++ b/core/src/layers/chaos.rs @@ -171,9 +171,9 @@ impl ChaosReader { } impl oio::Read for ChaosReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { if self.i_feel_lucky() { - self.inner.read_at(offset, limit).await + self.inner.read_at(offset, size).await } else { Err(Self::unexpected_eof()) } @@ -181,9 +181,9 @@ impl oio::Read for ChaosReader { } impl oio::BlockingRead for ChaosReader { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { if self.i_feel_lucky() { - self.inner.read_at(offset, limit) + self.inner.read_at(offset, size) } else { Err(Self::unexpected_eof()) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index b8f5cc251549..0d38a4a45093 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -587,22 +587,40 @@ pub type CompleteLister = pub struct CompleteReader(R); impl oio::Read for CompleteReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - if limit == 0 { + async fn read_at(&self, offset: u64, size: usize) -> Result { + if size == 0 { return Ok(Buffer::new()); } - self.0.read_at(offset, limit).await + let buf = self.0.read_at(offset, size).await?; + if buf.len() != size { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "service didn't return the expected size", + ) + .with_context("expect", size.to_string()) + .with_context("actual", buf.len().to_string())); + } + Ok(buf) } } impl oio::BlockingRead for CompleteReader { - fn read_at(&self, offset: u64, limit: usize) -> Result { - if limit == 0 { + fn read_at(&self, offset: u64, size: usize) -> Result { + if size == 0 { return Ok(Buffer::new()); } - self.0.read_at(offset, limit) + let buf = self.0.read_at(offset, size)?; + if buf.len() != size { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "service didn't return the expected size", + ) + .with_context("expect", size.to_string()) + .with_context("actual", buf.len().to_string())); + } + Ok(buf) } } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index efe5d0b4ae1a..b3f334ff99e3 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -250,14 +250,14 @@ impl ConcurrentLimitWrapper { } impl oio::Read for ConcurrentLimitWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await } } impl oio::BlockingRead for ConcurrentLimitWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit) + fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size) } } diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index 86f493f8a554..8fe54039ce90 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -340,10 +340,10 @@ impl DtraceLayerWrapper { } impl oio::Read for DtraceLayerWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, reader_read_start, c_path.as_ptr()); - match self.inner.read_at(offset, limit).await { + match self.inner.read_at(offset, size).await { Ok(bs) => { probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining()); Ok(bs) @@ -357,11 +357,11 @@ impl oio::Read for DtraceLayerWrapper { } impl oio::BlockingRead for DtraceLayerWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr()); self.inner - .read_at(offset, limit) + .read_at(offset, size) .map(|bs| { probe_lazy!( opendal, diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 1b6cf420a02f..27b577649e9f 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -339,25 +339,25 @@ pub struct ErrorContextWrapper { } impl oio::Read for ErrorContextWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await.map_err(|err| { + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await.map_err(|err| { err.with_operation(ReadOperation::Read) .with_context("service", self.scheme) .with_context("path", &self.path) .with_context("offset", offset.to_string()) - .with_context("limit", limit.to_string()) + .with_context("size", size.to_string()) }) } } impl oio::BlockingRead for ErrorContextWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).map_err(|err| { + fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).map_err(|err| { err.with_operation(ReadOperation::BlockingRead) .with_context("service", self.scheme) .with_context("path", &self.path) .with_context("offset", offset.to_string()) - .with_context("limit", limit.to_string()) + .with_context("size", size.to_string()) }) } } diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 62ca491f9591..844bd12ac341 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -978,8 +978,8 @@ impl Drop for LoggingReader { } impl oio::Read for LoggingReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - match self.inner.read_at(offset, limit).await { + async fn read_at(&self, offset: u64, size: usize) -> Result { + match self.inner.read_at(offset, size).await { Ok(bs) => { self.read .fetch_add(bs.remaining() as u64, Ordering::Relaxed); @@ -1014,8 +1014,8 @@ impl oio::Read for LoggingReader { } impl oio::BlockingRead for LoggingReader { - fn read_at(&self, offset: u64, limit: usize) -> Result { - match self.inner.read_at(offset, limit) { + fn read_at(&self, offset: u64, size: usize) -> Result { + match self.inner.read_at(offset, size) { Ok(bs) => { self.read .fetch_add(bs.remaining() as u64, Ordering::Relaxed); diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index b5319a96a500..7607d45f31ad 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -261,9 +261,9 @@ pub struct MadsimReader { } impl oio::Read for MadsimReader { - async fn read_at(&self, offset: u64, limit: usize) -> crate::Result { + async fn read_at(&self, offset: u64, size: usize) -> crate::Result { if let Some(ref data) = self.data { - let size = min(limit, data.len()); + let size = min(size, data.len()); Ok(data.clone().split_to(size).into()) } else { Ok(Buffer::new()) diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1933835795ee..9c9a343c5de7 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -747,10 +747,10 @@ impl MetricWrapper { } impl oio::Read for MetricWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let start = Instant::now(); - match self.inner.read_at(offset, limit).await { + match self.inner.read_at(offset, size).await { Ok(bs) => { self.bytes_counter.increment(bs.remaining() as u64); self.requests_duration_seconds @@ -766,11 +766,11 @@ impl oio::Read for MetricWrapper { } impl oio::BlockingRead for MetricWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { let start = Instant::now(); self.inner - .read_at(offset, limit) + .read_at(offset, size) .map(|bs| { self.bytes_counter.increment(bs.remaining() as u64); self.requests_duration_seconds diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 5ea91b9fce25..8ec44d086f21 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -292,16 +292,16 @@ impl MinitraceWrapper { impl oio::Read for MinitraceWrapper { #[trace(enter_on_poll = true)] - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await } } impl oio::BlockingRead for MinitraceWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static()); - self.inner.read_at(offset, limit) + self.inner.read_at(offset, size) } } diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 2efb380dc4e3..8085d6d9831e 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -272,14 +272,14 @@ impl OtelTraceWrapper { } impl oio::Read for OtelTraceWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await } } impl oio::BlockingRead for OtelTraceWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit) + fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size) } } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 48ff52c1ba0e..741a864dd74b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -680,13 +680,13 @@ impl PrometheusMetricWrapper { } impl oio::Read for PrometheusMetricWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::Read.into_static(), &self.path, ); - match self.inner.read_at(offset, limit).await { + match self.inner.read_at(offset, size).await { Ok(bytes) => { self.stats .bytes_total @@ -703,14 +703,14 @@ impl oio::Read for PrometheusMetricWrapper { } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::BlockingRead.into_static(), &self.path, ); self.inner - .read_at(offset, limit) + .read_at(offset, size) .map(|bs| { self.stats .bytes_total diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 9db54604fc06..ac436d808976 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -533,10 +533,10 @@ impl PrometheusMetricWrapper { } impl oio::Read for PrometheusMetricWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let start = Instant::now(); - match self.inner.read_at(offset, limit).await { + match self.inner.read_at(offset, size).await { Ok(bs) => { self.metrics .observe_bytes_total(self.scheme, self.op, bs.remaining()); @@ -554,10 +554,10 @@ impl oio::Read for PrometheusMetricWrapper { } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { let start = Instant::now(); self.inner - .read_at(offset, limit) + .read_at(offset, size) .map(|bs| { self.metrics .observe_bytes_total(self.scheme, self.op, bs.remaining()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 1756a6fbd2d3..1bd4505bdbba 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -656,13 +656,13 @@ impl RetryWrapper { } impl oio::Read for RetryWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { { || { self.inner .as_ref() .expect("inner must be valid") - .read_at(offset, limit) + .read_at(offset, size) } } .retry(&self.builder) @@ -683,8 +683,8 @@ impl oio::Read for RetryWrapper { } impl oio::BlockingRead for RetryWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - { || self.inner.as_ref().unwrap().read_at(offset, limit) } + fn read_at(&self, offset: u64, size: usize) -> Result { + { || self.inner.as_ref().unwrap().read_at(offset, size) } .retry(&self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3c1a851efc39..fbeb3a643636 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -179,16 +179,14 @@ impl ThrottleWrapper { } impl oio::Read for ThrottleWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - // TODO: How can we handle buffer reads with a limiter? - self.inner.read_at(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await } } impl oio::BlockingRead for ThrottleWrapper { - fn read_at(&self, offset: u64, limit: usize) -> Result { - // TODO: How can we handle buffer reads with a limiter? - self.inner.read_at(offset, limit) + fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size) } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index ed2387e9e08c..9703f2edc643 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -287,8 +287,8 @@ impl TimeoutWrapper { } impl oio::Read for TimeoutWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let fut = self.inner.read_at(offset, limit); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let fut = self.inner.read_at(offset, size); Self::io_timeout(self.timeout, ReadOperation::Read.into_static(), fut).await } } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index fa4e7593fc9a..be01d97e622b 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -266,8 +266,8 @@ impl oio::Read for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size).await } } @@ -276,8 +276,8 @@ impl oio::BlockingRead for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit) + fn read_at(&self, offset: u64, size: usize) -> Result { + self.inner.read_at(offset, size) } } diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index 40c3b95902ba..6c614a752453 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -52,19 +52,19 @@ pub enum TwoWays { } impl oio::Read for TwoWays { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { match self { - TwoWays::One(v) => v.read_at(offset, limit).await, - TwoWays::Two(v) => v.read_at(offset, limit).await, + TwoWays::One(v) => v.read_at(offset, size).await, + TwoWays::Two(v) => v.read_at(offset, size).await, } } } impl oio::BlockingRead for TwoWays { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { match self { - Self::One(v) => v.read_at(offset, limit), - Self::Two(v) => v.read_at(offset, limit), + Self::One(v) => v.read_at(offset, size), + Self::Two(v) => v.read_at(offset, size), } } } @@ -105,11 +105,11 @@ pub enum ThreeWays { } impl oio::Read for ThreeWays { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { match self { - ThreeWays::One(v) => v.read_at(offset, limit).await, - ThreeWays::Two(v) => v.read_at(offset, limit).await, - ThreeWays::Three(v) => v.read_at(offset, limit).await, + ThreeWays::One(v) => v.read_at(offset, size).await, + ThreeWays::Two(v) => v.read_at(offset, size).await, + ThreeWays::Three(v) => v.read_at(offset, size).await, } } } @@ -117,11 +117,11 @@ impl oio::Read for ThreeWays oio::BlockingRead for ThreeWays { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { match self { - Self::One(v) => v.read_at(offset, limit), - Self::Two(v) => v.read_at(offset, limit), - Self::Three(v) => v.read_at(offset, limit), + Self::One(v) => v.read_at(offset, size), + Self::Two(v) => v.read_at(offset, size), + Self::Three(v) => v.read_at(offset, size), } } } @@ -175,12 +175,12 @@ where THREE: oio::Read, FOUR: oio::Read, { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { match self { - FourWays::One(v) => v.read_at(offset, limit).await, - FourWays::Two(v) => v.read_at(offset, limit).await, - FourWays::Three(v) => v.read_at(offset, limit).await, - FourWays::Four(v) => v.read_at(offset, limit).await, + FourWays::One(v) => v.read_at(offset, size).await, + FourWays::Two(v) => v.read_at(offset, size).await, + FourWays::Three(v) => v.read_at(offset, size).await, + FourWays::Four(v) => v.read_at(offset, size).await, } } } @@ -192,12 +192,12 @@ where THREE: oio::BlockingRead, FOUR: oio::BlockingRead, { - fn read_at(&self, offset: u64, limit: usize) -> Result { + fn read_at(&self, offset: u64, size: usize) -> Result { match self { - Self::One(v) => v.read_at(offset, limit), - Self::Two(v) => v.read_at(offset, limit), - Self::Three(v) => v.read_at(offset, limit), - Self::Four(v) => v.read_at(offset, limit), + Self::One(v) => v.read_at(offset, size), + Self::Two(v) => v.read_at(offset, size), + Self::Three(v) => v.read_at(offset, size), + Self::Four(v) => v.read_at(offset, size), } } } diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 99123a8dd5e8..49847032f684 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -78,22 +78,14 @@ pub type Reader = Arc; /// an additional layer of indirection and an extra allocation. Ideally, `ReadDyn` should occur only /// once, at the outermost level of our API. pub trait Read: Unpin + Send + Sync { - /// Read at the given offset with the given limit. - /// - /// # Notes - /// - /// Storage services should try to read as much as possible, only return bytes less than the - /// limit while reaching the end of the file. - fn read_at( - &self, - offset: u64, - limit: usize, - ) -> impl Future> + MaybeSend; + /// Read at the given offset with the given size. + fn read_at(&self, offset: u64, size: usize) + -> impl Future> + MaybeSend; } impl Read for () { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let (_, _) = (offset, limit); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let _ = (offset, size); Err(Error::new( ErrorKind::Unsupported, @@ -103,24 +95,44 @@ impl Read for () { } impl Read for Bytes { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - if offset >= self.len() as u64 { - return Ok(Buffer::new()); - } + async fn read_at(&self, offset: u64, size: usize) -> Result { let offset = offset as usize; - let limit = limit.min(self.len() - offset); - Ok(Buffer::from(self.slice(offset..offset + limit))) + + if offset >= self.len() { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "offset out of range", + )); + } + if size > self.len() - offset { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "size out of range", + )); + } + + Ok(Buffer::from(self.slice(offset..offset + size))) } } impl Read for Buffer { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - if offset >= self.len() as u64 { - return Ok(Buffer::new()); - } + async fn read_at(&self, offset: u64, size: usize) -> Result { let offset = offset as usize; - let limit = limit.min(self.len() - offset); - Ok(self.slice(offset..offset + limit)) + + if offset >= self.len() { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "offset out of range", + )); + } + if size > self.len() - offset { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "size out of range", + )); + } + + Ok(self.slice(offset..offset + size)) } } @@ -130,12 +142,12 @@ pub trait ReadDyn: Unpin + Send + Sync { /// The dyn version of [`Read::read_at`]. /// /// This function returns a boxed future to make it object safe. - fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture>; + fn read_at_dyn(&self, offset: u64, size: usize) -> BoxedFuture>; } impl ReadDyn for T { - fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture> { - Box::pin(self.read_at(offset, limit)) + fn read_at_dyn(&self, offset: u64, size: usize) -> BoxedFuture> { + Box::pin(self.read_at(offset, size)) } } @@ -144,14 +156,14 @@ impl ReadDyn for T { /// Take care about the `deref_mut()` here. This makes sure that we are calling functions /// upon `&mut T` instead of `&mut Box`. The later could result in infinite recursion. impl Read for Box { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.deref().read_at_dyn(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.deref().read_at_dyn(offset, size).await } } impl Read for Arc { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.deref().read_at_dyn(offset, limit).await + async fn read_at(&self, offset: u64, size: usize) -> Result { + self.deref().read_at_dyn(offset, size).await } } @@ -160,49 +172,64 @@ pub type BlockingReader = Arc; /// Read is the trait that OpenDAL returns to callers. pub trait BlockingRead: Send + Sync { - /// Read data from the reader at the given offset with the given limit. - /// - /// # Notes - /// - /// Storage services should try to read as much as possible, only return bytes less than the - /// limit while reaching the end of the file. - fn read_at(&self, offset: u64, limit: usize) -> Result; + /// Read data from the reader at the given offset with the given size. + fn read_at(&self, offset: u64, size: usize) -> Result; } impl BlockingRead for () { - fn read_at(&self, offset: u64, limit: usize) -> Result { - let _ = (offset, limit); + fn read_at(&self, offset: u64, size: usize) -> Result { + let _ = (offset, size); unimplemented!("read is required to be implemented for oio::BlockingRead") } } impl BlockingRead for Bytes { - fn read_at(&self, offset: u64, limit: usize) -> Result { - if offset >= self.len() as u64 { - return Ok(Buffer::new()); - } + fn read_at(&self, offset: u64, size: usize) -> Result { let offset = offset as usize; - let limit = limit.min(self.len() - offset); - Ok(Buffer::from(self.slice(offset..offset + limit))) + + if offset >= self.len() { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "offset out of range", + )); + } + if size > self.len() - offset { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "size out of range", + )); + } + + Ok(Buffer::from(self.slice(offset..offset + size))) } } impl BlockingRead for Buffer { - fn read_at(&self, offset: u64, limit: usize) -> Result { - if offset >= self.len() as u64 { - return Ok(Buffer::new()); - } + fn read_at(&self, offset: u64, size: usize) -> Result { let offset = offset as usize; - let limit = limit.min(self.len() - offset); - Ok(self.slice(offset..offset + limit)) + + if offset >= self.len() { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "offset out of range", + )); + } + if size > self.len() - offset { + return Err(Error::new( + ErrorKind::RangeNotSatisfied, + "size out of range", + )); + } + + Ok(self.slice(offset..offset + size)) } } /// `Arc` won't implement `BlockingRead` automatically. /// To make BlockingReader work as expected, we must add this impl. impl BlockingRead for Arc { - fn read_at(&self, offset: u64, limit: usize) -> Result { - (**self).read_at(offset, limit) + fn read_at(&self, offset: u64, size: usize) -> Result { + (**self).read_at(offset, size) } } diff --git a/core/src/services/aliyun_drive/reader.rs b/core/src/services/aliyun_drive/reader.rs index 770abf8b88fc..f6268d3f3d8d 100644 --- a/core/src/services/aliyun_drive/reader.rs +++ b/core/src/services/aliyun_drive/reader.rs @@ -45,15 +45,15 @@ impl AliyunDriveReader { } impl oio::Read for AliyunDriveReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { // AliyunDrive responds with status OK even if the range is not statisfiable. // and then the whole file will be read. let limit = if offset >= self.size { return Ok(Buffer::new()); - } else if offset + (limit as u64) - 1 > self.size { + } else if offset + (size as u64) - 1 > self.size { self.size - offset } else { - limit as u64 + size as u64 }; let range = BytesRange::new(offset, Some(limit)); let req = Request::get(self.download_url.as_str()) diff --git a/core/src/services/alluxio/reader.rs b/core/src/services/alluxio/reader.rs index 23b5c15aeb1e..53dd23912eaf 100644 --- a/core/src/services/alluxio/reader.rs +++ b/core/src/services/alluxio/reader.rs @@ -40,8 +40,8 @@ impl AlluxioReader { } impl oio::Read for AlluxioReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.read(self.stream_id, range).await?; diff --git a/core/src/services/azblob/reader.rs b/core/src/services/azblob/reader.rs index 6f9540baf6fa..918779438ed0 100644 --- a/core/src/services/azblob/reader.rs +++ b/core/src/services/azblob/reader.rs @@ -42,8 +42,8 @@ impl AzblobReader { } impl oio::Read for AzblobReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for AzblobReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/azdls/reader.rs b/core/src/services/azdls/reader.rs index a144f8c55c64..e8f049295bc8 100644 --- a/core/src/services/azdls/reader.rs +++ b/core/src/services/azdls/reader.rs @@ -42,8 +42,8 @@ impl AzdlsReader { } impl oio::Read for AzdlsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.azdls_read(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for AzdlsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/azfile/reader.rs b/core/src/services/azfile/reader.rs index 04dfebe5a005..58066d67a467 100644 --- a/core/src/services/azfile/reader.rs +++ b/core/src/services/azfile/reader.rs @@ -42,8 +42,8 @@ impl AzfileReader { } impl oio::Read for AzfileReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.azfile_read(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for AzfileReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/b2/reader.rs b/core/src/services/b2/reader.rs index 6071d1d3c99e..727ef647d580 100644 --- a/core/src/services/b2/reader.rs +++ b/core/src/services/b2/reader.rs @@ -42,8 +42,8 @@ impl B2Reader { } impl oio::Read for B2Reader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for B2Reader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/chainsafe/reader.rs b/core/src/services/chainsafe/reader.rs index 9e7b635ad97c..1d1e27118aae 100644 --- a/core/src/services/chainsafe/reader.rs +++ b/core/src/services/chainsafe/reader.rs @@ -42,8 +42,8 @@ impl ChainsafeReader { } impl oio::Read for ChainsafeReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.download_object(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for ChainsafeReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/cos/reader.rs b/core/src/services/cos/reader.rs index 75c031174963..15dbfdf69532 100644 --- a/core/src/services/cos/reader.rs +++ b/core/src/services/cos/reader.rs @@ -42,8 +42,8 @@ impl CosReader { } impl oio::Read for CosReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for CosReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/dropbox/reader.rs b/core/src/services/dropbox/reader.rs index 9d4152831b09..2675bb35bea4 100644 --- a/core/src/services/dropbox/reader.rs +++ b/core/src/services/dropbox/reader.rs @@ -42,8 +42,8 @@ impl DropboxReader { } impl oio::Read for DropboxReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.dropbox_get(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for DropboxReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/fs/reader.rs b/core/src/services/fs/reader.rs index f5b7b9fb7f3f..afb5e18a054b 100644 --- a/core/src/services/fs/reader.rs +++ b/core/src/services/fs/reader.rs @@ -61,12 +61,12 @@ impl FsReader { } impl oio::Read for FsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let handle = self.try_clone()?; match tokio::runtime::Handle::try_current() { Ok(runtime) => runtime - .spawn_blocking(move || oio::BlockingRead::read_at(&handle, offset, limit)) + .spawn_blocking(move || oio::BlockingRead::read_at(&handle, offset, size)) .await .map_err(|err| { Error::new(ErrorKind::Unexpected, "tokio spawn io task failed").set_source(err) @@ -80,16 +80,16 @@ impl oio::Read for FsReader { } impl oio::BlockingRead for FsReader { - fn read_at(&self, mut offset: u64, limit: usize) -> Result { + fn read_at(&self, mut offset: u64, size: usize) -> Result { let mut bs = self.core.buf_pool.get(); - bs.reserve(limit); + bs.reserve(size); - let buf = &mut bs.spare_capacity_mut()[..limit]; + let buf = &mut bs.spare_capacity_mut()[..size]; let mut read_buf: ReadBuf = ReadBuf::uninit(buf); // SAFETY: Read at most `limit` bytes into `read_buf`. unsafe { - read_buf.assume_init(limit); + read_buf.assume_init(size); } loop { diff --git a/core/src/services/ftp/reader.rs b/core/src/services/ftp/reader.rs index 72cd02fb2fcf..cf37429ccf01 100644 --- a/core/src/services/ftp/reader.rs +++ b/core/src/services/ftp/reader.rs @@ -40,7 +40,7 @@ impl FtpReader { } impl oio::Read for FtpReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let mut ftp_stream = self.core.ftp_connect(Operation::Read).await?; if offset != 0 { @@ -54,8 +54,8 @@ impl oio::Read for FtpReader { .retr_as_stream(&self.path) .await .map_err(parse_error)? - .take(limit as _); - let mut bs = Vec::with_capacity(limit); + .take(size as _); + let mut bs = Vec::with_capacity(size); ds.read_to_end(&mut bs).await.map_err(new_std_io_error)?; Ok(Buffer::from(bs)) } diff --git a/core/src/services/gcs/reader.rs b/core/src/services/gcs/reader.rs index 88a80c048e47..70ad4f4a5c37 100644 --- a/core/src/services/gcs/reader.rs +++ b/core/src/services/gcs/reader.rs @@ -42,8 +42,8 @@ impl GcsReader { } impl oio::Read for GcsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for GcsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/gdrive/reader.rs b/core/src/services/gdrive/reader.rs index 557864d57754..1cce64a1af9c 100644 --- a/core/src/services/gdrive/reader.rs +++ b/core/src/services/gdrive/reader.rs @@ -42,8 +42,8 @@ impl GdriveReader { } impl oio::Read for GdriveReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.gdrive_get(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for GdriveReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ghac/reader.rs b/core/src/services/ghac/reader.rs index eb1b87b28dd9..2674dfcbed67 100644 --- a/core/src/services/ghac/reader.rs +++ b/core/src/services/ghac/reader.rs @@ -40,8 +40,8 @@ impl GhacReader { } impl oio::Read for GhacReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let req = self.core.ghac_get_location(&self.location, range).await?; let resp = self.core.client.send(req).await?; @@ -50,7 +50,6 @@ impl oio::Read for GhacReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/github/reader.rs b/core/src/services/github/reader.rs index 6c0276208151..9425aa60b072 100644 --- a/core/src/services/github/reader.rs +++ b/core/src/services/github/reader.rs @@ -42,8 +42,8 @@ impl GithubReader { } impl oio::Read for GithubReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.get(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for GithubReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/hdfs/reader.rs b/core/src/services/hdfs/reader.rs index ceb563e18dac..6f5565b68749 100644 --- a/core/src/services/hdfs/reader.rs +++ b/core/src/services/hdfs/reader.rs @@ -34,11 +34,11 @@ impl HdfsReader { } impl oio::Read for HdfsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let r = Self { f: self.f.clone() }; match tokio::runtime::Handle::try_current() { Ok(runtime) => runtime - .spawn_blocking(move || oio::BlockingRead::read_at(&r, offset, limit)) + .spawn_blocking(move || oio::BlockingRead::read_at(&r, offset, size)) .await .map_err(|err| { Error::new(ErrorKind::Unexpected, "tokio spawn io task failed").set_source(err) @@ -52,15 +52,15 @@ impl oio::Read for HdfsReader { } impl oio::BlockingRead for HdfsReader { - fn read_at(&self, mut offset: u64, limit: usize) -> Result { - let mut bs = Vec::with_capacity(limit); + fn read_at(&self, mut offset: u64, size: usize) -> Result { + let mut bs = Vec::with_capacity(size); let buf = bs.spare_capacity_mut(); let mut read_buf: ReadBuf = ReadBuf::uninit(buf); // SAFETY: Read at most `size` bytes into `read_buf`. unsafe { - read_buf.assume_init(limit); + read_buf.assume_init(size); } loop { diff --git a/core/src/services/http/reader.rs b/core/src/services/http/reader.rs index 204babe69848..e4a57722fab8 100644 --- a/core/src/services/http/reader.rs +++ b/core/src/services/http/reader.rs @@ -40,8 +40,8 @@ impl HttpReader { } impl oio::Read for HttpReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.http_get(&self.path, range, &self.op).await?; @@ -49,7 +49,6 @@ impl oio::Read for HttpReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/huggingface/reader.rs b/core/src/services/huggingface/reader.rs index 371b362a07e7..40e0a8d7f62e 100644 --- a/core/src/services/huggingface/reader.rs +++ b/core/src/services/huggingface/reader.rs @@ -42,8 +42,8 @@ impl HuggingfaceReader { } impl oio::Read for HuggingfaceReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.hf_resolve(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for HuggingfaceReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/icloud/reader.rs b/core/src/services/icloud/reader.rs index 281e9aca0390..84fd95fdf9fb 100644 --- a/core/src/services/icloud/reader.rs +++ b/core/src/services/icloud/reader.rs @@ -42,8 +42,8 @@ impl IcloudReader { } impl oio::Read for IcloudReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.read(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for IcloudReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ipfs/reader.rs b/core/src/services/ipfs/reader.rs index 064a20b62c36..4b7e0157d5b5 100644 --- a/core/src/services/ipfs/reader.rs +++ b/core/src/services/ipfs/reader.rs @@ -40,8 +40,8 @@ impl IpfsReader { } impl oio::Read for IpfsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.ipfs_get(&self.path, range).await?; @@ -49,7 +49,6 @@ impl oio::Read for IpfsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/ipmfs/reader.rs b/core/src/services/ipmfs/reader.rs index ee802afbe8c5..48449edd6a26 100644 --- a/core/src/services/ipmfs/reader.rs +++ b/core/src/services/ipmfs/reader.rs @@ -40,8 +40,8 @@ impl IpmfsReader { } impl oio::Read for IpmfsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.ipmfs_read(&self.path, range).await?; @@ -49,7 +49,6 @@ impl oio::Read for IpmfsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/koofr/reader.rs b/core/src/services/koofr/reader.rs index 7f7f65c4d4e5..36c2bc0fcc19 100644 --- a/core/src/services/koofr/reader.rs +++ b/core/src/services/koofr/reader.rs @@ -42,8 +42,8 @@ impl KoofrReader { } impl oio::Read for KoofrReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.get(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for KoofrReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/obs/reader.rs b/core/src/services/obs/reader.rs index b85290d893ce..55e94684829d 100644 --- a/core/src/services/obs/reader.rs +++ b/core/src/services/obs/reader.rs @@ -42,8 +42,8 @@ impl ObsReader { } impl oio::Read for ObsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for ObsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/onedrive/reader.rs b/core/src/services/onedrive/reader.rs index 42b4ee366132..8a8bbe987eee 100644 --- a/core/src/services/onedrive/reader.rs +++ b/core/src/services/onedrive/reader.rs @@ -40,8 +40,8 @@ impl OnedriveReader { } impl oio::Read for OnedriveReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.onedrive_get_content(&self.path, range).await?; @@ -49,7 +49,6 @@ impl oio::Read for OnedriveReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/oss/reader.rs b/core/src/services/oss/reader.rs index 108958e397a7..eceb1d168407 100644 --- a/core/src/services/oss/reader.rs +++ b/core/src/services/oss/reader.rs @@ -42,8 +42,8 @@ impl OssReader { } impl oio::Read for OssReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -54,7 +54,6 @@ impl oio::Read for OssReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/pcloud/reader.rs b/core/src/services/pcloud/reader.rs index 848dfa9d0a69..81288ab2f2cd 100644 --- a/core/src/services/pcloud/reader.rs +++ b/core/src/services/pcloud/reader.rs @@ -42,8 +42,8 @@ impl PcloudReader { } impl oio::Read for PcloudReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.download(&self.link, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for PcloudReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/s3/reader.rs b/core/src/services/s3/reader.rs index f5a83ea2bde9..db0bacb29477 100644 --- a/core/src/services/s3/reader.rs +++ b/core/src/services/s3/reader.rs @@ -42,15 +42,14 @@ impl S3Reader { } impl oio::Read for S3Reader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.s3_get_object(&self.path, range, &self.op).await?; let status = resp.status(); match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/seafile/reader.rs b/core/src/services/seafile/reader.rs index b28e48e6260c..bb125e30699f 100644 --- a/core/src/services/seafile/reader.rs +++ b/core/src/services/seafile/reader.rs @@ -42,8 +42,8 @@ impl SeafileReader { } impl oio::Read for SeafileReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.download_file(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for SeafileReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index 05ee2fdd997a..bdbe34a0c5e7 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -38,7 +38,7 @@ impl SftpReader { } impl oio::Read for SftpReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { + async fn read_at(&self, offset: u64, size: usize) -> Result { let client = self.inner.connect().await?; let mut fs = client.fs(); @@ -58,12 +58,12 @@ impl oio::Read for SftpReader { .await .map_err(new_std_io_error)?; - let mut size = limit; + let mut size = size; if size == 0 { return Ok(Buffer::new()); } - let mut buf = BytesMut::with_capacity(limit); + let mut buf = BytesMut::with_capacity(size); while size > 0 { let len = buf.len(); if let Some(bytes) = f diff --git a/core/src/services/supabase/reader.rs b/core/src/services/supabase/reader.rs index 544706502aa9..d7e662a35074 100644 --- a/core/src/services/supabase/reader.rs +++ b/core/src/services/supabase/reader.rs @@ -42,8 +42,8 @@ impl SupabaseReader { } impl oio::Read for SupabaseReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.supabase_get_object(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for SupabaseReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/swift/reader.rs b/core/src/services/swift/reader.rs index 7fe365b896f7..2c83f7740850 100644 --- a/core/src/services/swift/reader.rs +++ b/core/src/services/swift/reader.rs @@ -42,8 +42,8 @@ impl SwiftReader { } impl oio::Read for SwiftReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.swift_read(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for SwiftReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/upyun/reader.rs b/core/src/services/upyun/reader.rs index 452bf5ff417e..9a5ec37c46a1 100644 --- a/core/src/services/upyun/reader.rs +++ b/core/src/services/upyun/reader.rs @@ -42,8 +42,8 @@ impl UpyunReader { } impl oio::Read for UpyunReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.download_file(&self.path, range).await?; @@ -51,7 +51,6 @@ impl oio::Read for UpyunReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/vercel_artifacts/reader.rs b/core/src/services/vercel_artifacts/reader.rs index d60d1b4ec935..de6dac1cdb5d 100644 --- a/core/src/services/vercel_artifacts/reader.rs +++ b/core/src/services/vercel_artifacts/reader.rs @@ -40,8 +40,8 @@ impl VercelArtifactsReader { } impl oio::Read for VercelArtifactsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self .core @@ -52,7 +52,6 @@ impl oio::Read for VercelArtifactsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/vercel_blob/reader.rs b/core/src/services/vercel_blob/reader.rs index 3cd99fc7b6d0..a76c651922f2 100644 --- a/core/src/services/vercel_blob/reader.rs +++ b/core/src/services/vercel_blob/reader.rs @@ -42,8 +42,8 @@ impl VercelBlobReader { } impl oio::Read for VercelBlobReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.download(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for VercelBlobReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/webdav/reader.rs b/core/src/services/webdav/reader.rs index 83c09398e47c..3af6a95b7750 100644 --- a/core/src/services/webdav/reader.rs +++ b/core/src/services/webdav/reader.rs @@ -42,8 +42,8 @@ impl WebdavReader { } impl oio::Read for WebdavReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.webdav_get(&self.path, range, &self.op).await?; @@ -51,7 +51,6 @@ impl oio::Read for WebdavReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/webhdfs/reader.rs b/core/src/services/webhdfs/reader.rs index 30c8dcbfb5b4..3ecb79d873ad 100644 --- a/core/src/services/webhdfs/reader.rs +++ b/core/src/services/webhdfs/reader.rs @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::StatusCode; use super::error::parse_error; -use super::error::parse_error_msg; use crate::raw::*; use crate::services::webhdfs::backend::WebhdfsBackend; use crate::*; @@ -42,8 +40,8 @@ impl WebhdfsReader { } impl oio::Read for WebhdfsReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); let resp = self.core.webhdfs_read_file(&self.path, range).await?; @@ -51,18 +49,6 @@ impl oio::Read for WebhdfsReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - // WebHDFS will returns 403 when range is outside of the end. - StatusCode::FORBIDDEN => { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); - let s = String::from_utf8_lossy(&bs); - if s.contains("out of the range") { - Ok(Buffer::new()) - } else { - Err(parse_error_msg(parts, &s)?) - } - } - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/yandex_disk/reader.rs b/core/src/services/yandex_disk/reader.rs index 57887f3299f7..0dbca67500db 100644 --- a/core/src/services/yandex_disk/reader.rs +++ b/core/src/services/yandex_disk/reader.rs @@ -44,8 +44,8 @@ impl YandexDiskReader { } impl oio::Read for YandexDiskReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let range = BytesRange::new(offset, Some(limit as u64)); + async fn read_at(&self, offset: u64, size: usize) -> Result { + let range = BytesRange::new(offset, Some(size as u64)); // TODO: move this out of reader. let download_url = self.core.get_download_url(&self.path).await?; @@ -60,7 +60,6 @@ impl oio::Read for YandexDiskReader { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body()), - StatusCode::RANGE_NOT_SATISFIABLE => Ok(Buffer::new()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/types/blocking_read/blocking_reader.rs b/core/src/types/blocking_read/blocking_reader.rs index 6dfedf7b1d57..13264ebf52ee 100644 --- a/core/src/types/blocking_read/blocking_reader.rs +++ b/core/src/types/blocking_read/blocking_reader.rs @@ -18,6 +18,7 @@ use std::collections::Bound; use std::ops::Range; use std::ops::RangeBounds; +use std::sync::Arc; use bytes::Buf; use bytes::BufMut; @@ -28,7 +29,13 @@ use crate::*; /// BlockingReader is designed to read data from given path in an blocking /// manner. pub struct BlockingReader { + acc: Accessor, + path: Arc, + pub(crate) inner: oio::BlockingReader, + + /// Total size of the reader. + size: Arc, } impl BlockingReader { @@ -39,10 +46,45 @@ impl BlockingReader { /// /// We don't want to expose those details to users so keep this function /// in crate only. - pub(crate) fn create(acc: Accessor, path: &str, op: OpRead) -> crate::Result { - let (_, r) = acc.blocking_read(path, op)?; + pub(crate) fn create(acc: Accessor, path: Arc, op: OpRead) -> crate::Result { + let (_, r) = acc.blocking_read(&path, op)?; + + Ok(BlockingReader { + acc, + path, + inner: r, + size: Arc::new(AtomicContentLength::new()), + }) + } + + /// Parse users input range bounds into valid `Range`. + /// + /// To avoid duplicated stat call, we will cache the size of the reader. + fn parse_range(&self, range: impl RangeBounds) -> Result> { + let start = match range.start_bound() { + Bound::Included(v) => *v, + Bound::Excluded(v) => v + 1, + Bound::Unbounded => 0, + }; - Ok(BlockingReader { inner: r }) + let end = match range.end_bound() { + Bound::Included(v) => v + 1, + Bound::Excluded(v) => *v, + Bound::Unbounded => match self.size.load() { + Some(v) => v, + None => { + let size = self + .acc + .blocking_stat(&self.path, OpStat::new())? + .into_metadata() + .content_length(); + self.size.store(size); + size + } + }, + }; + + Ok(start..end) } /// Read give range from reader into [`Buffer`]. @@ -54,40 +96,25 @@ impl BlockingReader { /// /// - Buffer length smaller than range means we have reached the end of file. pub fn read(&self, range: impl RangeBounds) -> Result { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound().cloned() { - Bound::Included(end) => Some(end + 1), - Bound::Excluded(end) => Some(end), - Bound::Unbounded => None, - }; - - // If range is empty, return Ok(0) directly. - if let Some(end) = end { - if end <= start { - return Ok(Buffer::new()); - } - } + let range = self.parse_range(range)?; + let start = range.start; + let end = range.end; let mut bufs = Vec::new(); let mut offset = start; loop { // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; + let size = (end - offset) as usize; + let bs = self.inner.read_at(offset, size)?; let n = bs.remaining(); bufs.push(bs); - if n < limit { + if n < size { return Ok(bufs.into_iter().flatten().collect()); } offset += n as u64; - if Some(offset) == end { + if offset == end { return Ok(bufs.into_iter().flatten().collect()); } } @@ -101,41 +128,22 @@ impl BlockingReader { /// /// - Returning length smaller than range means we have reached the end of file. pub fn read_into(&self, buf: &mut impl BufMut, range: impl RangeBounds) -> Result { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound().cloned() { - Bound::Included(end) => Some(end + 1), - Bound::Excluded(end) => Some(end), - Bound::Unbounded => None, - }; - - // If range is empty, return Ok(0) directly. - if let Some(end) = end { - if end <= start { - return Ok(0); - } - } + let range = self.parse_range(range)?; + let start = range.start; + let end = range.end; let mut offset = start; let mut read = 0; loop { // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; + let size = (end - offset) as usize; + let bs = self.inner.read_at(offset, size)?; let n = bs.remaining(); buf.put(bs); read += n as u64; - if n < limit { - return Ok(read as _); - } - offset += n as u64; - if Some(offset) == end { + if offset == end { return Ok(read as _); } } @@ -144,14 +152,15 @@ impl BlockingReader { /// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], /// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. #[inline] - pub fn into_std_read(self, range: Range) -> StdReader { - // TODO: the capacity should be decided by services. - StdReader::new(self.inner, range) + pub fn into_std_read(self, range: impl RangeBounds) -> Result { + let range = self.parse_range(range)?; + Ok(StdReader::new(self.inner, range)) } /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] - pub fn into_bytes_iterator(self, range: impl RangeBounds) -> StdBytesIterator { - StdBytesIterator::new(self.inner, range) + pub fn into_bytes_iterator(self, range: impl RangeBounds) -> Result { + let range = self.parse_range(range)?; + Ok(StdBytesIterator::new(self.inner, range)) } } diff --git a/core/src/types/blocking_read/std_bytes_iterator.rs b/core/src/types/blocking_read/std_bytes_iterator.rs index 7f8752a1c605..ea29cb1ecafd 100644 --- a/core/src/types/blocking_read/std_bytes_iterator.rs +++ b/core/src/types/blocking_read/std_bytes_iterator.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::collections::Bound; use std::io; -use std::ops::RangeBounds; +use std::ops::Range; use bytes::Buf; use bytes::Bytes; @@ -41,23 +40,11 @@ pub struct StdBytesIterator { impl StdBytesIterator { /// NOTE: don't allow users to create StdIterator directly. #[inline] - pub(crate) fn new(r: oio::BlockingReader, range: impl RangeBounds) -> Self { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start + 1, - Bound::Unbounded => 0, - }; - - let end = match range.end_bound().cloned() { - Bound::Included(end) => Some(end + 1), - Bound::Excluded(end) => Some(end), - Bound::Unbounded => None, - }; - + pub(crate) fn new(r: oio::BlockingReader, range: Range) -> Self { StdBytesIterator { inner: r, - offset: start, - end: end.unwrap_or(u64::MAX), + offset: range.start, + end: range.end, // TODO: should use services preferred io size. cap: 4 * 1024 * 1024, cur: 0, @@ -80,7 +67,7 @@ impl Iterator for StdBytesIterator { } let next_offset = self.offset + self.cur; - let next_size = (self.end - self.offset).min(self.cap as u64) as usize; + let next_size = (self.end - self.offset - self.cur).min(self.cap as u64) as usize; match self.inner.read_at(next_offset, next_size) { Ok(buf) if !buf.has_remaining() => None, Ok(mut buf) => { diff --git a/core/src/types/error.rs b/core/src/types/error.rs index 8aa36a669fe4..f2c258dc61a0 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -82,6 +82,10 @@ pub enum ErrorKind { /// As OpenDAL cannot handle the `condition not match` error, it will always return this error to users. /// So users could to handle this error by themselves. ConditionNotMatch, + /// The range of the content is not satisfied. + /// + /// OpenDAL returns this error to indicate that the range of the read request is not satisfied. + RangeNotSatisfied, } impl ErrorKind { @@ -111,6 +115,7 @@ impl From for &'static str { ErrorKind::RateLimited => "RateLimited", ErrorKind::IsSameFile => "IsSameFile", ErrorKind::ConditionNotMatch => "ConditionNotMatch", + ErrorKind::RangeNotSatisfied => "RangeNotSatisfied", } } } diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 4c6e4808ae04..a3ab195b991b 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -17,6 +17,7 @@ use bytes::Buf; use bytes::Bytes; +use std::sync::Arc; use super::operator_functions::*; use crate::raw::*; @@ -396,7 +397,8 @@ impl BlockingOperator { ); } - let r = BlockingReader::create(inner, &path, args)?; + let path = Arc::new(path); + let r = BlockingReader::create(inner, path, args)?; let buf = r.read(range.to_range())?; Ok(buf) }, @@ -454,7 +456,8 @@ impl BlockingOperator { ); } - BlockingReader::create(inner.clone(), &path, args) + let path = Arc::new(path); + BlockingReader::create(inner.clone(), path, args) }, )) } diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index e779cbb46541..867d09e6b797 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -110,10 +110,6 @@ impl Reader { /// /// This operation is zero-copy, which means it keeps the [`Bytes`] returned by underlying /// storage services without any extra copy or intensive memory allocations. - /// - /// # Notes - /// - /// - Buffer length smaller than range means we have reached the end of file. pub async fn read(&self, range: impl RangeBounds) -> Result { let bufs: Vec<_> = self.clone().into_stream(range).await?.try_collect().await?; Ok(bufs.into_iter().flatten().collect()) @@ -123,10 +119,6 @@ impl Reader { /// /// This operation will copy and write bytes into given [`BufMut`]. Allocation happens while /// [`BufMut`] doesn't have enough space. - /// - /// # Notes - /// - /// - Returning length smaller than range means we have reached the end of file. pub async fn read_into( &self, buf: &mut impl BufMut, diff --git a/core/tests/behavior/async_read.rs b/core/tests/behavior/async_read.rs index 75b9e9961ff4..53e6f29a6102 100644 --- a/core/tests/behavior/async_read.rs +++ b/core/tests/behavior/async_read.rs @@ -36,7 +36,6 @@ pub fn tests(op: &Operator, tests: &mut Vec) { op, test_read_full, test_read_range, - test_read_large_range, test_reader, test_read_not_exist, test_read_with_if_match, @@ -109,34 +108,6 @@ pub async fn test_read_range(op: Operator) -> anyhow::Result<()> { Ok(()) } -/// Read large range content should match. -pub async fn test_read_large_range(op: Operator) -> anyhow::Result<()> { - let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); - let (offset, _) = gen_offset_length(size); - - op.write(&path, content.clone()) - .await - .expect("write must succeed"); - - let bs = op - .read_with(&path) - .range(offset..u32::MAX as u64) - .await? - .to_bytes(); - assert_eq!( - bs.len() as u64, - size as u64 - offset, - "read size with large range" - ); - assert_eq!( - format!("{:x}", Sha256::digest(&bs)), - format!("{:x}", Sha256::digest(&content[offset as usize..])), - "read content with large range" - ); - - Ok(()) -} - /// Read full content should match. pub async fn test_reader(op: Operator) -> anyhow::Result<()> { let (path, content, size) = TEST_FIXTURE.new_file(op.clone()); diff --git a/core/tests/behavior/blocking_read.rs b/core/tests/behavior/blocking_read.rs index b44864267125..3eb09aed045e 100644 --- a/core/tests/behavior/blocking_read.rs +++ b/core/tests/behavior/blocking_read.rs @@ -30,7 +30,6 @@ pub fn tests(op: &Operator, tests: &mut Vec) { op, test_blocking_read_full, test_blocking_read_range, - test_blocking_read_large_range, test_blocking_read_not_exist )) } @@ -98,36 +97,6 @@ pub fn test_blocking_read_range(op: BlockingOperator) -> Result<()> { Ok(()) } -/// Read large range content should match. -pub fn test_blocking_read_large_range(op: BlockingOperator) -> Result<()> { - let path = uuid::Uuid::new_v4().to_string(); - debug!("Generate a random file: {}", &path); - let (content, size) = gen_bytes(op.info().full_capability()); - let (offset, _) = gen_offset_length(size); - - op.write(&path, content.clone()) - .expect("write must succeed"); - - let bs = op - .read_with(&path) - .range(offset..u32::MAX as u64) - .call()? - .to_bytes(); - assert_eq!( - bs.len() as u64, - size as u64 - offset, - "read size with large range" - ); - assert_eq!( - format!("{:x}", Sha256::digest(&bs)), - format!("{:x}", Sha256::digest(&content[offset as usize..])), - "read content with large range" - ); - - op.delete(&path).expect("delete must succeed"); - Ok(()) -} - /// Read not exist file should return NotFound pub fn test_blocking_read_not_exist(op: BlockingOperator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string();