diff --git a/core/Cargo.lock b/core/Cargo.lock index 33087f04034b..07bc633c9fbe 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4802,6 +4802,7 @@ dependencies = [ "quick-xml", "r2d2", "rand 0.8.5", + "rayon", "redb", "redis", "reqsign", diff --git a/core/Cargo.toml b/core/Cargo.toml index 701969629b8c..0c741a3a7975 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -351,6 +351,7 @@ tracing = { version = "0.1", optional = true } # for layers-dtrace probe = { version = "0.5.1", optional = true } crc32c = "0.6.5" +rayon = "1.10.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/core/src/types/blocking_read/blocking_reader.rs b/core/src/types/blocking_read/blocking_reader.rs index e87528fceabf..07ee870fe6fd 100644 --- a/core/src/types/blocking_read/blocking_reader.rs +++ b/core/src/types/blocking_read/blocking_reader.rs @@ -18,17 +18,20 @@ use std::collections::Bound; use std::ops::Range; use std::ops::RangeBounds; +use std::sync::Arc; -use bytes::Buf; use bytes::BufMut; use crate::raw::*; use crate::*; +use super::buffer_iterator::BufferIterator; + /// BlockingReader is designed to read data from given path in an blocking /// manner. pub struct BlockingReader { - pub(crate) inner: oio::BlockingReader, + pub(crate) inner: Arc, + options: OpReader, } impl BlockingReader { @@ -39,10 +42,18 @@ impl BlockingReader { /// /// We don't want to expose those details to users so keep this function /// in crate only. - pub(crate) fn create(acc: Accessor, path: &str, op: OpRead) -> crate::Result { + pub(crate) fn create( + acc: FusedAccessor, + path: &str, + op: OpRead, + options: OpReader, + ) -> crate::Result { let (_, r) = acc.blocking_read(path, op)?; - Ok(BlockingReader { inner: r }) + Ok(BlockingReader { + inner: Arc::new(r), + options, + }) } /// Read give range from reader into [`Buffer`]. @@ -73,24 +84,13 @@ impl BlockingReader { } } - let mut bufs = Vec::new(); - let mut offset = start; - - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; - let n = bs.remaining(); - bufs.push(bs); - if n < limit { - return Ok(bufs.into_iter().flatten().collect()); - } + let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end); - offset += n as u64; - if Some(offset) == end { - return Ok(bufs.into_iter().flatten().collect()); - } - } + Ok(iter + .collect::>>()? + .into_iter() + .flatten() + .collect()) } /// @@ -120,25 +120,17 @@ impl BlockingReader { } } - let mut offset = start; - let mut read = 0; + let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end); - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; - let n = bs.remaining(); - buf.put(bs); - read += n as u64; - if n < limit { - return Ok(read as _); - } + let bufs: Result> = iter.collect(); - offset += n as u64; - if Some(offset) == end { - return Ok(read as _); - } + let mut read = 0; + for bs in bufs? { + read += bs.len(); + buf.put(bs); } + + Ok(read) } /// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], @@ -146,12 +138,105 @@ impl BlockingReader { #[inline] pub fn into_std_read(self, range: Range) -> StdReader { // TODO: the capacity should be decided by services. - StdReader::new(self.inner, range) + StdReader::new(self.inner.clone(), range) } /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] pub fn into_bytes_iterator(self, range: Range) -> StdBytesIterator { - StdBytesIterator::new(self.inner, range) + StdBytesIterator::new(self.inner.clone(), range) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::ThreadRng; + use rand::Rng; + use rand::RngCore; + + fn gen_random_bytes() -> Vec { + let mut rng = ThreadRng::default(); + // Generate size between 1B..16MB. + let size = rng.gen_range(1..16 * 1024 * 1024); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + content + } + + #[test] + fn test_blocking_reader_read() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_chunk() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader_with(path).chunk(16).call().unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_concurrent() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op + .reader_with(path) + .chunk(128) + .concurrent(16) + .call() + .unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_into() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let mut buf = Vec::new(); + reader + .read_into(&mut buf, ..) + .expect("read to end must succeed"); + + assert_eq!(buf, content); } } diff --git a/core/src/types/blocking_read/buffer_iterator.rs b/core/src/types/blocking_read/buffer_iterator.rs new file mode 100644 index 000000000000..c97156d1c5a5 --- /dev/null +++ b/core/src/types/blocking_read/buffer_iterator.rs @@ -0,0 +1,114 @@ +use std::ops::Range; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use bytes::Buf; +use rayon::prelude::*; + +use crate::raw::*; +use crate::*; + +pub(super) struct BufferIterator { + inner: Arc, + it: RangeIterator, + + chunk: usize, + end: Option, + concurrent: usize, + finished: Arc, +} + +impl BufferIterator { + pub fn new( + inner: Arc, + options: OpReader, + offset: u64, + end: Option, + ) -> Self { + let chunk = options.chunk().unwrap_or(4 * 1024 * 1024); + let it = RangeIterator { + offset, + chunk: chunk as u64, + }; + Self { + inner, + it, + chunk, + end, + concurrent: options.concurrent(), + finished: Arc::new(AtomicBool::new(false)), + } + } +} + +impl Iterator for BufferIterator { + type Item = Result; + + fn next(&mut self) -> Option { + if self.finished.load(Ordering::Relaxed) { + return None; + } + + let mut bufs = Vec::with_capacity(self.concurrent); + + let intervals: Vec> = (0..self.concurrent) + .map(|_| { + let range = self.it.next().unwrap_or(Range { + start: u64::MAX, + end: u64::MAX, + }); + if let Some(end) = self.end { + if range.start + range.end > end { + return Range { + start: range.start, + end, + }; + } + } + range + }) + .filter(|range| range.start < range.end) + .collect(); + + let results: Vec> = intervals + .into_par_iter() + .map(|range| -> Result<(usize, Buffer)> { + let limit = (range.end - range.start) as usize; + + let bs = self.inner.read_at(range.start, limit)?; + let n = bs.remaining(); + + Ok((n, bs)) + }) + .collect(); + for result in results { + match result { + Ok((n, buf)) => { + bufs.push(buf); + if n < self.chunk { + self.finished.store(true, Ordering::Relaxed); + return Some(Ok(bufs.into_iter().flatten().collect())); + } + } + Err(err) => return Some(Err(err)), + } + } + + Some(Ok(bufs.into_iter().flatten().collect())) + } +} + +pub(super) struct RangeIterator { + offset: u64, + chunk: u64, +} + +impl Iterator for RangeIterator { + type Item = Range; + + fn next(&mut self) -> Option { + let offset = self.offset; + self.offset += self.chunk; + Some(offset..offset + self.chunk) + } +} diff --git a/core/src/types/blocking_read/mod.rs b/core/src/types/blocking_read/mod.rs index ff7631cd99eb..2f9178190375 100644 --- a/core/src/types/blocking_read/mod.rs +++ b/core/src/types/blocking_read/mod.rs @@ -23,3 +23,4 @@ mod std_bytes_iterator; pub use std_bytes_iterator::StdBytesIterator; mod std_reader; pub use std_reader::StdReader; +mod buffer_iterator; diff --git a/core/src/types/blocking_read/std_bytes_iterator.rs b/core/src/types/blocking_read/std_bytes_iterator.rs index dba05ae734f2..dc9e75a5a00a 100644 --- a/core/src/types/blocking_read/std_bytes_iterator.rs +++ b/core/src/types/blocking_read/std_bytes_iterator.rs @@ -16,6 +16,7 @@ // under the License. use std::io; +use std::sync::Arc; use bytes::Buf; use bytes::Bytes; @@ -28,7 +29,7 @@ use crate::raw::*; /// /// StdIterator also implements [`Send`] and [`Sync`]. pub struct StdBytesIterator { - inner: oio::BlockingReader, + inner: Arc, offset: u64, size: u64, cap: usize, @@ -39,7 +40,7 @@ pub struct StdBytesIterator { impl StdBytesIterator { /// NOTE: don't allow users to create StdIterator directly. #[inline] - pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range) -> Self { + pub(crate) fn new(r: Arc, range: std::ops::Range) -> Self { StdBytesIterator { inner: r, offset: range.start, diff --git a/core/src/types/blocking_read/std_reader.rs b/core/src/types/blocking_read/std_reader.rs index 191eeb1fbbc3..b7d4f439b3e5 100644 --- a/core/src/types/blocking_read/std_reader.rs +++ b/core/src/types/blocking_read/std_reader.rs @@ -21,6 +21,7 @@ use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::ops::Range; +use std::sync::Arc; use bytes::Buf; @@ -33,7 +34,7 @@ use crate::*; /// /// StdReader also implements [`Send`] and [`Sync`]. pub struct StdReader { - inner: oio::BlockingReader, + inner: Arc, offset: u64, size: u64, cap: usize, @@ -45,7 +46,7 @@ pub struct StdReader { impl StdReader { /// NOTE: don't allow users to create StdReader directly. #[inline] - pub(super) fn new(r: oio::BlockingReader, range: Range) -> Self { + pub(super) fn new(r: Arc, range: Range) -> Self { StdReader { inner: r, offset: range.start, diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 4c6e4808ae04..e48e79b09266 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -385,8 +385,12 @@ impl BlockingOperator { FunctionRead(OperatorFunction::new( self.inner().clone(), path, - (OpRead::default(), BytesRange::default()), - |inner, path, (args, range)| { + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), + |inner, path, (args, range, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "read path is a directory") @@ -396,7 +400,7 @@ impl BlockingOperator { ); } - let r = BlockingReader::create(inner, &path, args)?; + let r = BlockingReader::create(inner, &path, args, options)?; let buf = r.read(range.to_range())?; Ok(buf) }, @@ -443,8 +447,12 @@ impl BlockingOperator { FunctionReader(OperatorFunction::new( self.inner().clone(), path, - OpRead::default(), - |inner, path, args| { + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), + |inner, path, (args, _, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "reader path is a directory") @@ -454,7 +462,7 @@ impl BlockingOperator { ); } - BlockingReader::create(inner.clone(), &path, args) + BlockingReader::create(inner.clone(), &path, args, options) }, )) } diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index dbb7200f0f63..c30cb81f9bd5 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -320,12 +320,14 @@ impl FunctionLister { /// Function that generated by [`BlockingOperator::read_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FunctionRead(pub(crate) OperatorFunction<(OpRead, BytesRange), Buffer>); +pub struct FunctionRead(pub(crate) OperatorFunction<(OpRead, BytesRange, OpReader), Buffer>); impl FunctionRead { /// Set the range for this operation. pub fn range(mut self, range: impl RangeBounds) -> Self { - self.0 = self.0.map_args(|(args, _)| (args, range.into())); + self.0 = self + .0 + .map_args(|(args, _, options)| (args, range.into(), options)); self } @@ -334,53 +336,89 @@ impl FunctionRead { pub fn call(self) -> Result { self.0.call() } + + /// Set the concurrent read task amount. + pub fn concurrent(mut self, concurrent: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); + self + } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::reader_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FunctionReader(pub(crate) OperatorFunction); +pub struct FunctionReader( + pub(crate) OperatorFunction<(OpRead, BytesRange, OpReader), BlockingReader>, +); impl FunctionReader { /// Sets the content-disposition header that should be send back by the remote read operation. pub fn override_content_disposition(mut self, content_disposition: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_content_disposition(content_disposition)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_content_disposition(content_disposition), + range, + options, + ) + }); self } /// Sets the cache-control header that should be send back by the remote read operation. pub fn override_cache_control(mut self, cache_control: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_cache_control(cache_control)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_cache_control(cache_control), + range, + options, + ) + }); self } /// Sets the content-type header that should be send back by the remote read operation. pub fn override_content_type(mut self, content_type: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_content_type(content_type)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_content_type(content_type), + range, + options, + ) + }); self } /// Set the If-Match for this operation. pub fn if_match(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_if_match(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_if_match(v), range, options)); self } /// Set the If-None-Match for this operation. pub fn if_none_match(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_if_none_match(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_if_none_match(v), range, options)); self } /// Set the version for this operation. pub fn version(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_version(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_version(v), range, options)); self } @@ -389,6 +427,22 @@ impl FunctionReader { pub fn call(self) -> Result { self.0.call() } + + /// Set the concurrent read task amount. + pub fn concurrent(mut self, concurrent: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); + self + } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::stat_with`].