Skip to content

Commit

Permalink
feat(core): Tune buffer operations based on benchmark results (#4468)
Browse files Browse the repository at this point in the history
* Debug

Signed-off-by: Xuanwo <[email protected]>

* Add inline for buffer operations

Signed-off-by: Xuanwo <[email protected]>

* Try

Signed-off-by: Xuanwo <[email protected]>

* Drop buffer before start new request

Signed-off-by: Xuanwo <[email protected]>

* Add cap to 8MiB

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 12, 2024
1 parent b565446 commit 62fe9b0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
11 changes: 11 additions & 0 deletions core/src/types/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,42 +226,49 @@ impl Buffer {
}

impl From<Vec<u8>> for Buffer {
#[inline]
fn from(bs: Vec<u8>) -> Self {
Self(Inner::Contiguous(bs.into()))
}
}

impl From<Bytes> for Buffer {
#[inline]
fn from(bs: Bytes) -> Self {
Self(Inner::Contiguous(bs))
}
}

impl From<String> for Buffer {
#[inline]
fn from(s: String) -> Self {
Self(Inner::Contiguous(Bytes::from(s)))
}
}

impl From<&'static [u8]> for Buffer {
#[inline]
fn from(s: &'static [u8]) -> Self {
Self(Inner::Contiguous(Bytes::from_static(s)))
}
}

impl From<&'static str> for Buffer {
#[inline]
fn from(s: &'static str) -> Self {
Self(Inner::Contiguous(Bytes::from_static(s.as_bytes())))
}
}

impl FromIterator<u8> for Buffer {
#[inline]
fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
Self(Inner::Contiguous(Bytes::from_iter(iter)))
}
}

impl From<VecDeque<Bytes>> for Buffer {
#[inline]
fn from(bs: VecDeque<Bytes>) -> Self {
let size = bs.iter().map(Bytes::len).sum();
Self(Inner::NonContiguous {
Expand All @@ -274,6 +281,7 @@ impl From<VecDeque<Bytes>> for Buffer {
}

impl From<Vec<Bytes>> for Buffer {
#[inline]
fn from(bs: Vec<Bytes>) -> Self {
let size = bs.iter().map(Bytes::len).sum();
Self(Inner::NonContiguous {
Expand All @@ -286,6 +294,7 @@ impl From<Vec<Bytes>> for Buffer {
}

impl From<Arc<[Bytes]>> for Buffer {
#[inline]
fn from(bs: Arc<[Bytes]>) -> Self {
let size = bs.iter().map(Bytes::len).sum();
Self(Inner::NonContiguous {
Expand All @@ -298,6 +307,7 @@ impl From<Arc<[Bytes]>> for Buffer {
}

impl FromIterator<Bytes> for Buffer {
#[inline]
fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
let mut size = 0;
let bs = iter.into_iter().inspect(|v| size += v.len());
Expand Down Expand Up @@ -340,6 +350,7 @@ impl Buf for Buffer {
}
}

#[inline]
fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
match &self.0 {
Inner::Contiguous(b) => {
Expand Down
8 changes: 7 additions & 1 deletion core/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ pub mod into_futures_async_read {
offset: range.start,
size: range.end - range.start,
// TODO: should use services preferred io size.
cap: 4 * 1024 * 1024,
cap: 8 * 1024 * 1024,

cur: 0,
buf: Buffer::new(),
Expand Down Expand Up @@ -297,6 +297,12 @@ pub mod into_futures_async_read {

fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.buf.advance(amt);
// Make sure buf has been dropped before starting new request.
// Otherwise, we will hold those bytes in memory until next
// buffer reaching.
if self.buf.is_empty() {
self.buf = Buffer::new();
}
self.cur += amt as u64;
}
}
Expand Down

0 comments on commit 62fe9b0

Please sign in to comment.