Skip to content

Commit

Permalink
feat(core/types): Implement concurrent read for blocking read
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored and hoslo committed May 7, 2024
1 parent 9d587ea commit cb882aa
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 59 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ tracing = { version = "0.1", optional = true }
# for layers-dtrace
probe = { version = "0.5.1", optional = true }
crc32c = "0.6.5"
rayon = "1.10.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
Expand Down
161 changes: 123 additions & 38 deletions core/src/types/blocking_read/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
use std::collections::Bound;
use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Buf;
use bytes::BufMut;

use crate::raw::*;
use crate::*;

use super::buffer_iterator::BufferIterator;

/// BlockingReader is designed to read data from given path in a blocking
/// manner.
///
/// Reads are delegated to `BufferIterator`, which is built from `inner` and
/// `options` each time a range is read.
pub struct BlockingReader {
/// Underlying blocking reader; a clone of it is handed to `BufferIterator`
/// for every read.
pub(crate) inner: oio::BlockingReader,
/// Reader options (the chunk size and concurrency consulted by
/// `BufferIterator`) supplied when the reader was created.
options: OpReader,
}

impl BlockingReader {
Expand All @@ -39,10 +42,18 @@ impl BlockingReader {
///
/// We don't want to expose those details to users so keep this function
/// in crate only.
pub(crate) fn create(acc: Accessor, path: &str, op: OpRead) -> crate::Result<Self> {
pub(crate) fn create(
acc: Accessor,
path: &str,
op: OpRead,
options: OpReader,
) -> crate::Result<Self> {
let (_, r) = acc.blocking_read(path, op)?;

Ok(BlockingReader { inner: r })
Ok(BlockingReader {
inner: Arc::new(r),
options,
})
}

/// Read the given range from reader into [`Buffer`].
Expand Down Expand Up @@ -73,24 +84,13 @@ impl BlockingReader {
}
}

let mut bufs = Vec::new();
let mut offset = start;

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
bufs.push(bs);
if n < limit {
return Ok(bufs.into_iter().flatten().collect());
}
let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end);

offset += n as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
}
}
Ok(iter
.collect::<Result<Vec<Buffer>>>()?
.into_iter()
.flatten()
.collect())
}

///
Expand Down Expand Up @@ -120,38 +120,123 @@ impl BlockingReader {
}
}

let mut offset = start;
let mut read = 0;
let iter = BufferIterator::new(self.inner.clone(), self.options.clone(), start, end);

loop {
// TODO: use service preferred io size instead.
let limit = end.map(|end| end - offset).unwrap_or(4 * 1024 * 1024) as usize;
let bs = self.inner.read_at(offset, limit)?;
let n = bs.remaining();
buf.put(bs);
read += n as u64;
if n < limit {
return Ok(read as _);
}
let bufs: Result<Vec<Buffer>> = iter.collect();

offset += n as u64;
if Some(offset) == end {
return Ok(read as _);
}
let mut read = 0;
for bs in bufs? {
read += bs.len();
buf.put(bs);
}

Ok(read)
}

/// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
pub fn into_std_read(self, range: Range<u64>) -> StdReader {
// TODO: the capacity should be decided by services.
StdReader::new(self.inner, range)
StdReader::new(self.inner.clone(), range)
}

