Skip to content

Commit

Permalink
refactor: Polish operator read_with (#3775)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 19, 2023
1 parent 41df0a1 commit 7463f91
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 107 deletions.
66 changes: 0 additions & 66 deletions core/src/raw/http_util/bytes_content_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::ops::Range;
use std::ops::RangeInclusive;
use std::str::FromStr;

use crate::raw::*;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
Expand Down Expand Up @@ -111,27 +110,6 @@ impl BytesContentRange {
pub fn to_header(&self) -> String {
format!("bytes {self}")
}

/// Calculate bytes content range from size and specified range.
pub fn from_bytes_range(total_size: u64, range: BytesRange) -> Self {
let (start, end) = match (range.offset(), range.size()) {
(Some(offset), Some(size)) => (offset, offset + size - 1),
(Some(offset), None) => (offset, total_size - 1),
(None, Some(size)) => (total_size - size, total_size - 1),
(None, None) => (0, total_size - 1),
};

Self(Some(start), Some(end), Some(total_size))
}

/// Calculate bytes range from bytes content range.
pub fn to_bytes_range(self) -> Option<BytesRange> {
match (self.0, self.1, self.2) {
(Some(start), Some(end), _) => Some(BytesRange::from(start..=end)),
(None, None, Some(_)) => None,
_ => unreachable!("invalid bytes range: {:?}", self),
}
}
}

impl Display for BytesContentRange {
Expand Down Expand Up @@ -233,50 +211,6 @@ mod tests {
Ok(())
}

#[test]
fn test_from_bytes_range() {
let cases = vec![
(
"offset only",
BytesRange::new(Some(1024), None),
2048,
BytesContentRange::default()
.with_size(2048)
.with_range(1024, 2047),
),
(
"size only",
BytesRange::new(None, Some(1024)),
2048,
BytesContentRange::default()
.with_size(2048)
.with_range(1024, 2047),
),
(
"offset zero",
BytesRange::new(Some(0), Some(1024)),
2048,
BytesContentRange::default()
.with_size(2048)
.with_range(0, 1023),
),
(
"part of data",
BytesRange::new(Some(1024), Some(1)),
4096,
BytesContentRange::default()
.with_size(4096)
.with_range(1024, 1024),
),
];

for (name, input, input_size, expected) in cases {
let actual = BytesContentRange::from_bytes_range(input_size, input);

assert_eq!(expected, actual, "{name}")
}
}

#[test]
fn test_bytes_content_range_to_string() {
let h = BytesContentRange::default().with_size(1024);
Expand Down
10 changes: 10 additions & 0 deletions core/src/raw/http_util/bytes_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ impl BytesRange {
)
}

/// Complete range with total size.
pub fn complete(&self, total_size: u64) -> Self {
match (self.offset(), self.size()) {
(Some(_), Some(_)) => *self,
(Some(offset), None) => Self(Some(offset), Some(total_size - offset)),
(None, Some(size)) => Self(Some(total_size - size), Some(size)),
(None, None) => Self(Some(0), Some(total_size)),
}
}

/// apply_on_bytes will apply range on bytes.
pub fn apply_on_bytes(&self, mut bs: Bytes) -> Bytes {
match (self.0, self.1) {
Expand Down
17 changes: 0 additions & 17 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
let start_len = this.buf.len();
let start_cap = this.buf.capacity();

loop {
if this.buf.len() == this.buf.capacity() {
Expand All @@ -306,22 +305,6 @@ where
}
Err(e) => return Poll::Ready(Err(e)),
}

// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary doubling of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
if this.buf.len() == this.buf.capacity() && this.buf.capacity() == start_cap {
let mut probe = [0u8; 32];

match ready!(this.reader.poll_read(cx, &mut probe)) {
Ok(0) => return Poll::Ready(Ok(this.buf.len() - start_len)),
Ok(n) => {
this.buf.extend_from_slice(&probe[..n]);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
}
Expand Down
22 changes: 10 additions & 12 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,18 @@ impl BlockingOperator {
}

let range = args.range();
let size_hint = match range.size() {
Some(v) => v,
None => {
let mut size = inner
.blocking_stat(&path, OpStat::default())?
.into_metadata()
.content_length();
size -= range.offset().unwrap_or(0);
size
}
let (size_hint, range) = if let Some(size) = range.size() {
(size, range)
} else {
let size = inner
.blocking_stat(&path, OpStat::default())?
.into_metadata()
.content_length();
let range = range.complete(size);
(range.size().unwrap(), range)
};

let (_, mut s) = inner.blocking_read(&path, args)?;

let (_, mut s) = inner.blocking_read(&path, args.with_range(range))?;
let mut buf = Vec::with_capacity(size_hint as usize);
s.read_to_end(&mut buf)?;

Expand Down
23 changes: 11 additions & 12 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,20 +429,19 @@ impl Operator {
}

let range = args.range();
let size_hint = match range.size() {
Some(v) => v,
None => {
let mut size = inner
.stat(&path, OpStat::default())
.await?
.into_metadata()
.content_length();
size -= range.offset().unwrap_or(0);
size
}
let (size_hint, range) = if let Some(size) = range.size() {
(size, range)
} else {
let size = inner
.stat(&path, OpStat::default())
.await?
.into_metadata()
.content_length();
let range = range.complete(size);
(range.size().unwrap(), range)
};

let (_, mut s) = inner.read(&path, args).await?;
let (_, mut s) = inner.read(&path, args.with_range(range)).await?;
let mut buf = Vec::with_capacity(size_hint as usize);
s.read_to_end(&mut buf).await?;

Expand Down

0 comments on commit 7463f91

Please sign in to comment.