Skip to content

Commit

Permalink
Put ExecutionContext inside the Tx context (#1876)
Browse files Browse the repository at this point in the history
Co-authored-by: joshua-spacetime <[email protected]>
  • Loading branch information
mamcx and joshua-spacetime authored Oct 29, 2024
1 parent 7de743d commit a6a6c4f
Show file tree
Hide file tree
Showing 29 changed files with 793 additions and 1,077 deletions.
30 changes: 10 additions & 20 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::error::DBError;
use spacetimedb::execution_context::ExecutionContext;
use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
Expand Down Expand Up @@ -52,9 +52,10 @@ fn eval(c: &mut Criterion) {
let lhs = create_table_footprint(&raw.db).unwrap();
let rhs = create_table_location(&raw.db).unwrap();

//TODO: Change this to `Workload::ForTest` once `#[cfg(bench)]` is stabilized.
let _ = raw
.db
.with_auto_commit(&ExecutionContext::default(), |tx| -> Result<(), DBError> {
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1M rows
for entity_id in 0u64..1_000_000 {
let owner = entity_id % 1_000;
Expand All @@ -67,7 +68,7 @@ fn eval(c: &mut Criterion) {

let _ = raw
.db
.with_auto_commit(&ExecutionContext::default(), |tx| -> Result<(), DBError> {
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1000 chunks, 1200 rows per chunk = 1.2M rows
for chunk_index in 0u64..1_000 {
for i in 0u64..1200 {
Expand Down Expand Up @@ -100,13 +101,12 @@ fn eval(c: &mut Criterion) {

let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx();
let tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query: ExecutionSet = query.into();
let ctx = &ExecutionContext::subscribe(raw.db.database_identity());

b.iter(|| {
drop(black_box(query.eval::<BsatnFormat>(
ctx,
&raw.db,
&tx,
None,
Expand Down Expand Up @@ -134,25 +134,19 @@ fn eval(c: &mut Criterion) {
);
bench_eval(c, "full-join", &name);

let ctx_incr = &ExecutionContext::incremental_update(raw.db.database_identity());

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- incr-select --exact --profile-time=30
c.bench_function("incr-select", |b| {
// A passthru executed independently of the database.
let select_lhs = "select * from footprint";
let select_rhs = "select * from location";
let tx = &raw.db.begin_tx();
let tx = &raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
let tx = &tx.into();

b.iter(|| {
drop(black_box(
query.eval_incr_for_test(ctx_incr, &raw.db, tx, &update, None),
))
})
b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))))
});

// To profile this benchmark for 30s
Expand All @@ -165,16 +159,12 @@ fn eval(c: &mut Criterion) {
from footprint join location on footprint.entity_id = location.entity_id \
where location.chunk_index = {chunk_index}"
);
let tx = &raw.db.begin_tx();
let tx = &raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query: ExecutionSet = query.into();
let tx = &tx.into();

b.iter(|| {
drop(black_box(
query.eval_incr_for_test(ctx_incr, &raw.db, tx, &update, None),
))
});
b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))));
});

// To profile this benchmark for 30s
Expand Down
32 changes: 14 additions & 18 deletions crates/bench/src/spacetime_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
ResultBench,
};
use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB};
use spacetimedb::execution_context::ExecutionContext;
use spacetimedb::execution_context::Workload;
use spacetimedb_lib::sats::AlgebraicValue;
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_schema::{
Expand Down Expand Up @@ -40,7 +40,7 @@ impl BenchDatabase for SpacetimeRaw {

fn create_table<T: BenchTable>(&mut self, index_strategy: IndexStrategy) -> ResultBench<Self::TableId> {
let name = table_name::<T>(index_strategy);
self.db.with_auto_commit(&ExecutionContext::default(), |tx| {
self.db.with_auto_commit(Workload::Internal, |tx| {
let mut table_schema = TableSchema::from_product_type(T::product_type());
table_schema.table_name = name.clone().into();
let table_id = self.db.create_table(tx, table_schema)?;
Expand Down Expand Up @@ -84,25 +84,24 @@ impl BenchDatabase for SpacetimeRaw {
}

fn clear_table(&mut self, table_id: &Self::TableId) -> ResultBench<()> {
self.db.with_auto_commit(&ExecutionContext::default(), |tx| {
self.db.with_auto_commit(Workload::Internal, |tx| {
self.db.clear_table(tx, *table_id)?;
Ok(())
})
}

fn count_table(&mut self, table_id: &Self::TableId) -> ResultBench<u32> {
let ctx = ExecutionContext::default();
self.db.with_auto_commit(&ctx, |tx| {
Ok(self.db.iter_mut(&ctx, tx, *table_id)?.map(|_| 1u32).sum())
self.db.with_auto_commit(Workload::Internal, |tx| {
Ok(self.db.iter_mut(tx, *table_id)?.map(|_| 1u32).sum())
})
}

fn empty_transaction(&mut self) -> ResultBench<()> {
self.db.with_auto_commit(&ExecutionContext::default(), |_tx| Ok(()))
self.db.with_auto_commit(Workload::Internal, |_tx| Ok(()))
}

fn insert_bulk<T: BenchTable>(&mut self, table_id: &Self::TableId, rows: Vec<T>) -> ResultBench<()> {
self.db.with_auto_commit(&ExecutionContext::default(), |tx| {
self.db.with_auto_commit(Workload::Internal, |tx| {
for row in rows {
self.db.insert(tx, *table_id, row.into_product_value())?;
}
Expand All @@ -111,11 +110,10 @@ impl BenchDatabase for SpacetimeRaw {
}

fn update_bulk<T: BenchTable>(&mut self, table_id: &Self::TableId, row_count: u32) -> ResultBench<()> {
let ctx = ExecutionContext::default();
self.db.with_auto_commit(&ctx, |tx| {
self.db.with_auto_commit(Workload::Internal, |tx| {
let rows = self
.db
.iter_mut(&ctx, tx, *table_id)?
.iter_mut(tx, *table_id)?
.take(row_count as usize)
.map(|row| row.to_product_value())
.collect::<Vec<_>>();
Expand All @@ -127,7 +125,7 @@ impl BenchDatabase for SpacetimeRaw {
// (update_by_{field} -> spacetimedb::query::update_by_field -> (delete_by_col_eq; insert))
let id = self
.db
.iter_by_col_eq_mut(&ctx, tx, *table_id, 0, &row.elements[0])?
.iter_by_col_eq_mut(tx, *table_id, 0, &row.elements[0])?
.next()
.expect("failed to find row during update!")
.pointer();
Expand All @@ -152,9 +150,8 @@ impl BenchDatabase for SpacetimeRaw {
}

fn iterate(&mut self, table_id: &Self::TableId) -> ResultBench<()> {
let ctx = ExecutionContext::default();
self.db.with_auto_commit(&ctx, |tx| {
for row in self.db.iter_mut(&ctx, tx, *table_id)? {
self.db.with_auto_commit(Workload::Internal, |tx| {
for row in self.db.iter_mut(tx, *table_id)? {
black_box(row);
}
Ok(())
Expand All @@ -167,9 +164,8 @@ impl BenchDatabase for SpacetimeRaw {
col_id: impl Into<ColId>,
value: AlgebraicValue,
) -> ResultBench<()> {
let ctx = ExecutionContext::default();
self.db.with_auto_commit(&ctx, |tx| {
for row in self.db.iter_by_col_eq_mut(&ctx, tx, *table_id, col_id, &value)? {
self.db.with_auto_commit(Workload::Internal, |tx| {
for row in self.db.iter_by_col_eq_mut(tx, *table_id, col_id, &value)? {
black_box(row);
}
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::Deserialize;
use serde_json::{json, Value};
use spacetimedb::address::Address;
use spacetimedb::database_logger::DatabaseLogger;
use spacetimedb::execution_context::Workload;
use spacetimedb::host::ReducerCallError;
use spacetimedb::host::ReducerOutcome;
use spacetimedb::host::{DescribedEntityType, UpdateDatabaseResult};
Expand All @@ -24,7 +25,7 @@ use spacetimedb::identity::Identity;
use spacetimedb::json::client_api::StmtResultJson;
use spacetimedb::messages::control_db::{Database, HostType, Replica};
use spacetimedb::sql;
use spacetimedb::sql::execute::{ctx_sql, translate_col};
use spacetimedb::sql::execute::translate_col;
use spacetimedb_client_api_messages::name::{self, DnsLookupResponse, DomainName, PublishOp, PublishResult};
use spacetimedb_data_structures::map::HashMap;
use spacetimedb_lib::address::AddressForUrl;
Expand Down Expand Up @@ -557,7 +558,7 @@ where
}
})?;

let json = db.with_read_only(&ctx_sql(db), |tx| {
let json = db.with_read_only(Workload::Sql, |tx| {
results
.into_iter()
.map(|result| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,25 @@ impl StateView for CommittedState {
self.get_table(table_id).map(|table| table.row_count)
}

fn iter<'a>(&'a self, ctx: &'a ExecutionContext, table_id: TableId) -> Result<Iter<'a>> {
fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
if let Some(table_name) = self.table_name(table_id) {
return Ok(Iter::new(ctx, table_id, table_name, None, self));
return Ok(Iter::new(table_id, table_name, None, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
/// Returns an iterator,
/// yielding every row in the table identified by `table_id`,
/// where the values of `cols` are contained in `range`.
fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
&'a self,
ctx: &'a ExecutionContext,
fn iter_by_col_range<R: RangeBounds<AlgebraicValue>>(
&self,
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'a, R>> {
) -> Result<IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
Ok(IterByColRange::Scan(ScanIterByColRange::new(
self.iter(ctx, table_id)?,
self.iter(table_id)?,
cols,
range,
)))
Expand Down Expand Up @@ -274,7 +273,7 @@ impl CommittedState {
with_label_values(ST_SEQUENCE_ID, ST_SEQUENCE_NAME).inc();
}

self.reset_system_table_schemas(database_identity)?;
self.reset_system_table_schemas()?;

Ok(())
}
Expand All @@ -286,12 +285,11 @@ impl CommittedState {
/// for objects like indexes and constraints
/// which are computed at insert-time,
/// and therefore not included in the hardcoded schemas.
pub(super) fn reset_system_table_schemas(&mut self, database_identity: Identity) -> Result<()> {
pub(super) fn reset_system_table_schemas(&mut self) -> Result<()> {
// Re-read the schema with the correct ids...
let ctx = ExecutionContext::internal(database_identity);
for schema in system_tables() {
self.tables.get_mut(&schema.table_id).unwrap().schema =
Arc::new(self.schema_for_table_raw(&ctx, schema.table_id)?);
Arc::new(self.schema_for_table_raw(schema.table_id)?);
}

Ok(())
Expand Down Expand Up @@ -384,7 +382,7 @@ impl CommittedState {
// For already built tables, we need to reschema them to account for constraints et al.
let mut schemas = Vec::with_capacity(self.tables.len());
for table_id in self.tables.keys().copied() {
schemas.push(self.schema_for_table_raw(&ExecutionContext::default(), table_id)?);
schemas.push(self.schema_for_table_raw(table_id)?);
}
for (table, schema) in self.tables.values_mut().zip(schemas) {
table.with_mut_schema(|s| *s = schema);
Expand All @@ -407,7 +405,7 @@ impl CommittedState {

// Construct their schemas and insert tables for them.
for table_id in table_ids {
let schema = self.schema_for_table(&ExecutionContext::default(), table_id)?;
let schema = self.schema_for_table(table_id)?;
self.tables.insert(table_id, Self::make_table(schema));
}
Ok(())
Expand Down Expand Up @@ -467,12 +465,7 @@ impl CommittedState {
// Note that this may change in the future: some analytics and/or
// timetravel queries may benefit from seeing all inputs, even if
// the database state did not change.
tx_data.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
|| tx_data.deletes().any(|(_, deleted_rows)| !deleted_rows.is_empty())
|| matches!(
ctx.reducer_context().map(|rcx| rcx.name.strip_prefix("__identity_")),
Some(Some("connected__" | "disconnected__"))
)
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
}

pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
Expand Down Expand Up @@ -636,8 +629,6 @@ impl CommittedState {
}

pub struct CommittedIndexIter<'a> {
#[allow(dead_code)]
ctx: &'a ExecutionContext,
table_id: TableId,
tx_state: Option<&'a TxState>,
#[allow(dead_code)]
Expand All @@ -648,14 +639,12 @@ pub struct CommittedIndexIter<'a> {

impl<'a> CommittedIndexIter<'a> {
pub(super) fn new(
ctx: &'a ExecutionContext,
table_id: TableId,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
committed_rows: IndexScanIter<'a>,
) -> Self {
Self {
ctx,
table_id,
tx_state,
committed_state,
Expand Down
Loading

2 comments on commit a6a6c4f

@github-actions
Copy link

@github-actions github-actions bot commented on a6a6c4f Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on a6a6c4f Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6890 6512 5.80% 6982 6550 6.60%
sqlite 5579 5579 0.00% 6013 6043 -0.50%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 77086 76708 0.49% 77702 77090 0.79%
stdb_raw u32_u64_str no_index 64 128 2 string 119585 119206 0.32% 120467 119756 0.59%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25594 25223 1.47% 26312 25641 2.62%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24543 24191 1.46% 25147 24557 2.40%
sqlite u32_u64_str no_index 64 128 2 string 144701 144695 0.00% 146227 146049 0.12%
sqlite u32_u64_str no_index 64 128 1 u64 124050 124044 0.00% 125366 125234 0.11%
sqlite u32_u64_str btree_each_column 64 128 1 u64 131361 131361 0.00% 132851 132739 0.08%
sqlite u32_u64_str btree_each_column 64 128 2 string 134494 134494 0.00% 136042 136196 -0.11%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 878676 903492 -2.75% 927284 923522 0.41%
stdb_raw u32_u64_str btree_each_column 64 128 1027906 1053119 -2.39% 1084358 1081827 0.23%
sqlite u32_u64_str unique_0 64 128 398320 398320 0.00% 418312 417240 0.26%
sqlite u32_u64_str btree_each_column 64 128 983643 983637 0.00% 1025189 1031623 -0.62%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 154220 153832 0.25% 154306 153882 0.28%
stdb_raw u32_u64_str unique_0 64 17243 16857 2.29% 17313 16907 2.40%
sqlite u32_u64_str unique_0 1024 1067255 1067261 -0.00% 1070527 1070645 -0.01%
sqlite u32_u64_str unique_0 64 76201 76201 0.00% 77179 77193 -0.02%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50252 50180 0.14%
64 bsatn 25509 25509 0.00% 27787 27753 0.12%
16 bsatn 8200 8200 0.00% 9594 9560 0.36%
16 json 12188 12188 0.00% 14160 14092 0.48%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 20303386 20799068 -2.38% 20957684 21347292 -1.83%
stdb_raw u32_u64_str unique_0 64 128 1287287 1316444 -2.21% 1332165 1356344 -1.78%
sqlite u32_u64_str unique_0 1024 1024 1802188 1802182 0.00% 1811532 1811436 0.01%
sqlite u32_u64_str unique_0 64 128 128534 128528 0.00% 131396 131308 0.07%
On-disk benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6895 6517 5.80% 7003 6555 6.83%
sqlite 5621 5631 -0.18% 6119 6189 -1.13%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 77091 76713 0.49% 77687 77059 0.81%
stdb_raw u32_u64_str no_index 64 128 2 string 119590 119211 0.32% 120396 119733 0.55%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25580 25231 1.38% 26210 25681 2.06%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24548 24196 1.45% 25120 24534 2.39%
sqlite u32_u64_str no_index 64 128 1 u64 125965 125965 0.00% 127443 127615 -0.13%
sqlite u32_u64_str no_index 64 128 2 string 146616 146634 -0.01% 148340 148456 -0.08%
sqlite u32_u64_str btree_each_column 64 128 2 string 136616 136616 0.00% 138694 138824 -0.09%
sqlite u32_u64_str btree_each_column 64 128 1 u64 133457 133457 0.00% 135305 135409 -0.08%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 828195 853833 -3.00% 876439 904001 -3.05%
stdb_raw u32_u64_str btree_each_column 64 128 979212 1004721 -2.54% 1035648 1064765 -2.73%
sqlite u32_u64_str unique_0 64 128 415857 415857 0.00% 435139 434015 0.26%
sqlite u32_u64_str btree_each_column 64 128 1021908 1021898 0.00% 1062652 1068362 -0.53%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 154225 153837 0.25% 154303 153883 0.27%
stdb_raw u32_u64_str unique_0 64 17248 16862 2.29% 17314 16908 2.40%
sqlite u32_u64_str unique_0 1024 1070323 1070323 0.00% 1074169 1074151 0.00%
sqlite u32_u64_str unique_0 64 77973 77973 0.00% 79259 79257 0.00%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50252 50180 0.14%
64 bsatn 25509 25509 0.00% 27787 27753 0.12%
16 bsatn 8200 8200 0.00% 9594 9560 0.36%
16 json 12188 12188 0.00% 14160 14092 0.48%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 19038620 19501618 -2.37% 19749276 20099522 -1.74%
stdb_raw u32_u64_str unique_0 64 128 1240771 1268793 -2.21% 1315919 1336789 -1.56%
sqlite u32_u64_str unique_0 1024 1024 1809743 1809761 -0.00% 1818449 1818487 -0.00%
sqlite u32_u64_str unique_0 64 128 132654 132654 0.00% 135586 135598 -0.01%

Please sign in to comment.