Skip to content

Commit

Permalink
Reuse Wasm-side buffer while reading iterators (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
RReverser authored Oct 16, 2023
1 parent a66faa4 commit 7478e78
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 37 deletions.
25 changes: 16 additions & 9 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,21 @@ impl Buffer {
unsafe { raw::_buffer_len(self.handle()) }
}

/// Read the contents of the buffer into a boxed byte slice.
/// Read the contents of the buffer into the provided Vec.
/// The Vec is cleared in the process.
pub fn read_into(self, buf: &mut Vec<u8>) {
let data_len = self.data_len();
buf.clear();
buf.reserve(data_len);
self.read_uninit(&mut buf.spare_capacity_mut()[..data_len]);
// SAFETY: We just wrote `data_len` bytes into `buf`.
unsafe { buf.set_len(data_len) };
}

/// Read the contents of the buffer into a new boxed byte slice.
pub fn read(self) -> Box<[u8]> {
let len = self.data_len();
let mut buf = alloc::vec::Vec::with_capacity(len);
self.read_uninit(buf.spare_capacity_mut());
// SAFETY: We just wrote `len` bytes to `buf`.
unsafe { buf.set_len(len) };
let mut buf = alloc::vec::Vec::new();
self.read_into(&mut buf);
buf.into_boxed_slice()
}

Expand Down Expand Up @@ -746,14 +754,13 @@ impl Buffer {
}

impl Iterator for BufferIter {
type Item = Result<Box<[u8]>, Errno>;
type Item = Result<Buffer, Errno>;

fn next(&mut self) -> Option<Self::Item> {
let buf = unsafe { call(|out| raw::_iter_next(self.handle(), out)) };
match buf {
Ok(buf) if buf.is_invalid() => None,
Ok(buf) => Some(Ok(buf.read())),
Err(e) => Some(Err(e)),
res => Some(res),
}
}
}
Expand Down
45 changes: 17 additions & 28 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ fn buffer_table_iter(
let mut iter = sys::iter(table_id, filter.as_deref())?;

// First item is an encoded schema.
let schema_raw = iter.next().expect("Missing schema").expect("Failed to get schema");
let schema_raw = iter
.next()
.expect("Missing schema")
.expect("Failed to get schema")
.read();
let schema = decode_schema(&mut &schema_raw[..]).expect("Could not decode schema");

Ok((iter, schema))
Expand Down Expand Up @@ -345,10 +349,8 @@ struct RawTableIter<De> {
/// The underlying source of our `Buffer`s.
inner: BufferIter,

/// The current position in the current buffer,
/// from which `deserializer` can read.
/// A value of `None` indicates that we need to pull another `Buffer` from `inner`.
reader: Option<Cursor<Box<[u8]>>>,
/// The current position in the buffer, from which `deserializer` can read.
reader: Cursor<Vec<u8>>,

deserializer: De,
}
Expand All @@ -357,7 +359,7 @@ impl<De: BufferDeserialize> RawTableIter<De> {
fn new(iter: BufferIter, deserializer: De) -> Self {
RawTableIter {
inner: iter,
reader: None,
reader: Cursor::new(Vec::new()),
deserializer,
}
}
Expand All @@ -368,30 +370,17 @@ impl<T, De: BufferDeserialize<Item = T>> Iterator for RawTableIter<De> {

fn next(&mut self) -> Option<Self::Item> {
loop {
// If we currently have some bytes in the buffer to still decode,
// do that. Otherwise, try to fetch the next buffer first.

match &self.reader {
Some(reader) => {
if reader.remaining() == 0 {
self.reader = None;
continue;
}
break;
}
None => {
// If we receive None here, iteration is complete.
let buffer = self.inner.next()?;
let buffer = buffer.expect("RawTableIter::next: Failed to get buffer!");
self.reader = Some(Cursor::new(buffer));
break;
}
// If we currently have some bytes in the buffer to still decode, do that.
if (&self.reader).remaining() > 0 {
let row = self.deserializer.deserialize(&self.reader);
return Some(row);
}
// Otherwise, try to fetch the next chunk while reusing the buffer.
let buffer = self.inner.next()?;
let buffer = buffer.expect("RawTableIter::next: Failed to get buffer!");
self.reader.pos.set(0);
buffer.read_into(&mut self.reader.buf);
}

let reader = self.reader.as_ref().unwrap();
let row = self.deserializer.deserialize(reader);
Some(row)
}
}

Expand Down

0 comments on commit 7478e78

Please sign in to comment.