/// Convert reader into [`StdBytesIterator`] which implements [`Iterator`].
#[inline]
pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator {
StdBytesIterator::new(self.inner, range)
StdBytesIterator::new(self.inner.clone(), range)
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use rand::rngs::ThreadRng;
    use rand::Rng;
    use rand::RngCore;

    /// Build a random payload whose length is drawn from 1B..16MB.
    fn gen_random_bytes() -> Vec<u8> {
        let mut rng = ThreadRng::default();
        let len = rng.gen_range(1..16 * 1024 * 1024);

        let mut bytes = vec![0; len];
        rng.fill_bytes(&mut bytes);
        bytes
    }

    /// Reading the whole file with default options returns what was written.
    #[test]
    fn test_blocking_reader_read() {
        let path = "test_file";
        let operator = Operator::new(services::Memory::default())
            .unwrap()
            .finish()
            .blocking();

        let expected = gen_random_bytes();
        operator
            .write(path, expected.clone())
            .expect("write must succeed");

        let actual = operator
            .reader(path)
            .unwrap()
            .read(..)
            .expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
    }

    /// Reading with a small explicit chunk size still reassembles the file.
    #[test]
    fn test_reader_read_with_chunk() {
        let path = "test_file";
        let operator = Operator::new(services::Memory::default())
            .unwrap()
            .finish()
            .blocking();

        let expected = gen_random_bytes();
        operator
            .write(path, expected.clone())
            .expect("write must succeed");

        let actual = operator
            .reader_with(path)
            .chunk(16)
            .call()
            .unwrap()
            .read(..)
            .expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
    }

    /// Reading with several chunks in flight keeps the data in order.
    #[test]
    fn test_reader_read_with_concurrent() {
        let path = "test_file";
        let operator = Operator::new(services::Memory::default())
            .unwrap()
            .finish()
            .blocking();

        let expected = gen_random_bytes();
        operator
            .write(path, expected.clone())
            .expect("write must succeed");

        let actual = operator
            .reader_with(path)
            .chunk(128)
            .concurrent(16)
            .call()
            .unwrap()
            .read(..)
            .expect("read to end must succeed");

        assert_eq!(actual.to_bytes(), expected);
    }

    /// `read_into` appends the whole file to a caller-provided buffer.
    #[test]
    fn test_reader_read_into() {
        let path = "test_file";
        let operator = Operator::new(services::Memory::default())
            .unwrap()
            .finish()
            .blocking();

        let expected = gen_random_bytes();
        operator
            .write(path, expected.clone())
            .expect("write must succeed");

        let mut sink = Vec::new();
        operator
            .reader(path)
            .unwrap()
            .read_into(&mut sink, ..)
            .expect("read to end must succeed");

        assert_eq!(sink, expected);
    }
}
118 changes: 118 additions & 0 deletions core/src/types/blocking_read/buffer_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::Buf;
use rayon::prelude::*;

use crate::raw::*;
use crate::*;

/// Iterator that reads a byte range in chunk-sized pieces, fetching up to
/// `concurrent` pieces in parallel per step and yielding them as one
/// [`Buffer`].
pub(super) struct BufferIterator {
/// Reader used to fetch every range via `read_at`.
inner: Arc<dyn oio::BlockingRead>,
/// Source of the successive ranges still to be read.
it: RangeIterator,

/// Size in bytes of a full range; a read that returns fewer bytes than this
/// ends the iteration.
chunk: usize,
/// Maximum number of ranges requested per call to `next`.
concurrent: usize,
}

impl BufferIterator {
    /// Create an iterator that reads from `offset` up to `end` (or until a
    /// short read when `end` is `None`) through `inner`.
    ///
    /// The chunk size comes from `options.chunk()` and falls back to 4 MiB
    /// when unset; the per-step parallelism comes from `options.concurrent()`.
    pub fn new(
        inner: Arc<dyn oio::BlockingRead>,
        options: OpReader,
        offset: u64,
        end: Option<u64>,
    ) -> Self {
        // Fallback chunk size used when the options do not specify one.
        const DEFAULT_CHUNK: usize = 4 * 1024 * 1024;

        let chunk = options.chunk().unwrap_or(DEFAULT_CHUNK);
        let concurrent = options.concurrent();

        Self {
            inner,
            it: RangeIterator {
                offset,
                chunk: chunk as u64,
                end,
                finished: Arc::new(AtomicBool::new(false)),
            },
            chunk,
            concurrent,
        }
    }
}

impl Iterator for BufferIterator {
    type Item = Result<Buffer>;

