From cb882aa2559c16295f495b84dc79d19f0fcac3a8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 27 Apr 2024 16:18:10 +0800 Subject: [PATCH] feat(core/types): Implement concurrent read for blocking read --- core/Cargo.lock | 1 + core/Cargo.toml | 1 + .../types/blocking_read/blocking_reader.rs | 161 +++++++++++++----- .../types/blocking_read/buffer_iterator.rs | 118 +++++++++++++ core/src/types/blocking_read/mod.rs | 1 + core/src/types/operator/blocking_operator.rs | 20 ++- core/src/types/operator/operator_functions.rs | 84 +++++++-- 7 files changed, 327 insertions(+), 59 deletions(-) create mode 100644 core/src/types/blocking_read/buffer_iterator.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index 33087f04034b..07bc633c9fbe 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4802,6 +4802,7 @@ dependencies = [ "quick-xml", "r2d2", "rand 0.8.5", + "rayon", "redb", "redis", "reqsign", diff --git a/core/Cargo.toml b/core/Cargo.toml index 701969629b8c..0c741a3a7975 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -351,6 +351,7 @@ tracing = { version = "0.1", optional = true } # for layers-dtrace probe = { version = "0.5.1", optional = true } crc32c = "0.6.5" +rayon = "1.10.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/core/src/types/blocking_read/blocking_reader.rs b/core/src/types/blocking_read/blocking_reader.rs index e87528fceabf..d1fe0915b747 100644 --- a/core/src/types/blocking_read/blocking_reader.rs +++ b/core/src/types/blocking_read/blocking_reader.rs @@ -18,17 +18,20 @@ use std::collections::Bound; use std::ops::Range; use std::ops::RangeBounds; +use std::sync::Arc; -use bytes::Buf; use bytes::BufMut; use crate::raw::*; use crate::*; +use super::buffer_iterator::BufferIterator; + /// BlockingReader is designed to read data from given path in an blocking /// manner. pub struct BlockingReader { pub(crate) inner: oio::BlockingReader, + options: OpReader, } impl BlockingReader { @@ -39,10 +42,18 @@ impl BlockingReader { /// /// We don't want to expose those details to users so keep this function /// in crate only. - pub(crate) fn create(acc: Accessor, path: &str, op: OpRead) -> crate::Result { + pub(crate) fn create( + acc: Accessor, + path: &str, + op: OpRead, + options: OpReader, + ) -> crate::Result { let (_, r) = acc.blocking_read(path, op)?; - Ok(BlockingReader { inner: r }) + Ok(BlockingReader { + inner: Arc::new(r), + options, + }) } /// Read give range from reader into [`Buffer`]. @@ -73,24 +84,13 @@ impl BlockingReader { } } - let mut bufs = Vec::new(); - let mut offset = start; - - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; - let n = bs.remaining(); - bufs.push(bs); - if n < limit { - return Ok(bufs.into_iter().flatten().collect()); - } + let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end); - offset += n as u64; - if Some(offset) == end { - return Ok(bufs.into_iter().flatten().collect()); - } - } + Ok(iter + .collect::>>()? + .into_iter() + .flatten() + .collect()) } /// @@ -120,25 +120,17 @@ impl BlockingReader { } } - let mut offset = start; - let mut read = 0; + let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end); - loop { - // TODO: use service preferred io size instead. - let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize; - let bs = self.inner.read_at(offset, limit)?; - let n = bs.remaining(); - buf.put(bs); - read += n as u64; - if n < limit { - return Ok(read as _); - } + let bufs: Result> = iter.collect(); - offset += n as u64; - if Some(offset) == end { - return Ok(read as _); - } + let mut read = 0; + for bs in bufs? { + read += bs.len(); + buf.put(bs); } + + Ok(read) } /// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], @@ -146,12 +138,105 @@ impl BlockingReader { #[inline] pub fn into_std_read(self, range: Range) -> StdReader { // TODO: the capacity should be decided by services. - StdReader::new(self.inner, range) + StdReader::new(self.inner.clone(), range) } /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`]. #[inline] pub fn into_bytes_iterator(self, range: Range) -> StdBytesIterator { - StdBytesIterator::new(self.inner, range) + StdBytesIterator::new(self.inner.clone(), range) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::ThreadRng; + use rand::Rng; + use rand::RngCore; + + fn gen_random_bytes() -> Vec { + let mut rng = ThreadRng::default(); + // Generate size between 1B..16MB. + let size = rng.gen_range(1..16 * 1024 * 1024); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + content + } + + #[test] + fn test_blocking_reader_read() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_chunk() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader_with(path).chunk(16).call().unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_with_concurrent() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op + .reader_with(path) + .chunk(128) + .concurrent(16) + .call() + .unwrap(); + let buf = reader.read(..).expect("read to end must succeed"); + + assert_eq!(buf.to_bytes(), content); + } + + #[test] + fn test_reader_read_into() { + let op = Operator::new(services::Memory::default()) + .unwrap() + .finish() + .blocking(); + let path = "test_file"; + + let content = gen_random_bytes(); + op.write(path, content.clone()).expect("write must succeed"); + + let reader = op.reader(path).unwrap(); + let mut buf = Vec::new(); + reader + .read_into(&mut buf, ..) + .expect("read to end must succeed"); + + assert_eq!(buf, content); } } diff --git a/core/src/types/blocking_read/buffer_iterator.rs b/core/src/types/blocking_read/buffer_iterator.rs new file mode 100644 index 000000000000..536c2834ee81 --- /dev/null +++ b/core/src/types/blocking_read/buffer_iterator.rs @@ -0,0 +1,118 @@ +use std::ops::Range; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use bytes::Buf; +use rayon::prelude::*; + +use crate::raw::*; +use crate::*; + +pub(super) struct BufferIterator { + inner: Arc, + it: RangeIterator, + + chunk: usize, + concurrent: usize, +} + +impl BufferIterator { + pub fn new( + inner: Arc, + options: OpReader, + offset: u64, + end: Option, + ) -> Self { + let chunk = options.chunk().unwrap_or(4 * 1024 * 1024); + let it = RangeIterator { + offset, + chunk: chunk as u64, + end, + finished: Arc::new(AtomicBool::new(false)), + }; + Self { + inner, + it, + chunk, + concurrent: options.concurrent(), + } + } +} + +impl Iterator for BufferIterator { + type Item = Result; + + fn next(&mut self) -> Option { + let mut bufs = Vec::with_capacity(self.concurrent); + + let intervals: Vec> = (0..self.concurrent) + .filter_map(|_| self.it.next()) + .collect(); + + if intervals.is_empty() { + return None; + } + + let results: Vec> = intervals + .into_par_iter() + .map(|range| -> Result<(usize, Buffer)> { + let limit = (range.end - range.start) as usize; + + let bs = self.inner.read_at(range.start, limit)?; + let n = bs.remaining(); + + Ok((n, bs)) + }) + .collect(); + for result in results { + match result { + Ok((n, buf)) => { + bufs.push(buf); + if n < self.chunk { + self.it.finished(); + return Some(Ok(bufs.into_iter().flatten().collect())); + } + } + Err(err) => return Some(Err(err)), + } + } + + Some(Ok(bufs.into_iter().flatten().collect())) + } +} + +pub(super) struct RangeIterator { + offset: u64, + chunk: u64, + end: Option, + finished: Arc, +} + +impl RangeIterator { + fn finished(&mut self) { + self.finished.store(true, Ordering::Relaxed); + } +} + +impl Iterator for RangeIterator { + type Item = Range; + + fn next(&mut self) -> Option { + if self.finished.load(Ordering::Relaxed) { + return None; + } + + let offset = self.offset; + if let Some(end) = self.end { + if self.offset >= end { + return None; + } + if self.offset + self.chunk > end { + self.finished(); + return Some(offset..end); + } + } + self.offset += self.chunk; + Some(offset..offset + self.chunk) + } +} diff --git a/core/src/types/blocking_read/mod.rs b/core/src/types/blocking_read/mod.rs index ff7631cd99eb..2f9178190375 100644 --- a/core/src/types/blocking_read/mod.rs +++ b/core/src/types/blocking_read/mod.rs @@ -23,3 +23,4 @@ mod std_bytes_iterator; pub use std_bytes_iterator::StdBytesIterator; mod std_reader; pub use std_reader::StdReader; +mod buffer_iterator; diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 4c6e4808ae04..e48e79b09266 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -385,8 +385,12 @@ impl BlockingOperator { FunctionRead(OperatorFunction::new( self.inner().clone(), path, - (OpRead::default(), BytesRange::default()), - |inner, path, (args, range)| { + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), + |inner, path, (args, range, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "read path is a directory") @@ -396,7 +400,7 @@ impl BlockingOperator { ); } - let r = BlockingReader::create(inner, &path, args)?; + let r = BlockingReader::create(inner, &path, args, options)?; let buf = r.read(range.to_range())?; Ok(buf) }, @@ -443,8 +447,12 @@ impl BlockingOperator { FunctionReader(OperatorFunction::new( self.inner().clone(), path, - OpRead::default(), - |inner, path, args| { + ( + OpRead::default(), + BytesRange::default(), + OpReader::default(), + ), + |inner, path, (args, _, options)| { if !validate_path(&path, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "reader path is a directory") @@ -454,7 +462,7 @@ impl BlockingOperator { ); } - BlockingReader::create(inner.clone(), &path, args) + BlockingReader::create(inner.clone(), &path, args, options) }, )) } diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index dbb7200f0f63..c30cb81f9bd5 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -320,12 +320,14 @@ impl FunctionLister { /// Function that generated by [`BlockingOperator::read_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FunctionRead(pub(crate) OperatorFunction<(OpRead, BytesRange), Buffer>); +pub struct FunctionRead(pub(crate) OperatorFunction<(OpRead, BytesRange, OpReader), Buffer>); impl FunctionRead { /// Set the range for this operation. pub fn range(mut self, range: impl RangeBounds) -> Self { - self.0 = self.0.map_args(|(args, _)| (args, range.into())); + self.0 = self + .0 + .map_args(|(args, _, options)| (args, range.into(), options)); self } @@ -334,53 +336,89 @@ impl FunctionRead { pub fn call(self) -> Result { self.0.call() } + + /// Set the concurrent read task amount. + pub fn concurrent(mut self, concurrent: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); + self + } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::reader_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FunctionReader(pub(crate) OperatorFunction); +pub struct FunctionReader( + pub(crate) OperatorFunction<(OpRead, BytesRange, OpReader), BlockingReader>, +); impl FunctionReader { /// Sets the content-disposition header that should be send back by the remote read operation. pub fn override_content_disposition(mut self, content_disposition: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_content_disposition(content_disposition)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_content_disposition(content_disposition), + range, + options, + ) + }); self } /// Sets the cache-control header that should be send back by the remote read operation. pub fn override_cache_control(mut self, cache_control: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_cache_control(cache_control)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_cache_control(cache_control), + range, + options, + ) + }); self } /// Sets the content-type header that should be send back by the remote read operation. pub fn override_content_type(mut self, content_type: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_override_content_type(content_type)); + self.0 = self.0.map_args(|(args, range, options)| { + ( + args.with_override_content_type(content_type), + range, + options, + ) + }); self } /// Set the If-Match for this operation. pub fn if_match(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_if_match(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_if_match(v), range, options)); self } /// Set the If-None-Match for this operation. pub fn if_none_match(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_if_none_match(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_if_none_match(v), range, options)); self } /// Set the version for this operation. pub fn version(mut self, v: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_version(v)); + self.0 = self + .0 + .map_args(|(args, range, options)| (args.with_version(v), range, options)); self } @@ -389,6 +427,22 @@ impl FunctionReader { pub fn call(self) -> Result { self.0.call() } + + /// Set the concurrent read task amount. + pub fn concurrent(mut self, concurrent: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_concurrent(concurrent))); + self + } + + /// Set the chunk size for this reader. + pub fn chunk(mut self, chunk_size: usize) -> Self { + self.0 = self + .0 + .map_args(|(args, range, options)| (args, range, options.with_chunk(chunk_size))); + self + } } /// Function that generated by [`BlockingOperator::stat_with`].