diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 3b029d5db59..694bc96a64c 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Context}; +use anyhow::Context; use bytes::Bytes; use std::sync::Arc; use std::time::Duration; @@ -6,7 +6,6 @@ use std::time::Duration; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{bsatn, Address, ModuleDef, ModuleValidationError, TableDesc}; -use spacetimedb_vm::expr::CrudExpr; use super::instrumentation::CallTimes; use crate::database_instance_context::DatabaseInstanceContext; @@ -278,20 +277,11 @@ impl Module for WasmModuleHostActor { log::debug!("One-off query: {query}"); // Don't need the `slow query` logger on compilation let ctx = &ExecutionContext::sql(db.address(), db.read_config().slow_query); - let compiled: Vec<_> = db.with_read_only(ctx, |tx| { + db.with_read_only(ctx, |tx| { let ast = sql::compiler::compile_sql(db, tx, &query)?; - ast.into_iter() - .map(|expr| { - if matches!(expr, CrudExpr::Query { .. }) { - Ok(expr) - } else { - Err(anyhow!("One-off queries are not allowed to modify the database")) - } - }) - .collect::>() - })?; - - sql::execute::execute_sql(db, &query, compiled, auth) + sql::execute::execute_sql_tx(db, tx, &query, ast, auth) + .and_then(|res| Ok(res.context("One-off queries are not allowed to modify the database")?)) + }) } fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> { diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 78c659a9d6d..3591df8ab70 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -5,6 +5,7 @@ use crate::error::DBError; use crate::execution_context::ExecutionContext; use crate::util::slow::SlowQueryLogger; use crate::vm::{DbProgram, TxMode}; +use itertools::Either; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::relation::FieldName; use spacetimedb_lib::{ProductType, ProductValue}; @@ -40,23 +41,23 @@ pub fn ctx_sql(db: &RelationalDB) -> ExecutionContext { ExecutionContext::sql(db.address(), db.read_config().slow_query) } +fn execute(p: &mut DbProgram<'_, '_>, ast: Vec) -> Result, DBError> { + let mut result = Vec::with_capacity(ast.len()); + let query = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect()); + // SQL queries can never reference `MemTable`s, so pass an empty `SourceSet`. + collect_result(&mut result, run_ast(p, query, [].into()).into())?; + Ok(result) +} + /// Run the compiled `SQL` expression inside the `vm` created by [DbProgram] /// /// Evaluates `ast` and accordingly triggers mutable or read tx to execute /// /// Also, in case the execution takes more than x, log it as `slow query` pub fn execute_sql(db: &RelationalDB, sql: &str, ast: Vec, auth: AuthCtx) -> Result, DBError> { - fn execute(p: &mut DbProgram<'_, '_>, ast: Vec) -> Result, DBError> { - let mut result = Vec::with_capacity(ast.len()); - let query = Expr::Block(ast.into_iter().map(|x| Expr::Crud(Box::new(x))).collect()); - // SQL queries can never reference `MemTable`s, so pass an empty `SourceSet`. - collect_result(&mut result, run_ast(p, query, [].into()).into())?; - Ok(result) - } - let ctx = ctx_sql(db); - let slow_logger = SlowQueryLogger::query(&ctx, sql); - let result = if CrudExpr::is_reads(&ast) { + let _slow_logger = SlowQueryLogger::query(&ctx, sql).log_guard(); + if CrudExpr::is_reads(&ast) { db.with_read_only(&ctx, |tx| { execute(&mut DbProgram::new(&ctx, db, &mut TxMode::Tx(tx), auth), ast) }) @@ -64,16 +65,54 @@ pub fn execute_sql(db: &RelationalDB, sql: &str, ast: Vec, auth: AuthC db.with_auto_commit(&ctx, |mut_tx| { execute(&mut DbProgram::new(&ctx, db, &mut mut_tx.into(), auth), ast) }) - }?; - slow_logger.log(); + } +} - Ok(result) +/// Like [`execute_sql`], but for providing your own `tx`. +/// +/// Returns None if you pass a mutable query with an immutable tx. +pub fn execute_sql_tx<'a>( + db: &RelationalDB, + tx: impl Into>, + sql: &str, + ast: Vec, + auth: AuthCtx, +) -> Result>, DBError> { + let mut tx = tx.into(); + + if matches!(tx, TxMode::Tx(_)) && !CrudExpr::is_reads(&ast) { + return Ok(None); + } + + let ctx = ctx_sql(db); + let _slow_logger = SlowQueryLogger::query(&ctx, sql).log_guard(); + execute(&mut DbProgram::new(&ctx, db, &mut tx, auth), ast).map(Some) } /// Run the `SQL` string using the `auth` credentials pub fn run(db: &RelationalDB, sql_text: &str, auth: AuthCtx) -> Result, DBError> { - let ast = db.with_read_only(&ctx_sql(db), |tx| compile_sql(db, tx, sql_text))?; - execute_sql(db, sql_text, ast, auth) + let ctx = ctx_sql(db); + let _slow_logger = SlowQueryLogger::query(&ctx, sql_text).log_guard(); + let result = db.with_read_only(&ctx, |tx| { + let ast = compile_sql(db, tx, sql_text)?; + if CrudExpr::is_reads(&ast) { + let result = execute(&mut DbProgram::new(&ctx, db, &mut TxMode::Tx(tx), auth), ast)?; + Ok::<_, DBError>(Either::Left(result)) + } else { + // hehe. right. write. + Ok(Either::Right(ast)) + } + })?; + match result { + Either::Left(result) => Ok(result), + // TODO: this should perhaps be an upgradable_read upgrade? or we should try + // and figure out if we can detect the mutablility of the query before we take + // the tx? once we have migrations we probably don't want to have stale + // sql queries after a database schema have been updated. + Either::Right(ast) => db.with_auto_commit(&ctx, |tx| { + execute(&mut DbProgram::new(&ctx, db, &mut tx.into(), auth), ast) + }), + } } /// Translates a `FieldName` to the field's name. diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index bb20992b167..543b5d6e916 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -261,10 +261,9 @@ impl ExecutionUnit { convert: impl FnMut(RelValue<'_>) -> T, ) -> Result, DBError> { let tx: TxMode = tx.into(); - let slow_query = SlowQueryLogger::subscription(ctx, sql); + let _slow_query = SlowQueryLogger::subscription(ctx, sql).log_guard(); let query = build_query(ctx, db, &tx, eval_plan, &mut NoInMemUsed)?; let ops = query.collect_vec(convert)?; - slow_query.log(); Ok(ops) } @@ -277,12 +276,11 @@ impl ExecutionUnit { sql: &'a str, tables: impl 'a + Clone + Iterator, ) -> Result>, DBError> { - let slow_query = SlowQueryLogger::incremental_updates(ctx, sql); + let _slow_query = SlowQueryLogger::incremental_updates(ctx, sql).log_guard(); let updates = match &self.eval_incr_plan { EvalIncrPlan::Select(plan) => Self::eval_incr_query_expr(ctx, db, tx, tables, plan, self.return_table())?, EvalIncrPlan::Semijoin(plan) => plan.eval(ctx, db, tx, tables)?, }; - slow_query.log(); Ok(updates.has_updates().then(|| DatabaseTableUpdateRelValue { table_id: self.return_table(), diff --git a/crates/core/src/util/slow.rs b/crates/core/src/util/slow.rs index 0998b477ca6..b39816b7441 100644 --- a/crates/core/src/util/slow.rs +++ b/crates/core/src/util/slow.rs @@ -81,16 +81,21 @@ impl<'a> SlowQueryLogger<'a> { Self::new(sql, &ctx.slow_query_config.incremental_updates, ctx.workload()) } + pub fn log_guard(self) -> impl Drop + 'a { + scopeguard::guard(self, |logger| { + logger.log(); + }) + } + /// Log as `tracing::warn!` the query if it exceeds the threshold. pub fn log(&self) -> Option { - if let Some((start, threshold)) = self.start.zip(self.threshold.as_ref()) { + if let Some((start, threshold)) = self.start.zip(*self.threshold) { let elapsed = start.elapsed(); - if &elapsed > threshold { - let workload = self.workload.as_ref(); - tracing::warn!(?workload, ?threshold, ?elapsed, ?self.sql, "SLOW QUERY"); + if elapsed > threshold { + tracing::warn!(workload = %self.workload, ?threshold, ?elapsed, sql = ?self.sql, "SLOW QUERY"); return Some(elapsed); } - }; + } None } } diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 58f51d9c5af..a2d33a856c2 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -53,6 +53,12 @@ impl<'a> From<&'a Tx> for TxMode<'a> { } } +impl<'a> From<&'a mut Tx> for TxMode<'a> { + fn from(tx: &'a mut Tx) -> Self { + TxMode::Tx(tx) + } +} + fn bound_is_satisfiable(lower: &Bound, upper: &Bound) -> bool { match (lower, upper) { (Bound::Excluded(lower), Bound::Excluded(upper)) if lower >= upper => false,