Commit d8154e7

Allocation pool the chunked iters in InstanceEnv (#2038)

Centril authored Dec 9, 2024
1 parent 96a3871 commit d8154e7

Showing 3 changed files with 65 additions and 83 deletions.
126 changes: 52 additions & 74 deletions crates/core/src/host/instance_env.rs
@@ -3,6 +3,7 @@ use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::error::{IndexError, NodesError};
use crate::replica_context::ReplicaContext;
use core::mem;
use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_primitives::{ColId, IndexId, TableId};
@@ -23,48 +24,67 @@ pub struct TxSlot {
inner: Arc<Mutex<Option<MutTxId>>>,
}

/// A pool of available unused chunks.
///
/// The pool currently places no limits on its size.
#[derive(Default)]
pub struct ChunkPool {
free_chunks: Vec<Vec<u8>>,
}

impl ChunkPool {
/// Takes an unused chunk from this pool
/// or creates a new chunk if none are available.
/// New chunks are not actually allocated,
/// but will be, on first use.
fn take(&mut self) -> Vec<u8> {
self.free_chunks.pop().unwrap_or_default()
}

/// Return a chunk back to the pool.
pub fn put(&mut self, mut chunk: Vec<u8>) {
chunk.clear();
self.free_chunks.push(chunk);
}
}

#[derive(Default)]
struct ChunkedWriter {
chunks: Vec<Box<[u8]>>,
scratch_space: Vec<u8>,
/// Chunks collected thus far.
chunks: Vec<Vec<u8>>,
/// Current in progress chunk that will be added to `chunks`.
curr: Vec<u8>,
}

impl ChunkedWriter {
/// Flushes the data collected in the scratch space if it's larger than our
/// chunking threshold.
pub fn flush(&mut self) {
if self.scratch_space.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
// We intentionally clone here so that our scratch space is not
// recreated with zero capacity (via `Vec::new`), but instead can
// be `.clear()`ed in-place and reused.
//
// This way the buffers in `chunks` are always fitted fixed-size to
// the actual data they contain, while the scratch space is ever-
// growing and has higher chance of fitting each next row without
// reallocation.
self.chunks.push(self.scratch_space.as_slice().into());
self.scratch_space.clear();
/// Flushes the data collected in the current chunk
/// if it's larger than our chunking threshold.
fn flush(&mut self, pool: &mut ChunkPool) {
if self.curr.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
let curr = mem::replace(&mut self.curr, pool.take());
self.chunks.push(curr);
}
}

/// Finalises the writer and returns all the chunks.
pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
if !self.scratch_space.is_empty() {
// Avoid extra clone by just shrinking and pushing the scratch space
// in-place.
self.chunks.push(self.scratch_space.into());
fn into_chunks(mut self) -> Vec<Vec<u8>> {
if !self.curr.is_empty() {
self.chunks.push(self.curr);
}
self.chunks
}

pub fn collect_iter(iter: impl Iterator<Item = impl ToBsatn>) -> Vec<Box<[u8]>> {
pub fn collect_iter(pool: &mut ChunkPool, iter: impl Iterator<Item = impl ToBsatn>) -> Vec<Vec<u8>> {
let mut chunked_writer = Self::default();
// Consume the iterator, serializing each `item`,
// while allowing a chunk to be created at boundaries.
for item in iter {
// Write the item directly to the BSATN `chunked_writer` buffer.
item.to_bsatn_extend(&mut chunked_writer.scratch_space).unwrap();
item.to_bsatn_extend(&mut chunked_writer.curr).unwrap();
// Flush at item boundaries.
chunked_writer.flush();
chunked_writer.flush(pool);
}

chunked_writer.into_chunks()
}
}
@@ -131,29 +151,6 @@ impl InstanceEnv {
Ok(gen_cols)
}

/// Deletes all rows in the table identified by `table_id`
/// where the column identified by `cols` equates to `value`.
///
/// Returns an error if no rows were deleted or if the column wasn't found.
pub fn delete_by_col_eq(&self, table_id: TableId, col_id: ColId, value: &[u8]) -> Result<u32, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.get_tx()?;

// Interpret the `value` using the schema of the column.
let eq_value = &stdb.decode_column(tx, table_id, col_id, value)?;

// Find all rows in the table where the column data equates to `value`.
let rows_to_delete = stdb
.iter_by_col_eq_mut(tx, table_id, col_id, eq_value)?
.map(|row_ref| row_ref.pointer())
// `delete_by_field` only cares about 1 element,
// so optimize for that.
.collect::<SmallVec<[_; 1]>>();

// Delete them and count how many we deleted.
Ok(stdb.delete(tx, table_id, rows_to_delete))
}

#[tracing::instrument(skip_all)]
pub fn datastore_delete_by_btree_scan_bsatn(
&self,
@@ -239,53 +236,34 @@ impl InstanceEnv {
stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound)
}

/// Finds all rows in the table identified by `table_id`
/// where the column identified by `cols` matches to `value`.
///
/// These rows are returned concatenated with each row bsatn encoded.
///
/// Matching is defined by decoding of `value` to an `AlgebraicValue`
/// according to the column's schema and then `Ord for AlgebraicValue`.
pub fn iter_by_col_eq_chunks(
#[tracing::instrument(skip_all)]
pub fn datastore_table_scan_bsatn_chunks(
&self,
pool: &mut ChunkPool,
table_id: TableId,
col_id: ColId,
value: &[u8],
) -> Result<Vec<Box<[u8]>>, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.get_tx()?;

// Interpret the `value` using the schema of the column.
let value = &stdb.decode_column(tx, table_id, col_id, value)?;

// Find all rows in the table where the column data matches `value`.
let chunks = ChunkedWriter::collect_iter(stdb.iter_by_col_eq_mut(tx, table_id, col_id, value)?);
Ok(chunks)
}

#[tracing::instrument(skip_all)]
pub fn datastore_table_scan_bsatn_chunks(&self, table_id: TableId) -> Result<Vec<Box<[u8]>>, NodesError> {
) -> Result<Vec<Vec<u8>>, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.tx.get()?;

let chunks = ChunkedWriter::collect_iter(stdb.iter_mut(tx, table_id)?);
let chunks = ChunkedWriter::collect_iter(pool, stdb.iter_mut(tx, table_id)?);
Ok(chunks)
}

#[tracing::instrument(skip_all)]
pub fn datastore_btree_scan_bsatn_chunks(
&self,
pool: &mut ChunkPool,
index_id: IndexId,
prefix: &[u8],
prefix_elems: ColId,
rstart: &[u8],
rend: &[u8],
) -> Result<Vec<Box<[u8]>>, NodesError> {
) -> Result<Vec<Vec<u8>>, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.tx.get()?;

let (_, iter) = stdb.btree_scan(tx, index_id, prefix, prefix_elems, rstart, rend)?;
let chunks = ChunkedWriter::collect_iter(iter);
let chunks = ChunkedWriter::collect_iter(pool, iter);
Ok(chunks)
}
}
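The heart of the change is the `ChunkPool`/`ChunkedWriter` pair above: chunk buffers are no longer boxed slices built fresh for every scan, but `Vec<u8>`s that get cleared and handed back for the next scan. The sketch below is a minimal, self-contained illustration of that pattern rather than the crate's actual API; `CHUNK_THRESHOLD`, `collect_chunks`, and the `main` demo are invented stand-ins for `ROW_ITER_CHUNK_SIZE` and `ChunkedWriter::collect_iter`.

```rust
use std::mem;

/// Byte threshold at which a chunk is considered full; stands in for
/// `spacetimedb_primitives::ROW_ITER_CHUNK_SIZE` (illustrative value only).
const CHUNK_THRESHOLD: usize = 8 * 1024;

/// Pool of cleared-but-still-allocated buffers, mirroring `ChunkPool` above.
#[derive(Default)]
struct ChunkPool {
    free_chunks: Vec<Vec<u8>>,
}

impl ChunkPool {
    /// Reuse a previously allocated chunk if one is available.
    fn take(&mut self) -> Vec<u8> {
        self.free_chunks.pop().unwrap_or_default()
    }

    /// Clear a chunk and keep its allocation for later reuse.
    fn put(&mut self, mut chunk: Vec<u8>) {
        chunk.clear();
        self.free_chunks.push(chunk);
    }
}

/// Serialize `rows` into roughly `CHUNK_THRESHOLD`-sized chunks, drawing every
/// buffer from the pool (a simplified stand-in for `ChunkedWriter::collect_iter`).
fn collect_chunks<'a>(pool: &mut ChunkPool, rows: impl Iterator<Item = &'a [u8]>) -> Vec<Vec<u8>> {
    let mut chunks = Vec::new();
    let mut curr = pool.take();
    for row in rows {
        curr.extend_from_slice(row);
        // Flush at row boundaries once the current chunk is large enough.
        if curr.len() > CHUNK_THRESHOLD {
            chunks.push(mem::replace(&mut curr, pool.take()));
        }
    }
    if !curr.is_empty() {
        chunks.push(curr);
    }
    chunks
}

fn main() {
    let mut pool = ChunkPool::default();
    let rows: Vec<Vec<u8>> = (0..10_000u32).map(|i| i.to_le_bytes().to_vec()).collect();

    // First scan: the pool is empty, so chunk buffers are allocated on demand.
    let chunks = collect_chunks(&mut pool, rows.iter().map(|r| r.as_slice()));
    let first_scan = chunks.len();

    // Once the consumer is done with a chunk, it goes back into the pool...
    for chunk in chunks {
        pool.put(chunk);
    }

    // ...so a second scan reuses those allocations instead of growing new ones.
    let reused = collect_chunks(&mut pool, rows.iter().map(|r| r.as_slice()));
    assert_eq!(first_scan, reused.len());
    println!("both scans produced {first_scan} chunks; the second reused pooled buffers");
}
```
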
2 changes: 1 addition & 1 deletion crates/core/src/host/wasm_common.rs
@@ -324,7 +324,7 @@ impl<I: ResourceIndex> ResourceSlab<I> {
}
}

decl_index!(RowIterIdx => std::vec::IntoIter<Box<[u8]>>);
decl_index!(RowIterIdx => std::vec::IntoIter<Vec<u8>>);
pub(super) type RowIters = ResourceSlab<RowIterIdx>;

pub(super) struct TimingSpan {
20 changes: 12 additions & 8 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
@@ -3,6 +3,7 @@
use std::time::Instant;

use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
use crate::host::instance_env::{ChunkPool, InstanceEnv};
use crate::host::wasm_common::instrumentation;
use crate::host::wasm_common::module_host_actor::ExecutionTimings;
use crate::host::wasm_common::{
@@ -16,8 +17,6 @@ use spacetimedb_sats::bsatn;
use spacetimedb_sats::buffer::{CountWriter, TeeWriter};
use wasmtime::{AsContext, Caller, StoreContextMut};

use crate::host::instance_env::InstanceEnv;

use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr};

#[cfg(not(feature = "spacetimedb-wasm-instance-env-times"))]
@@ -73,6 +72,9 @@ pub(super) struct WasmInstanceEnv {

/// The last, including current, reducer to be executed by this environment.
reducer_name: String,
/// A pool of unused allocated chunks that can be reused.
// TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
chunk_pool: ChunkPool,
}

const CALL_REDUCER_ARGS_SOURCE: u32 = 1;
@@ -97,6 +99,7 @@ impl WasmInstanceEnv {
reducer_start,
call_times: CallTimes::new(),
reducer_name: String::from(""),
chunk_pool: <_>::default(),
}
}

@@ -399,7 +402,9 @@ impl WasmInstanceEnv {
Self::cvt_ret(caller, AbiCall::DatastoreTableScanBsatn, out, |caller| {
let env = caller.data_mut();
// Collect the iterator chunks.
let chunks = env.instance_env.datastore_table_scan_bsatn_chunks(table_id.into())?;
let chunks = env
.instance_env
.datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id.into())?;
// Register the iterator and get back the index to write to `out`.
// Calls to the iterator are done through dynamic dispatch.
Ok(env.iters.insert(chunks.into_iter()))
@@ -495,6 +500,7 @@ impl WasmInstanceEnv {

// Find the relevant rows.
let chunks = env.instance_env.datastore_btree_scan_bsatn_chunks(
&mut env.chunk_pool,
index_id.into(),
prefix,
prefix_elems,
@@ -574,11 +580,9 @@ impl WasmInstanceEnv {
buffer = &mut buffer[chunk.len()..];

// Advance the iterator, as we used a chunk.
// TODO(Centril): consider putting these into a pool for reuse
// by the next `ChunkedWriter::collect_iter`, `span_start`, and `bytes_sink_write`.
// Although we need to shrink these chunks to fit due to `Box<[u8]>`,
// in practice, `realloc` will in practice not move the data to a new heap allocation.
iter.next();
// SAFETY: We peeked one `chunk`, so there must be one at least.
let chunk = unsafe { iter.next().unwrap_unchecked() };
env.chunk_pool.put(chunk);
}

let ret = match (written, iter.as_slice().first()) {
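The last hunk above is the consumer half of the cycle: once a chunk's bytes have been copied into the guest's buffer, the chunk is handed back to the pool instead of being dropped. The sketch below illustrates that flow under invented names (`advance` and the demo sizes are illustrative only), and a safe `expect` stands in for the diff's `unwrap_unchecked`.

```rust
use std::vec::IntoIter;

/// Illustrative pool mirroring the shape of `ChunkPool` in the diff.
#[derive(Default)]
struct ChunkPool {
    free_chunks: Vec<Vec<u8>>,
}

impl ChunkPool {
    /// Clear a chunk and retain its allocation for reuse.
    fn put(&mut self, mut chunk: Vec<u8>) {
        chunk.clear();
        self.free_chunks.push(chunk);
    }
}

/// Copy whole chunks from `iter` into `buffer`, recycling each chunk once its
/// bytes have been copied out (the idea behind the change in the last hunk).
fn advance(pool: &mut ChunkPool, iter: &mut IntoIter<Vec<u8>>, buffer: &mut [u8]) -> usize {
    let mut written = 0;
    // Peek the next chunk; only consume it if it fits in the remaining space.
    while let Some(chunk) = iter.as_slice().first() {
        if written + chunk.len() > buffer.len() {
            break;
        }
        buffer[written..written + chunk.len()].copy_from_slice(chunk);
        written += chunk.len();
        // We just peeked a chunk, so `next()` must yield it; return it to the pool.
        let chunk = iter.next().expect("peeked chunk must exist");
        pool.put(chunk);
    }
    written
}

fn main() {
    let mut pool = ChunkPool::default();
    let mut iter = vec![vec![1u8; 16], vec![2u8; 16], vec![3u8; 64]].into_iter();
    let mut out = [0u8; 40];

    let n = advance(&mut pool, &mut iter, &mut out);
    // The two 16-byte chunks fit (32 bytes); the 64-byte chunk stays queued,
    // and the two copied-out buffers are now pooled for reuse.
    assert_eq!(n, 32);
    assert_eq!(pool.free_chunks.len(), 2);
    assert_eq!(iter.as_slice().len(), 1);
    println!("wrote {n} bytes; {} buffers recycled", pool.free_chunks.len());
}
```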

2 comments on commit d8154e7

github-actions bot commented on d8154e7 · Dec 9, 2024

Benchmarking failed. Please check the workflow run for details.

github-actions bot commented on d8154e7 · Dec 9, 2024

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

| db | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|
| stdb_raw | 6399 | 6403 | -0.06% | 6487 | 6541 | -0.83% |
| sqlite | 5609 | 5609 | 0.00% | 6067 | 6083 | -0.26% |

callgrind: filter

| db | schema | indices | count | preload | column | data_type | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | no_index | 64 | 128 | 1 | u64 | 74695 | 74680 | 0.02% | 75107 | 75172 | -0.09% |
| stdb_raw | u32_u64_str | no_index | 64 | 128 | 2 | string | 116921 | 116922 | -0.00% | 117487 | 117728 | -0.20% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 2 | string | 25087 | 25090 | -0.01% | 25545 | 25688 | -0.56% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 1 | u64 | 24051 | 24055 | -0.02% | 24471 | 24603 | -0.54% |
| sqlite | u32_u64_str | no_index | 64 | 128 | 2 | string | 144415 | 144415 | 0.00% | 145747 | 145759 | -0.01% |
| sqlite | u32_u64_str | no_index | 64 | 128 | 1 | u64 | 123763 | 123763 | 0.00% | 124865 | 124913 | -0.04% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 1 | u64 | 131080 | 131080 | 0.00% | 132510 | 132478 | 0.02% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 2 | string | 134222 | 134222 | 0.00% | 135822 | 135782 | 0.03% |

callgrind: insert bulk

| db | schema | indices | count | preload | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 64 | 128 | 812228 | 810725 | 0.19% | 858980 | 830105 | 3.48% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 1045501 | 1044485 | 0.10% | 1075993 | 1070453 | 0.52% |
| sqlite | u32_u64_str | unique_0 | 64 | 128 | 399360 | 399360 | 0.00% | 420554 | 416660 | 0.93% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 984611 | 984611 | 0.00% | 1021525 | 1027193 | -0.55% |

callgrind: iterate

| db | schema | indices | count | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 1024 | 138392 | 138379 | 0.01% | 138490 | 138507 | -0.01% |
| stdb_raw | u32_u64_str | unique_0 | 64 | 15817 | 15804 | 0.08% | 15883 | 15912 | -0.18% |
| sqlite | u32_u64_str | unique_0 | 1024 | 1042718 | 1042718 | 0.00% | 1046022 | 1045990 | 0.00% |
| sqlite | u32_u64_str | unique_0 | 64 | 74704 | 74722 | -0.02% | 75722 | 75780 | -0.08% |

callgrind: serialize_product_value

| count | format | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|
| 64 | json | 47528 | 47528 | 0.00% | 50282 | 50316 | -0.07% |
| 64 | bsatn | 25509 | 25509 | 0.00% | 27753 | 27787 | -0.12% |
| 16 | bsatn | 8200 | 8200 | 0.00% | 9594 | 9628 | -0.35% |
| 16 | json | 12188 | 12188 | 0.00% | 14160 | 14194 | -0.24% |

callgrind: update bulk

| db | schema | indices | count | preload | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 1024 | 1024 | 19157478 | 19148592 | 0.05% | 19797088 | 19673064 | 0.63% |
| stdb_raw | u32_u64_str | unique_0 | 64 | 128 | 1218994 | 1216617 | 0.20% | 1291810 | 1252777 | 3.12% |
| sqlite | u32_u64_str | unique_0 | 1024 | 1024 | 1802137 | 1802137 | 0.00% | 1811135 | 1811383 | -0.01% |
| sqlite | u32_u64_str | unique_0 | 64 | 128 | 128540 | 128540 | 0.00% | 131370 | 131336 | 0.03% |

On-disk benchmarks

callgrind: empty transaction

| db | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|
| stdb_raw | 6404 | 6408 | -0.06% | 6508 | 6542 | -0.52% |
| sqlite | 5651 | 5651 | 0.00% | 6127 | 6155 | -0.45% |

callgrind: filter

| db | schema | indices | count | preload | column | data_type | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | no_index | 64 | 128 | 1 | u64 | 74700 | 74685 | 0.02% | 75072 | 75145 | -0.10% |
| stdb_raw | u32_u64_str | no_index | 64 | 128 | 2 | string | 116942 | 116948 | -0.01% | 117524 | 117670 | -0.12% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 2 | string | 25088 | 25094 | -0.02% | 25510 | 25612 | -0.40% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 1 | u64 | 24056 | 24060 | -0.02% | 24444 | 24556 | -0.46% |
| sqlite | u32_u64_str | no_index | 64 | 128 | 1 | u64 | 125684 | 125684 | 0.00% | 127258 | 127086 | 0.14% |
| sqlite | u32_u64_str | no_index | 64 | 128 | 2 | string | 146336 | 146336 | 0.00% | 148040 | 148072 | -0.02% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 2 | string | 136418 | 136418 | 0.00% | 138462 | 138366 | 0.07% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 1 | u64 | 133176 | 133176 | 0.00% | 135002 | 134942 | 0.04% |

callgrind: insert bulk

| db | schema | indices | count | preload | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 64 | 128 | 759490 | 759592 | -0.01% | 774476 | 778010 | -0.45% |
| stdb_raw | u32_u64_str | btree_each_column | 64 | 128 | 998460 | 996145 | 0.23% | 1058418 | 1051225 | 0.68% |
| sqlite | u32_u64_str | unique_0 | 64 | 128 | 416914 | 416914 | 0.00% | 437720 | 433724 | 0.92% |
| sqlite | u32_u64_str | btree_each_column | 64 | 128 | 1023158 | 1023176 | -0.00% | 1059426 | 1064640 | -0.49% |

callgrind: iterate

| db | schema | indices | count | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 1024 | 138397 | 138384 | 0.01% | 138467 | 138512 | -0.03% |
| stdb_raw | u32_u64_str | unique_0 | 64 | 15822 | 15809 | 0.08% | 15884 | 15937 | -0.33% |
| sqlite | u32_u64_str | unique_0 | 1024 | 1045786 | 1045786 | 0.00% | 1049484 | 1049404 | 0.01% |
| sqlite | u32_u64_str | unique_0 | 64 | 76476 | 76476 | 0.00% | 77754 | 77710 | 0.06% |

callgrind: serialize_product_value

| count | format | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|
| 64 | json | 47528 | 47528 | 0.00% | 50282 | 50316 | -0.07% |
| 64 | bsatn | 25509 | 25509 | 0.00% | 27753 | 27787 | -0.12% |
| 16 | bsatn | 8200 | 8200 | 0.00% | 9594 | 9628 | -0.35% |
| 16 | json | 12188 | 12188 | 0.00% | 14160 | 14194 | -0.24% |

callgrind: update bulk

| db | schema | indices | count | preload | total reads + writes | old total reads + writes | Δrw | estimated cycles | old estimated cycles | Δcycles |
|---|---|---|---|---|---|---|---|---|---|---|
| stdb_raw | u32_u64_str | unique_0 | 1024 | 1024 | 17661858 | 17637401 | 0.14% | 18341242 | 18205673 | 0.74% |
| stdb_raw | u32_u64_str | unique_0 | 64 | 128 | 1171853 | 1170817 | 0.09% | 1244577 | 1237195 | 0.60% |
| sqlite | u32_u64_str | unique_0 | 1024 | 1024 | 1809785 | 1809785 | 0.00% | 1818439 | 1818479 | -0.00% |
| sqlite | u32_u64_str | unique_0 | 64 | 128 | 132705 | 132687 | 0.01% | 135691 | 135607 | 0.06% |