    /// Read the next batch of up to `concurrent` ranges in parallel and return
    /// their contents, in range order, merged into a single [`Buffer`].
    ///
    /// Returns `None` once no ranges remain. A read that yields fewer bytes
    /// than the chunk size is treated as the end of the data: everything up to
    /// and including that read is returned and later results of the same batch
    /// are discarded. If a read fails before such a short read is seen, the
    /// error is returned and the iterator is marked finished, so subsequent
    /// calls return `None` instead of silently skipping the failed region.
    fn next(&mut self) -> Option<Self::Item> {
        // Reserve the ranges for this batch up front; they are consumed from
        // the range iterator whether or not the reads succeed.
        let intervals: Vec<Range<u64>> = self.it.by_ref().take(self.concurrent).collect();

        if intervals.is_empty() {
            return None;
        }

        // Sized by the ranges actually scheduled, not by the configured limit.
        let mut bufs = Vec::with_capacity(intervals.len());

        // Collecting into a `Vec` keeps results in the same order as the
        // ranges, so they can be concatenated directly.
        let results: Vec<Result<(usize, Buffer)>> = intervals
            .into_par_iter()
            .map(|range| -> Result<(usize, Buffer)> {
                let limit = (range.end - range.start) as usize;

                let bs = self.inner.read_at(range.start, limit)?;
                let n = bs.remaining();

                Ok((n, bs))
            })
            .collect();

        for result in results {
            match result {
                Ok((n, buf)) => {
                    bufs.push(buf);
                    // Short read: nothing more to fetch after this range.
                    if n < self.chunk {
                        self.it.finished();
                        break;
                    }
                }
                Err(err) => {
                    // The ranges of this batch are already consumed; stop here
                    // so a later call cannot resume past the failed region.
                    self.it.finished();
                    return Some(Err(err));
                }
            }
        }

        Some(Ok(bufs.into_iter().flatten().collect()))
    }
}

/// Iterator that splits a read starting at `offset` into consecutive ranges of
/// `chunk` bytes.
///
/// With a known `end`, the final range is truncated to `end`. Without one, the
/// iterator keeps producing ranges until [`RangeIterator::finished`] is called
/// or the `u64` offset space is exhausted.
///
/// NOTE(review): a `chunk` of 0 yields empty ranges without ever advancing;
/// callers are presumably expected to pass a non-zero chunk — confirm.
pub(super) struct RangeIterator {
    /// Start of the next range to yield.
    offset: u64,
    /// Length of every full range.
    chunk: u64,
    /// Exclusive upper bound of the whole read, or `None` when unbounded.
    end: Option<u64>,
    /// Set once no further ranges must be produced.
    finished: Arc<AtomicBool>,
}

impl RangeIterator {
    /// Mark the iterator as exhausted so every later `next` returns `None`.
    fn finished(&mut self) {
        self.finished.store(true, Ordering::Relaxed);
    }
}

impl Iterator for RangeIterator {
    type Item = Range<u64>;

    /// Yield the next range, or `None` when the iterator is finished or the
    /// offset has reached `end`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished.load(Ordering::Relaxed) {
            return None;
        }

        let offset = self.offset;
        if let Some(end) = self.end {
            if offset >= end {
                return None;
            }
            // `offset < end` holds here, so the subtraction cannot underflow.
            // Comparing the remaining length (instead of `offset + chunk > end`)
            // avoids overflowing when `offset` is close to `u64::MAX`.
            if end - offset < self.chunk {
                self.finished();
                return Some(offset..end);
            }
        }

        match offset.checked_add(self.chunk) {
            Some(next) => {
                self.offset = next;
                Some(offset..next)
            }
            // The offset space is exhausted: emit one last clamped range
            // rather than overflowing.
            None => {
                self.finished();
                Some(offset..u64::MAX)
            }
        }
    }
}
1 change: 1 addition & 0 deletions core/src/types/blocking_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ mod std_bytes_iterator;
pub use std_bytes_iterator::StdBytesIterator;
mod std_reader;
pub use std_reader::StdReader;
mod buffer_iterator;
Loading

0 comments on commit cb882aa

Please sign in to comment.