diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index fe7c4fda38..39938b199d 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -3,6 +3,7 @@ use crate::database_logger::{BacktraceProvider, LogLevel, Record}; use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::error::{IndexError, NodesError}; use crate::replica_context::ReplicaContext; +use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_primitives::{ColId, IndexId, TableId}; @@ -23,48 +24,67 @@ pub struct TxSlot { inner: Arc>>, } +/// A pool of available unused chunks. +/// +/// The chunk places currently no limits on its size. +#[derive(Default)] +pub struct ChunkPool { + free_chunks: Vec>, +} + +impl ChunkPool { + /// Takes an unused chunk from this pool + /// or creates a new chunk if none are available. + /// New chunks are not actually allocated, + /// but will be, on first use. + fn take(&mut self) -> Vec { + self.free_chunks.pop().unwrap_or_default() + } + + /// Return a chunk back to the pool. + pub fn put(&mut self, mut chunk: Vec) { + chunk.clear(); + self.free_chunks.push(chunk); + } +} + #[derive(Default)] struct ChunkedWriter { - chunks: Vec>, - scratch_space: Vec, + /// Chunks collected thus far. + chunks: Vec>, + /// Current in progress chunk that will be added to `chunks`. + curr: Vec, } impl ChunkedWriter { - /// Flushes the data collected in the scratch space if it's larger than our - /// chunking threshold. - pub fn flush(&mut self) { - if self.scratch_space.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE { - // We intentionally clone here so that our scratch space is not - // recreated with zero capacity (via `Vec::new`), but instead can - // be `.clear()`ed in-place and reused. - // - // This way the buffers in `chunks` are always fitted fixed-size to - // the actual data they contain, while the scratch space is ever- - // growing and has higher chance of fitting each next row without - // reallocation. - self.chunks.push(self.scratch_space.as_slice().into()); - self.scratch_space.clear(); + /// Flushes the data collected in the current chunk + /// if it's larger than our chunking threshold. + fn flush(&mut self, pool: &mut ChunkPool) { + if self.curr.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE { + let curr = mem::replace(&mut self.curr, pool.take()); + self.chunks.push(curr); } } /// Finalises the writer and returns all the chunks. - pub fn into_chunks(mut self) -> Vec> { - if !self.scratch_space.is_empty() { - // Avoid extra clone by just shrinking and pushing the scratch space - // in-place. - self.chunks.push(self.scratch_space.into()); + fn into_chunks(mut self) -> Vec> { + if !self.curr.is_empty() { + self.chunks.push(self.curr); } self.chunks } - pub fn collect_iter(iter: impl Iterator) -> Vec> { + pub fn collect_iter(pool: &mut ChunkPool, iter: impl Iterator) -> Vec> { let mut chunked_writer = Self::default(); + // Consume the iterator, serializing each `item`, + // while allowing a chunk to be created at boundaries. for item in iter { // Write the item directly to the BSATN `chunked_writer` buffer. - item.to_bsatn_extend(&mut chunked_writer.scratch_space).unwrap(); + item.to_bsatn_extend(&mut chunked_writer.curr).unwrap(); // Flush at item boundaries. - chunked_writer.flush(); + chunked_writer.flush(pool); } + chunked_writer.into_chunks() } } @@ -131,29 +151,6 @@ impl InstanceEnv { Ok(gen_cols) } - /// Deletes all rows in the table identified by `table_id` - /// where the column identified by `cols` equates to `value`. - /// - /// Returns an error if no rows were deleted or if the column wasn't found. - pub fn delete_by_col_eq(&self, table_id: TableId, col_id: ColId, value: &[u8]) -> Result { - let stdb = &*self.replica_ctx.relational_db; - let tx = &mut *self.get_tx()?; - - // Interpret the `value` using the schema of the column. - let eq_value = &stdb.decode_column(tx, table_id, col_id, value)?; - - // Find all rows in the table where the column data equates to `value`. - let rows_to_delete = stdb - .iter_by_col_eq_mut(tx, table_id, col_id, eq_value)? - .map(|row_ref| row_ref.pointer()) - // `delete_by_field` only cares about 1 element, - // so optimize for that. - .collect::>(); - - // Delete them and count how many we deleted. - Ok(stdb.delete(tx, table_id, rows_to_delete)) - } - #[tracing::instrument(skip_all)] pub fn datastore_delete_by_btree_scan_bsatn( &self, @@ -239,53 +236,34 @@ impl InstanceEnv { stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound) } - /// Finds all rows in the table identified by `table_id` - /// where the column identified by `cols` matches to `value`. - /// - /// These rows are returned concatenated with each row bsatn encoded. - /// - /// Matching is defined by decoding of `value` to an `AlgebraicValue` - /// according to the column's schema and then `Ord for AlgebraicValue`. - pub fn iter_by_col_eq_chunks( + #[tracing::instrument(skip_all)] + pub fn datastore_table_scan_bsatn_chunks( &self, + pool: &mut ChunkPool, table_id: TableId, - col_id: ColId, - value: &[u8], - ) -> Result>, NodesError> { - let stdb = &*self.replica_ctx.relational_db; - let tx = &mut *self.get_tx()?; - - // Interpret the `value` using the schema of the column. - let value = &stdb.decode_column(tx, table_id, col_id, value)?; - - // Find all rows in the table where the column data matches `value`. - let chunks = ChunkedWriter::collect_iter(stdb.iter_by_col_eq_mut(tx, table_id, col_id, value)?); - Ok(chunks) - } - - #[tracing::instrument(skip_all)] - pub fn datastore_table_scan_bsatn_chunks(&self, table_id: TableId) -> Result>, NodesError> { + ) -> Result>, NodesError> { let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.tx.get()?; - let chunks = ChunkedWriter::collect_iter(stdb.iter_mut(tx, table_id)?); + let chunks = ChunkedWriter::collect_iter(pool, stdb.iter_mut(tx, table_id)?); Ok(chunks) } #[tracing::instrument(skip_all)] pub fn datastore_btree_scan_bsatn_chunks( &self, + pool: &mut ChunkPool, index_id: IndexId, prefix: &[u8], prefix_elems: ColId, rstart: &[u8], rend: &[u8], - ) -> Result>, NodesError> { + ) -> Result>, NodesError> { let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.tx.get()?; let (_, iter) = stdb.btree_scan(tx, index_id, prefix, prefix_elems, rstart, rend)?; - let chunks = ChunkedWriter::collect_iter(iter); + let chunks = ChunkedWriter::collect_iter(pool, iter); Ok(chunks) } } diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 844f706b6d..d52e658005 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -324,7 +324,7 @@ impl ResourceSlab { } } -decl_index!(RowIterIdx => std::vec::IntoIter>); +decl_index!(RowIterIdx => std::vec::IntoIter>); pub(super) type RowIters = ResourceSlab; pub(super) struct TimingSpan { diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index faf640b6d7..4898057802 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -3,6 +3,7 @@ use std::time::Instant; use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; +use crate::host::instance_env::{ChunkPool, InstanceEnv}; use crate::host::wasm_common::instrumentation; use crate::host::wasm_common::module_host_actor::ExecutionTimings; use crate::host::wasm_common::{ @@ -16,8 +17,6 @@ use spacetimedb_sats::bsatn; use spacetimedb_sats::buffer::{CountWriter, TeeWriter}; use wasmtime::{AsContext, Caller, StoreContextMut}; -use crate::host::instance_env::InstanceEnv; - use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr}; #[cfg(not(feature = "spacetimedb-wasm-instance-env-times"))] @@ -73,6 +72,9 @@ pub(super) struct WasmInstanceEnv { /// The last, including current, reducer to be executed by this environment. reducer_name: String, + /// A pool of unused allocated chunks that can be reused. + // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. + chunk_pool: ChunkPool, } const CALL_REDUCER_ARGS_SOURCE: u32 = 1; @@ -97,6 +99,7 @@ impl WasmInstanceEnv { reducer_start, call_times: CallTimes::new(), reducer_name: String::from(""), + chunk_pool: <_>::default(), } } @@ -399,7 +402,9 @@ impl WasmInstanceEnv { Self::cvt_ret(caller, AbiCall::DatastoreTableScanBsatn, out, |caller| { let env = caller.data_mut(); // Collect the iterator chunks. - let chunks = env.instance_env.datastore_table_scan_bsatn_chunks(table_id.into())?; + let chunks = env + .instance_env + .datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id.into())?; // Register the iterator and get back the index to write to `out`. // Calls to the iterator are done through dynamic dispatch. Ok(env.iters.insert(chunks.into_iter())) @@ -495,6 +500,7 @@ impl WasmInstanceEnv { // Find the relevant rows. let chunks = env.instance_env.datastore_btree_scan_bsatn_chunks( + &mut env.chunk_pool, index_id.into(), prefix, prefix_elems, @@ -574,11 +580,9 @@ impl WasmInstanceEnv { buffer = &mut buffer[chunk.len()..]; // Advance the iterator, as we used a chunk. - // TODO(Centril): consider putting these into a pool for reuse - // by the next `ChunkedWriter::collect_iter`, `span_start`, and `bytes_sink_write`. - // Although we need to shrink these chunks to fit due to `Box<[u8]>`, - // in practice, `realloc` will in practice not move the data to a new heap allocation. - iter.next(); + // SAFETY: We peeked one `chunk`, so there must be one at least. + let chunk = unsafe { iter.next().unwrap_unchecked() }; + env.chunk_pool.put(chunk); } let ret = match (written, iter.as_slice().first()) {