From 558822e9eda3975d6da385e5a95a882a611105d6 Mon Sep 17 00:00:00 2001
From: Noa
Date: Wed, 28 Feb 2024 00:14:30 -0600
Subject: [PATCH] Parallelize QuerySet::eval (#891)

* Parallelize QuerySet::eval

* Reduce number of arguments for make_actor
---
 crates/core/src/host/host_controller.rs       | 100 +++---
 crates/core/src/host/module_host.rs           |  28 ++--
 .../src/host/wasm_common/module_host_actor.rs |  23 +--
 crates/core/src/host/wasmtime/mod.rs          |  19 +--
 crates/core/src/module_host_context.rs        |  11 ++
 crates/core/src/startup.rs                    |  80 +++++++++-
 .../subscription/module_subscription_actor.rs |   7 +-
 crates/core/src/subscription/subscription.rs  | 147 +++++++++---------
 crates/core/src/util/mod.rs                   |  16 ++
 crates/standalone/src/subcommands/start.rs    |   2 +-
 10 files changed, 236 insertions(+), 197 deletions(-)

diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs
index abf21c3d81a..ce5e7f53384 100644
--- a/crates/core/src/host/host_controller.rs
+++ b/crates/core/src/host/host_controller.rs
@@ -2,7 +2,8 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
 use crate::hash::hash_bytes;
 use crate::host;
 use crate::messages::control_db::HostType;
-use crate::module_host_context::ModuleHostContext;
+use crate::module_host_context::{ModuleCreationContext, ModuleHostContext};
+use crate::util::spawn_rayon;
 use anyhow::Context;
 use parking_lot::Mutex;
 use serde::Serialize;
@@ -12,77 +13,13 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use super::module_host::{Catalog, EntityDef, EventStatus, ModuleHost, NoSuchModule, UpdateDatabaseResult};
-use super::scheduler::SchedulerStarter;
 use super::ReducerArgs;
 
 pub struct HostController {
     modules: Mutex<HashMap<u64, ModuleHost>>,
-    threadpool: Arc<HostThreadpool>,
     pub energy_monitor: Arc<dyn EnergyMonitor>,
 }
 
-pub struct HostThreadpool {
-    inner: rayon_core::ThreadPool,
-}
-
-/// A Rayon [spawn_handler](https://docs.rs/rustc-rayon-core/latest/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler)
-/// which enters the given Tokio runtime at thread startup,
-/// so that the Rayon workers can send along async channels.
-///
-/// Other than entering the `rt`, this spawn handler behaves identitically to the default Rayon spawn handler,
-/// as documented in
-/// https://docs.rs/rustc-rayon-core/0.5.0/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler
-///
-/// Having Rayon threads block on async operations is a code smell.
-/// We need to be careful that the Rayon threads never actually block,
-/// i.e. that every async operation they invoke immediately completes.
-/// I (pgoldman 2024-02-22) believe that our Rayon threads only ever send to unbounded channels,
-/// and therefore never wait.
-fn thread_spawn_handler(rt: tokio::runtime::Handle) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> {
-    move |thread| {
-        let rt = rt.clone();
-        let mut builder = std::thread::Builder::new();
-        if let Some(name) = thread.name() {
-            builder = builder.name(name.to_owned());
-        }
-        if let Some(stack_size) = thread.stack_size() {
-            builder = builder.stack_size(stack_size);
-        }
-        builder.spawn(move || {
-            let _rt_guard = rt.enter();
-            thread.run()
-        })?;
-        Ok(())
-    }
-}
-
-impl HostThreadpool {
-    fn new() -> Self {
-        let inner = rayon_core::ThreadPoolBuilder::new()
-            .thread_name(|_idx| "rayon-worker".to_string())
-            .spawn_handler(thread_spawn_handler(tokio::runtime::Handle::current()))
-            // TODO(perf, pgoldman 2024-02-22):
-            // in the case where we have many modules running many reducers,
-            // we'll wind up with Rayon threads competing with each other and with Tokio threads
-            // for CPU time.
-            //
-            // We should investigate creating two separate CPU pools,
-            // possibly via https://docs.rs/nix/latest/nix/sched/fn.sched_setaffinity.html,
-            // and restricting Tokio threads to one CPU pool
-            // and Rayon threads to the other.
-            // Then we should give Tokio and Rayon each a number of worker threads
-            // equal to the size of their pool.
-            .num_threads(std::thread::available_parallelism().unwrap().get())
-            .build()
-            .unwrap();
-        Self { inner }
-    }
-
-    pub fn spawn(&self, f: impl FnOnce() + Send + 'static) {
-        self.inner.spawn(f)
-    }
-}
-
 #[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Debug)]
 pub enum DescribedEntityType {
     Table,
@@ -163,7 +100,6 @@ impl HostController {
     pub fn new(energy_monitor: Arc<dyn EnergyMonitor>) -> Self {
         Self {
             modules: Mutex::new(HashMap::new()),
-            threadpool: Arc::new(HostThreadpool::new()),
             energy_monitor,
         }
     }
@@ -233,36 +169,38 @@ impl HostController {
     /// impact the logic of applications. The idea being that if SpacetimeDB is a distributed operating
     /// system, the applications will expect to be called when they are scheduled to be called regardless
     /// of whether the OS has been restarted.
-    pub async fn spawn_module_host(&self, module_host_context: ModuleHostContext) -> Result<ModuleHost, anyhow::Error> {
-        let key = module_host_context.dbic.database_instance_id;
-
-        let (module_host, start_scheduler) = self.make_module_host(module_host_context)?;
+    pub async fn spawn_module_host(&self, mhc: ModuleHostContext) -> Result<ModuleHost, anyhow::Error> {
+        let key = mhc.dbic.database_instance_id;
+
+        let mcc = ModuleCreationContext {
+            dbic: mhc.dbic,
+            scheduler: mhc.scheduler,
+            program_hash: hash_bytes(&mhc.program_bytes),
+            program_bytes: mhc.program_bytes,
+            energy_monitor: self.energy_monitor.clone(),
+        };
+        let module_host = spawn_rayon(move || Self::make_module_host(mhc.host_type, mcc)).await?;
 
         let old_module = self.modules.lock().insert(key, module_host.clone());
         if let Some(old_module) = old_module {
             old_module.exit().await
         }
         module_host.start();
-        start_scheduler.start(&module_host)?;
+        mhc.scheduler_starter.start(&module_host)?;
 
         Ok(module_host)
     }
 
-    fn make_module_host(&self, mhc: ModuleHostContext) -> anyhow::Result<(ModuleHost, SchedulerStarter)> {
-        let module_hash = hash_bytes(&mhc.program_bytes);
-        let (threadpool, energy_monitor) = (self.threadpool.clone(), self.energy_monitor.clone());
-        let module_host = match mhc.host_type {
+    fn make_module_host(host_type: HostType, mcc: ModuleCreationContext) -> anyhow::Result<ModuleHost> {
+        let module_host = match host_type {
             HostType::Wasm => {
-                // make_actor with block_in_place since it's going to take some time to compute.
                 let start = Instant::now();
-                let actor = tokio::task::block_in_place(|| {
-                    host::wasmtime::make_actor(mhc.dbic, module_hash, &mhc.program_bytes, mhc.scheduler, energy_monitor)
-                })?;
+                let actor = host::wasmtime::make_actor(mcc)?;
                 log::trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
-                ModuleHost::new(threadpool, actor)
+                ModuleHost::new(actor)
             }
         };
-        Ok((module_host, mhc.scheduler_starter))
+        Ok(module_host)
     }
 
     /// Determine if the module host described by [`ModuleHostContext`] is
diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs
index bacf61f3e0f..f8cb155cc82 100644
--- a/crates/core/src/host/module_host.rs
+++ b/crates/core/src/host/module_host.rs
@@ -6,9 +6,7 @@ use std::time::Duration;
 use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
 use futures::{Future, FutureExt};
 use indexmap::IndexMap;
-use tokio::sync::oneshot;
 
-use super::host_controller::HostThreadpool;
 use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp};
 use crate::client::{ClientActorId, ClientConnectionSender};
 use crate::database_logger::LogLevel;
@@ -315,7 +313,7 @@ impl fmt::Debug for ModuleHost {
 
 #[async_trait::async_trait]
 trait DynModuleHost: Send + Sync + 'static {
-    async fn get_instance(&self, db: Address) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
+    async fn get_instance(&self, db: Address) -> Result<Box<dyn ModuleInstance>, NoSuchModule>;
     fn inject_logs(&self, log_level: LogLevel, message: &str);
     fn one_off_query(
         &self,
@@ -330,7 +328,6 @@ trait DynModuleHost: Send + Sync + 'static {
 
 struct HostControllerActor<T: Module> {
     module: Arc<T>,
-    threadpool: Arc<HostThreadpool>,
     instance_pool: LendingPool<T::Instance>,
     start: NotifyOnce,
 }
@@ -338,7 +335,7 @@ struct HostControllerActor<T: Module> {
 impl<T: Module> HostControllerActor<T> {
     fn spinup_new_instance(&self) {
         let (module, instance_pool) = (self.module.clone(), self.instance_pool.clone());
-        self.threadpool.spawn(move || {
+        rayon::spawn(move || {
             let instance = module.create_instance();
             match instance_pool.add(instance) {
                 Ok(()) => {}
@@ -361,7 +358,7 @@ async fn select_first<A: Future, B: Future<Output = A::Output>>(fut_a: A, fut_b: B) -> A::Output {
 
 #[async_trait::async_trait]
 impl<T: Module> DynModuleHost for HostControllerActor<T> {
-    async fn get_instance(&self, db: Address) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
+    async fn get_instance(&self, db: Address) -> Result<Box<dyn ModuleInstance>, NoSuchModule> {
         self.start.notified().await;
         // in the future we should do something like in the else branch here -- add more instances based on load.
         // we need to do write-skew retries first - right now there's only ever once instance per module.
@@ -379,11 +376,10 @@ impl<T: Module> DynModuleHost for HostControllerActor<T> {
                 .await
                 .map_err(|_| NoSuchModule)?
         };
-        let inst = AutoReplacingModuleInstance {
+        Ok(Box::new(AutoReplacingModuleInstance {
             inst,
             module: self.module.clone(),
-        };
-        Ok((&self.threadpool, Box::new(inst)))
+        }))
     }
 
     fn inject_logs(&self, log_level: LogLevel, message: &str) {
@@ -459,13 +455,12 @@ pub enum InitDatabaseError {
 }
 
 impl ModuleHost {
-    pub fn new(threadpool: Arc<HostThreadpool>, mut module: impl Module) -> Self {
+    pub fn new(mut module: impl Module) -> Self {
         let info = module.info();
         let instance_pool = LendingPool::new();
         instance_pool.add_multiple(module.initial_instances()).unwrap();
         let inner = Arc::new(HostControllerActor {
             module: Arc::new(module),
-            threadpool,
             instance_pool,
             start: NotifyOnce::new(),
         });
@@ -491,13 +486,12 @@ impl ModuleHost {
         F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
         R: Send + 'static,
     {
-        let (threadpool, mut inst) = self.inner.get_instance(self.info.address).await?;
+        let mut inst = self.inner.get_instance(self.info.address).await?;
 
-        let (tx, rx) = oneshot::channel();
-        threadpool.spawn(move || {
-            let _ = tx.send(f(&mut *inst));
-        });
-        Ok(rx.await.expect("instance panicked"))
+        let result = tokio::task::spawn_blocking(move || f(&mut *inst))
+            .await
+            .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()));
+        Ok(result)
     }
 
     pub async fn disconnect_client(&self, client_id: ClientActorId) {
diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs
index 8a30c2e9911..ff31bc099c2 100644
--- a/crates/core/src/host/wasm_common/module_host_actor.rs
+++ b/crates/core/src/host/wasm_common/module_host_actor.rs
@@ -15,7 +15,6 @@ use crate::db::datastore::locking_tx_datastore::MutTxId;
 use crate::db::datastore::traits::IsolationLevel;
 use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
 use crate::execution_context::ExecutionContext;
-use crate::hash::Hash;
 use crate::host::instance_env::InstanceEnv;
 use crate::host::module_host::{
     CallReducerParams, DatabaseUpdate, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo,
@@ -24,6 +23,7 @@ use crate::host::module_host::{
 use crate::host::{ArgsTuple, EntityDef, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, Timestamp};
 use crate::identity::Identity;
 use crate::messages::control_db::Database;
+use crate::module_host_context::ModuleCreationContext;
 use crate::sql;
 use crate::subscription::module_subscription_actor::ModuleSubscriptions;
 use crate::util::const_unwrap;
@@ -53,7 +53,7 @@ pub trait WasmInstance: Send + Sync + 'static {
 
     fn instance_env(&self) -> &InstanceEnv;
 
-    type Trap;
+    type Trap: Send;
 
     fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> ExecuteResult<Self::Trap>;
 
@@ -123,13 +123,14 @@ pub enum DescribeError {
 }
 
 impl<T: WasmModule> WasmModuleHostActor<T> {
-    pub fn new(
-        database_instance_context: Arc<DatabaseInstanceContext>,
-        module_hash: Hash,
-        module: T,
-        scheduler: Scheduler,
-        energy_monitor: Arc<dyn EnergyMonitor>,
-    ) -> Result<Self, InitializationError> {
+    pub fn new(mcc: ModuleCreationContext, module: T) -> Result<Self, InitializationError> {
+        let ModuleCreationContext {
+            dbic: database_instance_context,
+            scheduler,
+            program_bytes: _,
+            program_hash: module_hash,
+            energy_monitor,
+        } = mcc;
         log::trace!(
             "Making new module host actor for database {}",
             database_instance_context.address
@@ -533,7 +534,9 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
             )
             .entered();
 
-        let (tx, result) = tx_slot.set(tx, || self.instance.call_reducer(op, budget));
+        // run the call_reducer call in rayon. it's important that we don't acquire a lock inside a rayon task,
+        // as that can lead to deadlock.
+        let (tx, result) = rayon::scope(|_| tx_slot.set(tx, || self.instance.call_reducer(op, budget)));
 
         let ExecuteResult {
             energy,
diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs
index 4ec19680bcd..8c67730184d 100644
--- a/crates/core/src/host/wasmtime/mod.rs
+++ b/crates/core/src/host/wasmtime/mod.rs
@@ -1,14 +1,12 @@
 use std::borrow::Cow;
-use std::sync::Arc;
 
 use anyhow::Context;
 use once_cell::sync::Lazy;
 use wasmtime::{Engine, Linker, Module, StoreContext, StoreContextMut};
 
-use crate::database_instance_context::DatabaseInstanceContext;
-use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget};
+use crate::energy::{EnergyQuanta, ReducerBudget};
 use crate::error::NodesError;
-use crate::hash::Hash;
+use crate::module_host_context::ModuleCreationContext;
 use crate::stdb_path;
 
 mod wasm_instance_env;
@@ -20,7 +18,6 @@ use self::wasm_instance_env::WasmInstanceEnv;
 
 use super::wasm_common::module_host_actor::InitializationError;
 use super::wasm_common::{abi, module_host_actor::WasmModuleHostActor, ModuleCreationError};
-use super::Scheduler;
 
 static ENGINE: Lazy<Engine> = Lazy::new(|| {
     let mut config = wasmtime::Config::new();
@@ -55,14 +52,8 @@ static LINKER: Lazy<Linker<WasmInstanceEnv>> = Lazy::new(|| {
     linker
 });
 
-pub fn make_actor(
-    dbic: Arc<DatabaseInstanceContext>,
-    module_hash: Hash,
-    program_bytes: &[u8],
-    scheduler: Scheduler,
-    energy_monitor: Arc<dyn EnergyMonitor>,
-) -> Result<impl super::module_host::Module, ModuleCreationError> {
-    let module = Module::new(&ENGINE, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
+pub fn make_actor(mcc: ModuleCreationContext) -> Result<impl super::module_host::Module, ModuleCreationError> {
+    let module = Module::new(&ENGINE, &mcc.program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
 
     let func_imports = module
         .imports()
@@ -77,7 +68,7 @@ pub fn make_actor(
 
     let module = WasmtimeModule::new(module);
 
-    WasmModuleHostActor::new(dbic, module_hash, module, scheduler, energy_monitor).map_err(Into::into)
+    WasmModuleHostActor::new(mcc, module).map_err(Into::into)
 }
 
 #[derive(Debug, derive_more::From)]
diff --git a/crates/core/src/module_host_context.rs b/crates/core/src/module_host_context.rs
index 6efdbea246e..abd576775d0 100644
--- a/crates/core/src/module_host_context.rs
+++ b/crates/core/src/module_host_context.rs
@@ -1,4 +1,7 @@
+use spacetimedb_lib::Hash;
+
 use crate::database_instance_context::DatabaseInstanceContext;
+use crate::energy::EnergyMonitor;
 use crate::host::scheduler::{Scheduler, SchedulerStarter};
 use crate::messages::control_db::HostType;
 use crate::util::AnyBytes;
@@ -11,3 +14,11 @@ pub struct ModuleHostContext {
     pub host_type: HostType,
     pub program_bytes: AnyBytes,
 }
+
+pub struct ModuleCreationContext {
+    pub dbic: Arc<DatabaseInstanceContext>,
+    pub scheduler: Scheduler,
+    pub program_bytes: AnyBytes,
+    pub program_hash: Hash,
+    pub energy_monitor: Arc<dyn EnergyMonitor>,
+}
diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs
index b3d86d0b17e..928489423d5 100644
--- a/crates/core/src/startup.rs
+++ b/crates/core/src/startup.rs
@@ -9,7 +9,34 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{reload, EnvFilter};
 
-pub fn configure_tracing() {
+pub struct StartupOptions {
+    /// Whether or not to configure the global tracing subscriber.
+    pub tracing: bool,
+    /// Whether or not to configure the global rayon threadpool.
+    pub rayon: bool,
+}
+
+impl Default for StartupOptions {
+    fn default() -> Self {
+        Self {
+            tracing: true,
+            rayon: true,
+        }
+    }
+}
+
+impl StartupOptions {
+    pub fn configure(self) {
+        if self.tracing {
+            configure_tracing()
+        }
+        if self.rayon {
+            configure_rayon()
+        }
+    }
+}
+
+fn configure_tracing() {
     // Use this to change log levels at runtime.
     // This means you can change the default log level to trace
     // if you are trying to debug an issue and need more logs on then turn it off
@@ -108,3 +135,54 @@ fn reload_config<S>(conf_file: &Path, reload_handle: &reload::Handle<EnvFilter, S>) {
         }
     }
 }
+
+fn configure_rayon() {
+    rayon_core::ThreadPoolBuilder::new()
+        .thread_name(|_idx| "rayon-worker".to_string())
+        .spawn_handler(thread_spawn_handler(tokio::runtime::Handle::current()))
+        // TODO(perf, pgoldman 2024-02-22):
+        // in the case where we have many modules running many reducers,
+        // we'll wind up with Rayon threads competing with each other and with Tokio threads
+        // for CPU time.
+        //
+        // We should investigate creating two separate CPU pools,
+        // possibly via https://docs.rs/nix/latest/nix/sched/fn.sched_setaffinity.html,
+        // and restricting Tokio threads to one CPU pool
+        // and Rayon threads to the other.
+        // Then we should give Tokio and Rayon each a number of worker threads
+        // equal to the size of their pool.
+        .num_threads(std::thread::available_parallelism().unwrap().get())
+        .build_global()
+        .unwrap()
+}
+
+/// A Rayon [spawn_handler](https://docs.rs/rustc-rayon-core/latest/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler)
+/// which enters the given Tokio runtime at thread startup,
+/// so that the Rayon workers can send along async channels.
+///
+/// Other than entering the `rt`, this spawn handler behaves identically to the default Rayon spawn handler,
+/// as documented in
+/// https://docs.rs/rustc-rayon-core/0.5.0/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler
+///
+/// Having Rayon threads block on async operations is a code smell.
+/// We need to be careful that the Rayon threads never actually block,
+/// i.e. that every async operation they invoke immediately completes.
+/// I (pgoldman 2024-02-22) believe that our Rayon threads only ever send to unbounded channels,
+/// and therefore never wait.
+fn thread_spawn_handler(rt: tokio::runtime::Handle) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> {
+    move |thread| {
+        let rt = rt.clone();
+        let mut builder = std::thread::Builder::new();
+        if let Some(name) = thread.name() {
+            builder = builder.name(name.to_owned());
+        }
+        if let Some(stack_size) = thread.stack_size() {
+            builder = builder.stack_size(stack_size);
+        }
+        builder.spawn(move || {
+            let _rt_guard = rt.enter();
+            thread.run()
+        })?;
+        Ok(())
+    }
+}
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs
index 0ecb7bceae7..a9da7b594f3 100644
--- a/crates/core/src/subscription/module_subscription_actor.rs
+++ b/crates/core/src/subscription/module_subscription_actor.rs
@@ -40,7 +40,7 @@ impl ModuleSubscriptions {
     /// Add a subscriber to the module. NOTE: this function is blocking.
     #[tracing::instrument(skip_all)]
     pub fn add_subscriber(&self, sender: ClientConnectionSender, subscription: Subscribe) -> Result<(), DBError> {
-        let tx = &mut *scopeguard::guard(self.relational_db.begin_tx(), |tx| {
+        let tx = scopeguard::guard(self.relational_db.begin_tx(), |tx| {
             let ctx = ExecutionContext::subscribe(self.relational_db.address());
             self.relational_db.release_tx(&ctx, tx);
         });
@@ -48,15 +48,16 @@ impl ModuleSubscriptions {
         let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
         let mut queries = QuerySet::new();
         for sql in subscription.query_strings {
-            let qset = compile_read_only_query(&self.relational_db, tx, &auth, &sql)?;
+            let qset = compile_read_only_query(&self.relational_db, &tx, &auth, &sql)?;
             queries.extend(qset);
         }
 
-        let database_update = tokio::task::block_in_place(|| queries.eval(&self.relational_db, tx, auth))?;
+        let database_update = queries.eval(&self.relational_db, &tx, auth)?;
         // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
         // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
         // but that should not pose an issue.
         let mut subscriptions = self.subscriptions.write();
+        drop(tx);
         self._remove_subscriber(sender.id, &mut subscriptions);
         let subscription = match subscriptions.iter_mut().find(|s| s.queries == queries) {
             Some(sub) => {
diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs
index 71dfcbbb8b0..bb6e81010b1 100644
--- a/crates/core/src/subscription/subscription.rs
+++ b/crates/core/src/subscription/subscription.rs
@@ -25,15 +25,15 @@
 use anyhow::Context;
 use derive_more::{Deref, DerefMut, From, IntoIterator};
-use std::collections::{btree_set, BTreeSet, HashMap, HashSet};
+use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use spacetimedb_primitives::TableId;
+use std::collections::{btree_set, BTreeSet, HashMap, HashSet, LinkedList};
 use std::ops::Deref;
 use std::sync::Arc;
-use std::time::Instant;
 
-use crate::db::db_metrics::{DB_METRICS, MAX_QUERY_CPU_TIME};
 use crate::db::relational_db::Tx;
 use crate::error::{DBError, SubscriptionError};
-use crate::execution_context::{ExecutionContext, WorkloadType};
+use crate::execution_context::ExecutionContext;
 use crate::subscription::query::{run_query, to_mem_table_with_op_type, OP_TYPE_FIELD_NAME};
 use crate::{
     client::{ClientActorId, ClientConnectionSender},
@@ -41,7 +41,7 @@ use crate::{
     host::module_host::{DatabaseTableUpdate, DatabaseUpdate, TableOp},
 };
 use spacetimedb_lib::identity::AuthCtx;
-use spacetimedb_lib::{Address, PrimaryKey};
+use spacetimedb_lib::PrimaryKey;
 use spacetimedb_sats::db::auth::{StAccess, StTableType};
 use spacetimedb_sats::relation::{DbTable, Header, Relation};
 use spacetimedb_sats::{AlgebraicValue, ProductValue};
@@ -153,6 +153,15 @@ impl<'a> IntoIterator for &'a QuerySet {
     }
 }
 
+impl<'a> IntoParallelIterator for &'a QuerySet {
+    type Item = &'a SupportedQuery;
+    type Iter = rayon::collections::btree_set::Iter<'a, SupportedQuery>;
+
+    fn into_par_iter(self) -> Self::Iter {
+        self.0.par_iter()
+    }
+}
+
 impl Extend<SupportedQuery> for QuerySet {
     fn extend<T: IntoIterator<Item = SupportedQuery>>(&mut self, iter: T) {
         self.0.extend(iter)
@@ -275,7 +284,6 @@ impl QuerySet {
         let eval = evaluator_for_primary_updates(db, auth);
         for SupportedQuery { kind, expr, .. } in self {
             use query::Supported::*;
-            let start = Instant::now();
             match kind {
                 Scan => {
                     let source = expr
@@ -321,8 +329,6 @@
                     }
                 }
             }
-            #[cfg(feature = "metrics")]
-            record_query_duration_metrics(WorkloadType::Update, &db.address(), start);
         }
         for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
             output.tables.push(DatabaseTableUpdate {
@@ -343,74 +349,75 @@
     /// This is a *major* difference with normal query execution, where is expected to return the full result set for each query.
     #[tracing::instrument(skip_all)]
     pub fn eval(&self, db: &RelationalDB, tx: &Tx, auth: AuthCtx) -> Result<DatabaseUpdate, DBError> {
-        let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] };
-        let mut table_ops = HashMap::new();
+        // evaluate each of the queries in this QuerySet in parallel
+        let span = tracing::Span::current();
+        #[allow(clippy::needless_borrowed_reference)] // false positive
+        let eval_query = |&SupportedQuery { ref expr, .. }| {
+            let _entered = span.enter();
+            let t = expr.source.get_db_table()?;
+
+            let tables = run_query(&ExecutionContext::subscribe(db.address()), db, tx, expr, auth);
+            Some(tables.map(|tables| (t, tables)))
+        };
+        let ops = self
+            .par_iter()
+            .filter_map(eval_query)
+            .try_fold(Vec::new, |mut v, item| {
+                item.map(|item| {
+                    v.push(item);
+                    v
+                })
+            })
+            .map(|r| r.map(|v| LinkedList::from([v])))
+            .reduce(
+                || Ok(LinkedList::new()),
+                |l, r| {
+                    // l? is run before r? so the leftmost error gets returned
+                    let (mut l, mut r) = (l?, r?);
+                    l.append(&mut r);
+                    Ok(l)
+                },
+            )?
+            .into_iter()
+            .flatten();
+        // single threaded version for debugging; uncomment if you need it
+        // let ops = self.iter().filter_map(eval_query).collect::<Result<Vec<_>, _>>()?;
+
+        let mut tables = Vec::with_capacity(self.len());
+        // a map from TableId to the corresponding index in `tables`
+        let mut tables_map = HashMap::<TableId, usize>::new();
         let mut seen = HashSet::new();
-        for SupportedQuery { expr, .. } in self {
-            if let Some(t) = expr.source.get_db_table() {
-                let start = Instant::now();
-                // Get the TableOps for this table
-                let (_, table_row_operations) = table_ops
-                    .entry(t.table_id)
-                    .or_insert_with(|| (t.head.table_name.clone(), vec![]));
-                for table in run_query(&ExecutionContext::subscribe(db.address()), db, tx, expr, auth)? {
-                    for row in table.data {
-                        let row_pk = RelationalDB::pk_for_row(&row);
-
-                        //Skip rows that are already resolved in a previous subscription...
-                        if seen.contains(&(t.table_id, row_pk)) {
-                            continue;
-                        }
-                        seen.insert((t.table_id, row_pk));
-
-                        let row_pk = row_pk.to_bytes();
-                        table_row_operations.push(TableOp {
-                            op_type: 1, // Insert
-                            row_pk,
-                            row,
-                        });
-                    }
-                }
-                #[cfg(feature = "metrics")]
-                record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), start);
-            }
-        }
-        for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) {
-            database_update.tables.push(DatabaseTableUpdate {
-                table_id,
-                table_name,
-                ops,
+        // coalesce the results of each query (and filter out duplicates) serially
+        for (t, evaled_tables) in ops {
+            let idx = *tables_map.entry(t.table_id).or_insert_with(|| {
+                let i = tables.len();
+                tables.push(DatabaseTableUpdate {
+                    table_id: t.table_id,
+                    table_name: t.head.table_name.clone(),
+                    ops: Vec::new(),
+                });
+                i
             });
+            let table_row_ops = &mut tables[idx].ops;
+            let ops = evaled_tables
+                .into_iter()
+                .flat_map(|table| table.data)
+                .filter_map(|row| {
+                    let row_pk = RelationalDB::pk_for_row(&row);
+                    seen.insert((t.table_id, row_pk)).then(|| TableOp {
+                        op_type: 1,
+                        row_pk: row_pk.to_bytes(),
+                        row,
+                    })
+                });
+            table_row_ops.extend(ops);
         }
-        Ok(database_update)
-    }
-}
 
-#[cfg(feature = "metrics")]
-fn record_query_duration_metrics(workload: WorkloadType, db: &Address, start: Instant) {
-    let query_duration = start.elapsed().as_secs_f64();
-
-    DB_METRICS
-        .rdb_query_cpu_time_sec
-        .with_label_values(&workload, db)
-        .observe(query_duration);
-
-    let max_query_duration = *MAX_QUERY_CPU_TIME
-        .lock()
-        .unwrap()
-        .entry((*db, workload))
-        .and_modify(|max| {
-            if query_duration > *max {
-                *max = query_duration;
-            }
-        })
-        .or_insert_with(|| query_duration);
+        tables.retain(|upd| !upd.ops.is_empty());
 
-    DB_METRICS
-        .rdb_query_cpu_time_sec_max
-        .with_label_values(&workload, db)
-        .set(max_query_duration);
+        Ok(DatabaseUpdate { tables })
+    }
 }
 
 /// Helper to retain [`PrimaryKey`] before converting to [`TableOp`].
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 66193b05e52..f94faf610f1 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -1,5 +1,7 @@
 use derive_more::From;
+use futures::{Future, FutureExt};
 use std::borrow::Cow;
+use tokio::sync::oneshot;
 
 pub mod prometheus_handle;
 
@@ -52,3 +54,17 @@ pub const fn const_unwrap<T>(o: Option<T>) -> T {
         None => panic!("called `const_unwrap()` on a `None` value"),
     }
 }
+
+#[tracing::instrument(skip_all)]
+pub fn spawn_rayon<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> impl Future<Output = R> {
+    let span = tracing::Span::current();
+    let (tx, rx) = oneshot::channel();
+    rayon::spawn(|| {
+        let _entered = span.entered();
+        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
+        if let Err(Err(_panic)) = tx.send(result) {
+            tracing::warn!("uncaught panic on threadpool")
+        }
+    });
+    rx.map(|res| res.unwrap().unwrap_or_else(|err| std::panic::resume_unwind(err)))
+}
diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs
index fc7c8a35f1d..908ddf32c7f 100644
--- a/crates/standalone/src/subcommands/start.rs
+++ b/crates/standalone/src/subcommands/start.rs
@@ -222,7 +222,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
         set_env_with_warning("SPACETIMEDB_TRACY", "1");
     }
 
-    startup::configure_tracing();
+    startup::StartupOptions::default().configure();
 
     let ctx = StandaloneEnv::init(config).await?;
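Note on the fold/reduce idiom in `QuerySet::eval` above: the parallel
collection step can be read in isolation. Below is a minimal, self-contained
sketch of the same pattern. It is illustrative only and not part of the patch;
the helper name `parallel_try_collect` is hypothetical, and it assumes only
the `rayon` crate and the standard library:

use rayon::prelude::*;
use std::collections::LinkedList;

// Evaluate fallible items in parallel, collecting successes into one Vec
// and short-circuiting on the leftmost error, as QuerySet::eval does.
fn parallel_try_collect<T: Send, E: Send>(
    items: impl IntoParallelIterator<Item = Result<T, E>>,
) -> Result<Vec<T>, E> {
    let lists = items
        .into_par_iter()
        // each rayon worker folds its share of items into a local Vec
        .try_fold(Vec::new, |mut v, item| {
            v.push(item?);
            Ok(v)
        })
        // wrap each per-worker Vec in a one-element LinkedList...
        .map(|r| r.map(|v| LinkedList::from([v])))
        // ...so that the reduction can splice partial results together cheaply
        .reduce(
            || Ok(LinkedList::new()),
            |l, r| {
                // `l?` runs before `r?`, so the leftmost error is returned
                let (mut l, mut r) = (l?, r?);
                l.append(&mut r);
                Ok(l)
            },
        )?;
    Ok(lists.into_iter().flatten().collect())
}

fn main() {
    let items: Vec<Result<i32, String>> = (0..1000).map(Ok).collect();
    assert_eq!(parallel_try_collect(items).unwrap().len(), 1000);
}

The LinkedList wrapper is what keeps the reduction cheap: appending one list
to another moves pointers rather than copying elements, so each merge costs
O(1) regardless of how many rows a worker produced.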