Allocation pool the chunked iters in InstanceEnv #2038

Merged: 1 commit, Dec 9, 2024
126 changes: 52 additions & 74 deletions crates/core/src/host/instance_env.rs
@@ -3,6 +3,7 @@ use crate::database_logger::{BacktraceProvider, LogLevel, Record};
 use crate::db::datastore::locking_tx_datastore::MutTxId;
 use crate::error::{IndexError, NodesError};
 use crate::replica_context::ReplicaContext;
+use core::mem;
 use parking_lot::{Mutex, MutexGuard};
 use smallvec::SmallVec;
 use spacetimedb_primitives::{ColId, IndexId, TableId};
@@ -23,48 +24,67 @@ pub struct TxSlot {
     inner: Arc<Mutex<Option<MutTxId>>>,
 }

+/// A pool of available unused chunks.
+///
+/// The pool currently places no limits on its size.
+#[derive(Default)]
+pub struct ChunkPool {
+    free_chunks: Vec<Vec<u8>>,
+}
+
+impl ChunkPool {
+    /// Takes an unused chunk from this pool
+    /// or creates a new chunk if none are available.
+    /// New chunks are not actually allocated,
+    /// but will be, on first use.
+    fn take(&mut self) -> Vec<u8> {
+        self.free_chunks.pop().unwrap_or_default()
+    }
+
+    /// Returns a chunk back to the pool.
+    pub fn put(&mut self, mut chunk: Vec<u8>) {
+        chunk.clear();
+        self.free_chunks.push(chunk);
+    }
+}
+
 #[derive(Default)]
 struct ChunkedWriter {
-    chunks: Vec<Box<[u8]>>,
-    scratch_space: Vec<u8>,
+    /// Chunks collected thus far.
+    chunks: Vec<Vec<u8>>,
+    /// Current in-progress chunk that will be added to `chunks`.
+    curr: Vec<u8>,
 }

 impl ChunkedWriter {
-    /// Flushes the data collected in the scratch space if it's larger than our
-    /// chunking threshold.
-    pub fn flush(&mut self) {
-        if self.scratch_space.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
-            // We intentionally clone here so that our scratch space is not
-            // recreated with zero capacity (via `Vec::new`), but instead can
-            // be `.clear()`ed in-place and reused.
-            //
-            // This way the buffers in `chunks` are always fitted fixed-size to
-            // the actual data they contain, while the scratch space is ever-
-            // growing and has higher chance of fitting each next row without
-            // reallocation.
-            self.chunks.push(self.scratch_space.as_slice().into());
-            self.scratch_space.clear();
+    /// Flushes the data collected in the current chunk
+    /// if it's larger than our chunking threshold.
+    fn flush(&mut self, pool: &mut ChunkPool) {
+        if self.curr.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
+            let curr = mem::replace(&mut self.curr, pool.take());
+            self.chunks.push(curr);
         }
     }

     /// Finalises the writer and returns all the chunks.
-    pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
-        if !self.scratch_space.is_empty() {
-            // Avoid extra clone by just shrinking and pushing the scratch space
-            // in-place.
-            self.chunks.push(self.scratch_space.into());
+    fn into_chunks(mut self) -> Vec<Vec<u8>> {
+        if !self.curr.is_empty() {
+            self.chunks.push(self.curr);
         }
         self.chunks
     }

-    pub fn collect_iter(iter: impl Iterator<Item = impl ToBsatn>) -> Vec<Box<[u8]>> {
+    pub fn collect_iter(pool: &mut ChunkPool, iter: impl Iterator<Item = impl ToBsatn>) -> Vec<Vec<u8>> {
         let mut chunked_writer = Self::default();
         // Consume the iterator, serializing each `item`,
         // while allowing a chunk to be created at boundaries.
         for item in iter {
             // Write the item directly to the BSATN `chunked_writer` buffer.
-            item.to_bsatn_extend(&mut chunked_writer.scratch_space).unwrap();
+            item.to_bsatn_extend(&mut chunked_writer.curr).unwrap();
             // Flush at item boundaries.
-            chunked_writer.flush();
+            chunked_writer.flush(pool);
         }

         chunked_writer.into_chunks()
     }
 }
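Note on the pattern: `flush` seals a chunk only after a whole row has been serialized into `curr`, so no row is ever split across chunks, and `mem::replace` swaps in a pooled buffer so the writer never restarts from a zero-capacity `Vec`. A minimal self-contained sketch of that invariant (plain `Vec`s and a tiny stand-in threshold, not the crate's types):

```rust
use core::mem;

// Stand-in for ROW_ITER_CHUNK_SIZE; tiny so the example flushes twice.
const THRESHOLD: usize = 4;

fn main() {
    let mut pool: Vec<Vec<u8>> = Vec::new();
    let rows: &[&[u8]] = &[b"aa", b"bbb", b"c", b"dddd"];

    let mut chunks: Vec<Vec<u8>> = Vec::new();
    let mut curr: Vec<u8> = Vec::new();
    for row in rows {
        // A row is always written whole into the current chunk...
        curr.extend_from_slice(row);
        // ...and the chunk is sealed only at a row boundary, swapping in
        // a recycled buffer (or a fresh one) via mem::replace.
        if curr.len() > THRESHOLD {
            let full = mem::replace(&mut curr, pool.pop().unwrap_or_default());
            chunks.push(full);
        }
    }
    if !curr.is_empty() {
        chunks.push(curr);
    }

    // "aa" + "bbb" crossed the threshold together; no row was split.
    assert_eq!(chunks, vec![b"aabbb".to_vec(), b"cdddd".to_vec()]);
}
```

Compared to the old `scratch_space` approach, which copied each chunk out and cleared the scratch buffer in place, this hands the filled buffer itself to the consumer and recycles whatever buffers the consumer returns.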
@@ -131,29 +151,6 @@ impl InstanceEnv {
         Ok(gen_cols)
     }

-    /// Deletes all rows in the table identified by `table_id`
-    /// where the column identified by `cols` equates to `value`.
-    ///
-    /// Returns an error if no rows were deleted or if the column wasn't found.
-    pub fn delete_by_col_eq(&self, table_id: TableId, col_id: ColId, value: &[u8]) -> Result<u32, NodesError> {
-        let stdb = &*self.replica_ctx.relational_db;
-        let tx = &mut *self.get_tx()?;
-
-        // Interpret the `value` using the schema of the column.
-        let eq_value = &stdb.decode_column(tx, table_id, col_id, value)?;
-
-        // Find all rows in the table where the column data equates to `value`.
-        let rows_to_delete = stdb
-            .iter_by_col_eq_mut(tx, table_id, col_id, eq_value)?
-            .map(|row_ref| row_ref.pointer())
-            // `delete_by_field` only cares about 1 element,
-            // so optimize for that.
-            .collect::<SmallVec<[_; 1]>>();
-
-        // Delete them and count how many we deleted.
-        Ok(stdb.delete(tx, table_id, rows_to_delete))
-    }
-
     #[tracing::instrument(skip_all)]
     pub fn datastore_delete_by_btree_scan_bsatn(
         &self,
@@ -239,53 +236,34 @@ impl InstanceEnv {
         stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound)
     }

-    /// Finds all rows in the table identified by `table_id`
-    /// where the column identified by `cols` matches to `value`.
-    ///
-    /// These rows are returned concatenated with each row bsatn encoded.
-    ///
-    /// Matching is defined by decoding of `value` to an `AlgebraicValue`
-    /// according to the column's schema and then `Ord for AlgebraicValue`.
-    pub fn iter_by_col_eq_chunks(
+    #[tracing::instrument(skip_all)]
+    pub fn datastore_table_scan_bsatn_chunks(
         &self,
+        pool: &mut ChunkPool,
         table_id: TableId,
-        col_id: ColId,
-        value: &[u8],
-    ) -> Result<Vec<Box<[u8]>>, NodesError> {
-        let stdb = &*self.replica_ctx.relational_db;
-        let tx = &mut *self.get_tx()?;
-
-        // Interpret the `value` using the schema of the column.
-        let value = &stdb.decode_column(tx, table_id, col_id, value)?;
-
-        // Find all rows in the table where the column data matches `value`.
-        let chunks = ChunkedWriter::collect_iter(stdb.iter_by_col_eq_mut(tx, table_id, col_id, value)?);
-        Ok(chunks)
-    }
-
-    #[tracing::instrument(skip_all)]
-    pub fn datastore_table_scan_bsatn_chunks(&self, table_id: TableId) -> Result<Vec<Box<[u8]>>, NodesError> {
+    ) -> Result<Vec<Vec<u8>>, NodesError> {
         let stdb = &*self.replica_ctx.relational_db;
         let tx = &mut *self.tx.get()?;

-        let chunks = ChunkedWriter::collect_iter(stdb.iter_mut(tx, table_id)?);
+        let chunks = ChunkedWriter::collect_iter(pool, stdb.iter_mut(tx, table_id)?);
         Ok(chunks)
     }

     #[tracing::instrument(skip_all)]
     pub fn datastore_btree_scan_bsatn_chunks(
         &self,
+        pool: &mut ChunkPool,
         index_id: IndexId,
         prefix: &[u8],
         prefix_elems: ColId,
         rstart: &[u8],
         rend: &[u8],
-    ) -> Result<Vec<Box<[u8]>>, NodesError> {
+    ) -> Result<Vec<Vec<u8>>, NodesError> {
         let stdb = &*self.replica_ctx.relational_db;
         let tx = &mut *self.tx.get()?;

         let (_, iter) = stdb.btree_scan(tx, index_id, prefix, prefix_elems, rstart, rend)?;
-        let chunks = ChunkedWriter::collect_iter(iter);
+        let chunks = ChunkedWriter::collect_iter(pool, iter);
         Ok(chunks)
     }
 }
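Passing the pool as an explicit `&mut ChunkPool` parameter, rather than storing it inside `InstanceEnv`, plausibly also helps the wasm host further down: the caller can borrow `env.instance_env` and `env.chunk_pool` at the same time, because the borrow checker splits borrows across disjoint fields. A sketch of that pattern, with hypothetical names standing in for the crate's types:

```rust
// Hypothetical mirror of the host-side borrow pattern; all names invented.
#[derive(Default)]
struct Pool {
    free: Vec<Vec<u8>>,
}

#[derive(Default)]
struct Inner;

impl Inner {
    // The pool arrives as a parameter, so the method can take `&self`.
    fn scan(&self, pool: &mut Pool) -> Vec<u8> {
        pool.free.pop().unwrap_or_default()
    }
}

#[derive(Default)]
struct Env {
    inner: Inner,
    pool: Pool,
}

fn main() {
    let mut env = Env::default();
    // Two disjoint field borrows of `env` in one call: `env.inner` shared,
    // `env.pool` mutable. The borrow checker accepts this split.
    let chunk = env.inner.scan(&mut env.pool);
    assert!(chunk.is_empty());
}
```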
2 changes: 1 addition & 1 deletion crates/core/src/host/wasm_common.rs
@@ -324,7 +324,7 @@ impl<I: ResourceIndex> ResourceSlab<I> {
     }
 }

-decl_index!(RowIterIdx => std::vec::IntoIter<Box<[u8]>>);
+decl_index!(RowIterIdx => std::vec::IntoIter<Vec<u8>>);
 pub(super) type RowIters = ResourceSlab<RowIterIdx>;

 pub(super) struct TimingSpan {
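This one-line type change is what makes recycling worthwhile: a `Vec<u8>` can be cleared and keep its allocation, whereas the old `Box<[u8]>` chunks had been shrunk to fit, leaving no spare capacity to hand back. A small demonstration using only documented `std` behavior:

```rust
fn main() {
    // Keeping the buffer a Vec: clear() empties it but retains capacity,
    // so the next user writes into the same allocation.
    let mut chunk = Vec::with_capacity(64);
    chunk.extend_from_slice(b"row data");
    chunk.clear();
    assert!(chunk.capacity() >= 64);

    // Converting to Box<[u8]> shrinks to the length (8 bytes here),
    // so there is no spare capacity left to recycle.
    let mut other = Vec::with_capacity(64);
    other.extend_from_slice(b"row data");
    let boxed: Box<[u8]> = other.into_boxed_slice();
    assert_eq!(boxed.len(), 8);
}
```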
20 changes: 12 additions & 8 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
@@ -3,6 +3,7 @@
 use std::time::Instant;

 use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
+use crate::host::instance_env::{ChunkPool, InstanceEnv};
 use crate::host::wasm_common::instrumentation;
 use crate::host::wasm_common::module_host_actor::ExecutionTimings;
 use crate::host::wasm_common::{
@@ -16,8 +17,6 @@ use spacetimedb_sats::bsatn;
 use spacetimedb_sats::buffer::{CountWriter, TeeWriter};
 use wasmtime::{AsContext, Caller, StoreContextMut};

-use crate::host::instance_env::InstanceEnv;
-
 use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr};

#[cfg(not(feature = "spacetimedb-wasm-instance-env-times"))]
@@ -73,6 +72,9 @@ pub(super) struct WasmInstanceEnv {

     /// The last, including current, reducer to be executed by this environment.
     reducer_name: String,
+    /// A pool of unused allocated chunks that can be reused.
+    // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
+    chunk_pool: ChunkPool,
 }

const CALL_REDUCER_ARGS_SOURCE: u32 = 1;
@@ -97,6 +99,7 @@ impl WasmInstanceEnv {
             reducer_start,
             call_times: CallTimes::new(),
             reducer_name: String::from(""),
+            chunk_pool: <_>::default(),
         }
     }

@@ -399,7 +402,9 @@ impl WasmInstanceEnv {
         Self::cvt_ret(caller, AbiCall::DatastoreTableScanBsatn, out, |caller| {
             let env = caller.data_mut();
             // Collect the iterator chunks.
-            let chunks = env.instance_env.datastore_table_scan_bsatn_chunks(table_id.into())?;
+            let chunks = env
+                .instance_env
+                .datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id.into())?;
             // Register the iterator and get back the index to write to `out`.
             // Calls to the iterator are done through dynamic dispatch.
             Ok(env.iters.insert(chunks.into_iter()))
@@ -495,6 +500,7 @@ impl WasmInstanceEnv {

             // Find the relevant rows.
             let chunks = env.instance_env.datastore_btree_scan_bsatn_chunks(
+                &mut env.chunk_pool,
                 index_id.into(),
                 prefix,
                 prefix_elems,
@@ -574,11 +580,9 @@ impl WasmInstanceEnv {
                 buffer = &mut buffer[chunk.len()..];

                 // Advance the iterator, as we used a chunk.
-                // TODO(Centril): consider putting these into a pool for reuse
-                // by the next `ChunkedWriter::collect_iter`, `span_start`, and `bytes_sink_write`.
-                // Although we need to shrink these chunks to fit due to `Box<[u8]>`,
-                // `realloc` will in practice not move the data to a new heap allocation.
-                iter.next();
+                // SAFETY: We peeked one `chunk`, so there must be at least one.
+                let chunk = unsafe { iter.next().unwrap_unchecked() };
+                env.chunk_pool.put(chunk);
             }

             let ret = match (written, iter.as_slice().first()) {
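The `unwrap_unchecked` relies on the peek at the top of the loop: the body only runs when `iter.as_slice().first()` returned `Some`, so this `next()` cannot yield `None`. A safe, standalone rendering of the same peek-then-advance-then-recycle loop (using `expect` where the PR elides the branch, and a plain `Vec` as the pool):

```rust
fn main() {
    let mut pool: Vec<Vec<u8>> = Vec::new();
    let chunks = vec![vec![1u8, 2, 3], vec![4u8, 5]];
    let mut iter = chunks.into_iter();
    let mut written = 0;

    // Peek at the next chunk without consuming it, as the host loop does.
    while let Some(chunk) = iter.as_slice().first() {
        written += chunk.len(); // stand-in for copying into the wasm-side buffer

        // We just observed `Some`, so this cannot fail; the PR uses
        // `unwrap_unchecked` to elide this branch entirely.
        let mut chunk = iter.next().expect("peeked above");
        // Hand the spent chunk back, cleared but with its capacity intact.
        chunk.clear();
        pool.push(chunk);
    }

    assert_eq!(written, 5);
    assert_eq!(pool.len(), 2);
}
```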