feat(core/types): Implement concurrent read for blocking read
Xuanwo authored and hoslo committed May 7, 2024
1 parent 9d587ea commit 661ba73
Showing 9 changed files with 330 additions and 64 deletions.
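
This commit adds a `concurrent` option to the blocking read path: a `BlockingReader` can now issue several chunk reads in parallel (via rayon) per iteration instead of looping over 4 MiB reads sequentially. Before diving into the diff, here is a minimal usage sketch modeled on the tests added below; the in-memory service, path, chunk size, and concurrency level are illustrative choices, not defaults or recommendations.

use opendal::{services, Operator, Result};

fn main() -> Result<()> {
    // A blocking operator backed by the in-memory service.
    let op = Operator::new(services::Memory::default())?
        .finish()
        .blocking();

    op.write("test_file", vec![1u8; 1024 * 1024])?;

    // Ask for 128 KiB chunks, with up to 16 chunk reads dispatched in
    // parallel per batch.
    let reader = op
        .reader_with("test_file")
        .chunk(128 * 1024)
        .concurrent(16)
        .call()?;

    let buf = reader.read(..)?;
    assert_eq!(buf.to_bytes().len(), 1024 * 1024);
    Ok(())
}

With a concurrency of one, each batch contains a single range and the reader degenerates to the previous sequential behavior.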
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -351,6 +351,7 @@ tracing = { version = "0.1", optional = true }
# for layers-dtrace
probe = { version = "0.5.1", optional = true }
crc32c = "0.6.5"
rayon = "1.10.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
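
The new `rayon` dependency is what provides the concurrency for blocking reads: each batch of chunk ranges is handed to a parallel iterator, and collecting an indexed parallel iterator into a `Vec` keeps the results in input (offset) order. A stripped-down, standalone sketch of that pattern follows; the `read_at` helper here is a dummy stand-in (the real `oio::BlockingRead::read_at` returns a `Buffer`), and the sizes are arbitrary.

use rayon::prelude::*;

// Hypothetical stand-in for a blocking `read_at(offset, limit)` call;
// the real oio::BlockingRead::read_at returns a Buffer, not a Vec<u8>.
fn read_at(offset: u64, limit: usize) -> std::io::Result<Vec<u8>> {
    let _ = offset;
    Ok(vec![0u8; limit])
}

fn main() -> std::io::Result<()> {
    let ranges: Vec<std::ops::Range<u64>> =
        (0..4u64).map(|i| i * 10..(i + 1) * 10).collect();

    // `into_par_iter` fans the reads out over rayon's thread pool, and
    // collecting into a Vec keeps results in the same order as `ranges`.
    let chunks: std::io::Result<Vec<Vec<u8>>> = ranges
        .into_par_iter()
        .map(|r| read_at(r.start, (r.end - r.start) as usize))
        .collect();

    let data: Vec<u8> = chunks?.into_iter().flatten().collect();
    println!("read {} bytes", data.len());
    Ok(())
}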
163 changes: 124 additions & 39 deletions core/src/types/blocking_read/blocking_reader.rs
@@ -18,17 +18,20 @@
use std::collections::Bound;
use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Buf;
use bytes::BufMut;

use crate::raw::*;
use crate::*;

use super::buffer_iterator::BufferIterator;

/// BlockingReader is designed to read data from a given path in a blocking
/// manner.
pub struct BlockingReader {
pub(crate) inner: oio::BlockingReader,
pub(crate) inner: Arc<dyn oio::BlockingRead>,
options: OpReader,
}

impl BlockingReader {
@@ -39,10 +42,18 @@ impl BlockingReader {
///
/// We don't want to expose those details to users so keep this function
/// in crate only.
pub(crate) fn create(acc: Accessor, path: &str, op: OpRead) -> crate::Result<Self> {
pub(crate) fn create(
acc: FusedAccessor,
path: &str,
op: OpRead,
options: OpReader,
) -> crate::Result<Self> {
let (_, r) = acc.blocking_read(path, op)?;

Ok(BlockingReader { inner: r })
Ok(BlockingReader {
inner: Arc::new(r),
options,
})
}

/// Read the given range from the reader into [`Buffer`].
@@ -73,24 +84,13 @@
}
}

let mut bufs = Vec::new();
let mut offset = start;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
bufs.push(bs);
if n < limit {
return Ok(bufs.into_iter().flatten().collect());
}
let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end);

offset += n as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
}
}
Ok(iter
.collect::<Result<Vec<Buffer>>>()?
.into_iter()
.flatten()
.collect())
}

///
@@ -120,38 +120,123 @@
}
}

let mut offset = start;
let mut read = 0;
let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end);

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
buf.put(bs);
read += n as u64;
if n < limit {
return Ok(read as _);
}
let bufs: Result<Vec<Buffer>> = iter.collect();

offset += n as u64;
if Some(offset) == end {
return Ok(read as _);
}
let mut read = 0;
for bs in bufs? {
read += bs.len();
buf.put(bs);
}

Ok(read)
}

/// Convert reader into [`StdReader`] which implements [`std::io::Read`],
/// [`std::io::Seek`] and [`std::io::BufRead`].
#[inline]
pub fn into_std_read(self, range: Range<u64>) -> StdReader {
// TODO: the capacity should be decided by services.
StdReader::new(self.inner, range)
StdReader::new(self.inner.clone(), range)
}

/// Convert reader into [`StdBytesIterator`] which implements [`Iterator`].
#[inline]
pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator {
StdBytesIterator::new(self.inner, range)
StdBytesIterator::new(self.inner.clone(), range)
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::ThreadRng;
use rand::Rng;
use rand::RngCore;

fn gen_random_bytes() -> Vec<u8> {
let mut rng = ThreadRng::default();
// Generate size between 1B..16MB.
let size = rng.gen_range(1..16 * 1024 * 1024);
let mut content = vec![0; size];
rng.fill_bytes(&mut content);
content
}

#[test]
fn test_blocking_reader_read() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader(path).unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_with_chunk() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader_with(path).chunk(16).call().unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_with_concurrent() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op
.reader_with(path)
.chunk(128)
.concurrent(16)
.call()
.unwrap();
let buf = reader.read(..).expect("read to end must succeed");

assert_eq!(buf.to_bytes(), content);
}

#[test]
fn test_reader_read_into() {
let op = Operator::new(services::Memory::default())
.unwrap()
.finish()
.blocking();
let path = "test_file";

let content = gen_random_bytes();
op.write(path, content.clone()).expect("write must succeed");

let reader = op.reader(path).unwrap();
let mut buf = Vec::new();
reader
.read_into(&mut buf, ..)
.expect("read to end must succeed");

assert_eq!(buf, content);
}
}
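
One point the tests above don't show: `read_into` accepts explicit sub-ranges as well as `..`. A small sketch under the same assumptions (in-memory service, illustrative path and sizes):

use opendal::{services, Operator, Result};

fn main() -> Result<()> {
    let op = Operator::new(services::Memory::default())?
        .finish()
        .blocking();
    op.write("test_file", vec![7u8; 4096])?;

    let reader = op.reader("test_file")?;

    // Read only bytes [1024, 2048) into a caller-owned buffer; the return
    // value is the number of bytes appended to `buf`.
    let mut buf = Vec::new();
    let n = reader.read_into(&mut buf, 1024..2048)?;
    assert_eq!(n, 1024);
    assert_eq!(buf, vec![7u8; 1024]);
    Ok(())
}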
114 changes: 114 additions & 0 deletions core/src/types/blocking_read/buffer_iterator.rs
@@ -0,0 +1,114 @@
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::Buf;
use rayon::prelude::*;

use crate::raw::*;
use crate::*;

pub(super) struct BufferIterator {
inner: Arc<dyn oio::BlockingRead>,
it: RangeIterator,

chunk: usize,
end: Option<u64>,
concurrent: usize,
finished: Arc<AtomicBool>,
}

impl BufferIterator {
pub fn new(
inner: Arc<dyn oio::BlockingRead>,
options: OpReader,
offset: u64,
end: Option<u64>,
) -> Self {
let chunk = options.chunk().unwrap_or(4 * 1024 * 1024);
let it = RangeIterator {
offset,
chunk: chunk as u64,
};
Self {
inner,
it,
chunk,
end,
concurrent: options.concurrent(),
finished: Arc::new(AtomicBool::new(false)),
}
}
}

impl Iterator for BufferIterator {
type Item = Result<Buffer>;

fn next(&mut self) -> Option<Self::Item> {
if self.finished.load(Ordering::Relaxed) {
return None;
}

let mut bufs = Vec::with_capacity(self.concurrent);

let intervals: Vec<Range<u64>> = (0..self.concurrent)
.map(|_| {
let range = self.it.next().unwrap_or(Range {
start: u64::MAX,
end: u64::MAX,
});
if let Some(end) = self.end {
if range.start + range.end > end {
return Range {
start: range.start,
end,
};
}
}
range
})
.filter(|range| range.start < range.end)
.collect();

let results: Vec<Result<(usize, Buffer)>> = intervals
.into_par_iter()
.map(|range| -> Result<(usize, Buffer)> {
let limit = (range.end - range.start) as usize;

let bs = self.inner.read_at(range.start, limit)?;
let n = bs.remaining();

Ok((n, bs))
})
.collect();
for result in results {
match result {
Ok((n, buf)) => {
bufs.push(buf);
if n < self.chunk {
self.finished.store(true, Ordering::Relaxed);
return Some(Ok(bufs.into_iter().flatten().collect()));
}
}
Err(err) => return Some(Err(err)),
}
}

Some(Ok(bufs.into_iter().flatten().collect()))
}
}

pub(super) struct RangeIterator {
offset: u64,
chunk: u64,
}

impl Iterator for RangeIterator {
type Item = Range<u64>;

fn next(&mut self) -> Option<Self::Item> {
let offset = self.offset;
self.offset += self.chunk;
Some(offset..offset + self.chunk)
}
}
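
To make the batching concrete, here is a simplified, standalone model of how `BufferIterator` plans each round of reads. It only shows the range carving: the clamping in the committed code is expressed slightly differently, and the rayon dispatch plus the short-read (EOF) cut-off are omitted. `plan_batch` and the sizes are illustrative names and numbers, not part of the commit.

use std::ops::Range;

// Carve [offset, end) into `chunk`-sized windows, `concurrent` per batch,
// clamping the last window to `end` and dropping empty windows.
fn plan_batch(offset: &mut u64, chunk: u64, concurrent: usize, end: u64) -> Vec<Range<u64>> {
    (0..concurrent)
        .map(|_| {
            let start = *offset;
            *offset += chunk;
            start..(start + chunk).min(end)
        })
        .filter(|r| r.start < r.end)
        .collect()
}

fn main() {
    // A 10 MiB object read with 4 MiB chunks and 2 concurrent reads per batch.
    let (chunk, end) = (4u64 << 20, 10u64 << 20);
    let mut offset = 0;
    loop {
        let batch = plan_batch(&mut offset, chunk, 2, end);
        if batch.is_empty() {
            break;
        }
        // Prints [0..4194304, 4194304..8388608], then [8388608..10485760].
        println!("{batch:?}");
    }
}

Each call to `next()` on the real iterator performs one such batch, reads the planned ranges in parallel, and marks itself finished as soon as any chunk comes back shorter than the configured chunk size.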
1 change: 1 addition & 0 deletions core/src/types/blocking_read/mod.rs
@@ -23,3 +23,4 @@ mod std_bytes_iterator;
pub use std_bytes_iterator::StdBytesIterator;
mod std_reader;
pub use std_reader::StdReader;
mod buffer_iterator;
5 changes: 3 additions & 2 deletions core/src/types/blocking_read/std_bytes_iterator.rs
@@ -16,6 +16,7 @@
// under the License.

use std::io;
use std::sync::Arc;

use bytes::Buf;
use bytes::Bytes;
@@ -28,7 +29,7 @@ use crate::raw::*;
///
/// StdBytesIterator also implements [`Send`] and [`Sync`].
pub struct StdBytesIterator {
inner: oio::BlockingReader,
inner: Arc<dyn oio::BlockingRead>,
offset: u64,
size: u64,
cap: usize,
@@ -39,7 +40,7 @@ pub struct StdBytesIterator {
impl StdBytesIterator {
/// NOTE: don't allow users to create StdBytesIterator directly.
#[inline]
pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> Self {
pub(crate) fn new(r: Arc<dyn oio::BlockingRead>, range: std::ops::Range<u64>) -> Self {
StdBytesIterator {
inner: r,
offset: range.start,
(Diffs for the remaining changed files were not loaded.)