Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/opendal
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9720e2b7d52b7b0d8bd460e58130219fce0f5b52
Choose a base ref
..
head repository: apache/opendal
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: d500961d6d916d6a6e5dfda7ec1279709f3a0571
Choose a head ref
Showing with 43 additions and 26 deletions.
  1. +43 −26 core/src/types/blocking_read/buffer_iterator.rs
69 changes: 43 additions & 26 deletions core/src/types/blocking_read/buffer_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

@@ -9,9 +10,9 @@ use crate::*;

pub(super) struct BufferIterator {
inner: Arc<dyn oio::BlockingRead>,
chunk: Option<usize>,
it: RangeIterator,

offset: u64,
chunk: usize,
end: Option<u64>,
concurrent: usize,
finished: Arc<AtomicBool>,
@@ -25,10 +26,15 @@ impl BufferIterator {
offset: u64,
end: Option<u64>,
) -> Self {
let chunk = chunk.unwrap_or(4 * 1024 * 1024);
let it = RangeIterator {
offset,
chunk: chunk as u64,
};
Self {
inner,
it,
chunk,
offset,
end,
concurrent,
finished: Arc::new(AtomicBool::new(false)),
@@ -40,36 +46,37 @@ impl Iterator for BufferIterator {
type Item = Result<Buffer>;

fn next(&mut self) -> Option<Self::Item> {
if self.offset >= self.end.unwrap_or(u64::MAX) {
return None;
}
if self.finished.load(Ordering::Relaxed) {
return None;
}

let mut bufs = Vec::with_capacity(self.concurrent);
let interval_size = self.chunk.unwrap_or(4 * 1024 * 1024) as u64;

let intervals: Vec<(u64, u64)> = (0..self.concurrent as u64)
.map(|i| {
let current = self.offset + i * interval_size;
// If end is set, we need to make sure we don't go beyond it
let intervals: Vec<Range<u64>> = (0..self.concurrent)
.map(|_| {
let range = self.it.next().unwrap_or(Range {
start: u64::MAX,
end: u64::MAX,
});
if let Some(end) = self.end {
if current + interval_size > end {
return (current, end);
if range.start + range.end > end {
return Range {
start: range.start,
end,
};
}
}
(current, current + interval_size)
range
})
// Filter out empty intervals
.filter(|(start, end)| start < end)
.filter(|range| range.start < range.end)
.collect();

let results: Vec<Result<(usize, Buffer)>> = intervals
.into_par_iter()
.map(|(start, end)| -> Result<(usize, Buffer)> {
let limit = (end - start) as usize;
.map(|range| -> Result<(usize, Buffer)> {
let limit = (range.end - range.start) as usize;

let bs = self.inner.read_at(start, limit)?;
let bs = self.inner.read_at(range.start, limit)?;
let n = bs.remaining();

Ok((n, bs))
@@ -79,20 +86,30 @@ impl Iterator for BufferIterator {
match result {
Ok((n, buf)) => {
bufs.push(buf);
if n < interval_size as usize {
self.finished.store(true, Ordering::Relaxed);
return Some(Ok(bufs.into_iter().flatten().collect()));
}

self.offset += n as u64;
if Some(self.offset) == self.end {
if n < self.chunk {
self.finished.store(true, Ordering::Relaxed);
return Some(Ok(bufs.into_iter().flatten().collect()));
}
}
Err(err) => return Some(Err(err)),
}
}

Some(Ok(bufs.into_iter().flatten().collect()))
}
}

pub(super) struct RangeIterator {
offset: u64,
chunk: u64,
}

impl Iterator for RangeIterator {
type Item = Range<u64>;

fn next(&mut self) -> Option<Self::Item> {
let offset = self.offset;
self.offset += self.chunk;
Some(offset..offset + self.chunk)
}
}