From 3d7a395a81c80ac8445a831edcececc55203b3c8 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 31 Oct 2025 00:35:01 -0700 Subject: [PATCH 1/4] Clear view tables on disconnect --- crates/client-api/src/routes/database.rs | 6 +- crates/core/src/db/relational_db.rs | 16 +++- crates/core/src/host/host_controller.rs | 3 +- crates/core/src/host/module_host.rs | 25 +++++- .../src/locking_tx_datastore/datastore.rs | 6 +- .../src/locking_tx_datastore/mut_tx.rs | 89 ++++++++++++++++++- crates/datastore/src/system_tables.rs | 26 ++++++ 7 files changed, 157 insertions(+), 14 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 6fecde08bbd..6880163a7c7 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -101,7 +101,8 @@ pub async fn call( }; module - .call_identity_disconnected(caller_identity, connection_id) + // We don't clear views after reducer calls + .call_identity_disconnected(caller_identity, connection_id, false) .await .map_err(client_disconnected_error_to_response)?; @@ -274,7 +275,8 @@ async fn procedure( }; module - .call_identity_disconnected(caller_identity, connection_id) + // We don't clear views after procedure calls + .call_identity_disconnected(caller_identity, connection_id, false) .await .map_err(client_disconnected_error_to_response)?; diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index dab34f155c6..d586c932fa9 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow, ST_VIEW_ID}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -1403,6 +1403,20 @@ impl RelationalDB { Ok(rows_deleted) } + /// Clear all rows from all view tables without dropping them. + pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> { + for table_id in tx + .iter(ST_VIEW_ID)? + .map(StViewRow::try_from) + .collect::, _>>()? + .into_iter() + .filter_map(|row| row.table_id) + { + tx.clear_table(table_id)?; + } + Ok(()) + } + pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?) } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index f81e5d6b018..dfecd0dbd3a 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -847,9 +847,10 @@ impl Host { } = launched; // Disconnect dangling clients. + // No need to clear view tables here since we do it in `clear_all_clients`. for (identity, connection_id) in connected_clients { module_host - .call_identity_disconnected(identity, connection_id) + .call_identity_disconnected(identity, connection_id, false) .await .with_context(|| { format!( diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 09f1c4da4dc..b3600e26e60 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; +use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; @@ -903,7 +903,7 @@ impl ModuleHost { // Call the `client_disconnected` reducer, if it exists. // This is a no-op if the module doesn't define such a reducer. this.subscriptions().remove_subscriber(client_id); - this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst) + this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst, true) }) .await { @@ -1024,6 +1024,7 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, inst: &mut Instance, + clear_view_tables: bool, ) -> Result<(), ReducerCallError> { let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); let reducer_name = reducer_lookup @@ -1050,6 +1051,13 @@ impl ModuleHost { let fallback = || { let database_identity = me.info.database_identity; stdb.with_auto_commit(workload(), |mut_tx| { + + if clear_view_tables { + if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) { + log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); + } + } + if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. log::debug!( @@ -1076,7 +1084,13 @@ impl ModuleHost { if let Some((reducer_id, reducer_def)) = reducer_lookup { let stdb = me.module.replica_ctx().relational_db.clone(); - let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); + let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); + + if clear_view_tables { + if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) { + log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); + } + } if !is_client_exist(&mut_tx) { // The client is already gone. Nothing to do. @@ -1151,10 +1165,11 @@ impl ModuleHost { &self, caller_identity: Identity, caller_connection_id: ConnectionId, + clear_view_tables: bool, ) -> Result<(), ReducerCallError> { let me = self.clone(); self.call("call_identity_disconnected", move |inst| { - me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) + me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, clear_view_tables) }) .await? } @@ -1166,8 +1181,10 @@ impl ModuleHost { let stdb = &me.module.replica_ctx().relational_db; let workload = Workload::Internal; stdb.with_auto_commit(workload, |mut_tx| { + stdb.clear_all_views(mut_tx)?; stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; stdb.clear_table(mut_tx, ST_CLIENT_ID)?; + stdb.clear_table(mut_tx, ST_VIEW_CLIENT_ID)?; Ok::<(), DBError>(()) }) }) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index a38232f3f85..8f5a6cc1cb6 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1252,9 +1252,9 @@ mod tests { use crate::system_tables::{ system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields, StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields, - StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME, - ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, - ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, + StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID, + ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, + ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 0ea3155b639..199a2b3e36a 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -9,9 +9,9 @@ use super::{ SharedMutexGuard, SharedWriteGuard, }; use crate::system_tables::{ - system_tables, ConnectionIdViaU128, StConnectionCredentialsFields, StConnectionCredentialsRow, StViewColumnFields, - StViewFields, StViewParamFields, StViewParamRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, - ST_VIEW_PARAM_ID, + system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, + StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, + ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, }; use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; use crate::{ @@ -1767,6 +1767,89 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { } impl MutTxId { + /// Delete view data and metadata for a client connection. + pub fn delete_view_data_for_client(&mut self, sender: Identity, connection_id: ConnectionId) -> Result<()> { + for row in self.delete_st_view_client_rows(sender, connection_id)? { + if !self.is_identity_subscribed_to_view_args(row.view_id, row.arg_id, sender)? { + self.delete_view_rows_for_identity(row.view_id, row.arg_id, sender)?; + } + } + Ok(()) + } + + /// Deletes and return the rows in `st_view_client` for a client connection. + fn delete_st_view_client_rows( + &mut self, + sender: Identity, + connection_id: ConnectionId, + ) -> Result> { + let sender = IdentityViaU256(sender); + let conn_id = ConnectionIdViaU128(connection_id); + let cols = col_list![StViewClientFields::Identity, StViewClientFields::ConnectionId]; + let value = AlgebraicValue::product([sender.into(), conn_id.into()]); + self.iter_by_col_eq(ST_VIEW_CLIENT_ID, cols, &value)? + .map(|row_ref| StViewClientRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) + .collect::>>()? + .into_iter() + .map(|(row, ptr)| self.delete(ST_VIEW_CLIENT_ID, ptr).map(|_| row)) + .collect() + } + + /// Is anyone is subscribed to the view arguments identified by `arg_id`? + fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result { + Ok(self + .iter_by_col_eq( + ST_VIEW_CLIENT_ID, + col_list![ + StViewClientFields::ViewId, + StViewClientFields::ArgId, + StViewClientFields::Identity + ], + &AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]), + )? + .next() + .is_some()) + } + + /// Looks up a row in `st_view` by its primary key. + fn st_view_row(&self, view_id: ViewId) -> Result> { + self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())? + .next() + .map(StViewRow::try_from) + .transpose() + } + + /// Returns the [`TableId`] for this view's backing table by probing `st_view`. + /// Note, all views with at least one subscriber are materialized. + fn get_table_id_for_view(&self, view_id: ViewId) -> Result> { + Ok(self + .st_view_row(view_id)? + .and_then(|row| row.table_id.map(|id| (id, row.is_anonymous)))) + } + + /// Deletes the rows of a view subscribed to by `sender`. + fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result<()> { + if let Some((table_id, is_anonymous)) = self.get_table_id_for_view(view_id)? { + let value = if is_anonymous { + let none_sender = AlgebraicValue::OptionNone(); + AlgebraicValue::product([none_sender, arg_id.into()]) + } else { + let sender = IdentityViaU256(sender); + let some_sender = AlgebraicValue::OptionSome(sender.into()); + AlgebraicValue::product([some_sender, arg_id.into()]) + }; + for row_pointer in self + .iter_by_col_eq(table_id, col_list![0, 1], &value)? + .map(|row_ref| row_ref.pointer()) + .collect::>() + .into_iter() + { + self.delete(table_id, row_pointer)?; + } + } + Ok(()) + } + pub fn insert_st_client( &mut self, identity: Identity, diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index 503ef4d9576..f5589fd7184 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -937,6 +937,20 @@ pub struct StViewClientRow { pub connection_id: ConnectionIdViaU128, } +impl TryFrom> for StViewClientRow { + type Error = DatastoreError; + + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +impl From for (Identity, ConnectionId) { + fn from(value: StViewClientRow) -> Self { + (value.identity.0, value.connection_id.0) + } +} + /// System table [ST_VIEW_ARG_NAME] /// /// | id | bytes | @@ -1238,6 +1252,12 @@ impl From for IdentityViaU256 { } } +impl From for AlgebraicValue { + fn from(val: IdentityViaU256) -> Self { + AlgebraicValue::U256(val.0.to_u256().into()) + } +} + /// System table [ST_MODULE_NAME] /// This table holds exactly one row, describing the latest version of the /// SpacetimeDB module associated with the database: @@ -1348,6 +1368,12 @@ impl TryFrom> for StClientRow { } } +impl From for (Identity, ConnectionId) { + fn from(value: StClientRow) -> Self { + (value.identity.0, value.connection_id.0) + } +} + /// System table [ST_VAR_NAME] /// /// | name | value | From 90eb038e7085dad936514c707bc642a919b30dc5 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 5 Nov 2025 15:02:29 -0800 Subject: [PATCH 2/4] ArgId --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 6 +++--- crates/datastore/src/system_tables.rs | 2 +- crates/primitives/src/ids.rs | 6 ++++++ crates/primitives/src/lib.rs | 2 +- crates/sats/src/convert.rs | 3 ++- crates/sats/src/de/impls.rs | 1 + crates/sats/src/ser/impls.rs | 1 + crates/sats/src/typespace.rs | 1 + crates/table/src/read_column.rs | 1 + 9 files changed, 17 insertions(+), 6 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 199a2b3e36a..34902d3c7ae 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -39,7 +39,7 @@ use spacetimedb_lib::{ ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, @@ -1796,7 +1796,7 @@ impl MutTxId { } /// Is anyone is subscribed to the view arguments identified by `arg_id`? - fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result { + fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { Ok(self .iter_by_col_eq( ST_VIEW_CLIENT_ID, @@ -1828,7 +1828,7 @@ impl MutTxId { } /// Deletes the rows of a view subscribed to by `sender`. - fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result<()> { + fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { if let Some((table_id, is_anonymous)) = self.get_table_id_for_view(view_id)? { let value = if is_anonymous { let none_sender = AlgebraicValue::OptionNone(); diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index f5589fd7184..4001cefef60 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -932,7 +932,7 @@ pub struct StViewParamRow { #[sats(crate = spacetimedb_lib)] pub struct StViewClientRow { pub view_id: ViewId, - pub arg_id: u64, + pub arg_id: ArgId, pub identity: IdentityViaU256, pub connection_id: ConnectionIdViaU128, } diff --git a/crates/primitives/src/ids.rs b/crates/primitives/src/ids.rs index 5aadfb8455b..06c6d7fc38c 100644 --- a/crates/primitives/src/ids.rs +++ b/crates/primitives/src/ids.rs @@ -84,6 +84,12 @@ system_id! { } auto_inc_system_id!(ViewId); +system_id! { + /// An identifier for a list of arguments passed to a view. + pub struct ArgId(pub u64); +} +auto_inc_system_id!(ArgId); + system_id! { /// An identifier for a sequence, unique within a database. pub struct SequenceId(pub u32); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 715fc77cec0..3245a911200 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -8,7 +8,7 @@ mod ids; pub use attr::{AttributeKind, ColumnAttribute, ConstraintKind, Constraints}; pub use col_list::{ColList, ColOrCols, ColSet}; pub use ids::{ - ColId, ConstraintId, FunctionId, IndexId, ProcedureId, ReducerId, ScheduleId, SequenceId, TableId, ViewId, + ArgId, ColId, ConstraintId, FunctionId, IndexId, ProcedureId, ReducerId, ScheduleId, SequenceId, TableId, ViewId, }; /// The minimum size of a chunk yielded by a wasm abi RowIter. diff --git a/crates/sats/src/convert.rs b/crates/sats/src/convert.rs index 1729402cdc3..ba38c6b2d74 100644 --- a/crates/sats/src/convert.rs +++ b/crates/sats/src/convert.rs @@ -1,7 +1,7 @@ use crate::sum_value::SumTag; use crate::{i256, u256}; use crate::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; -use spacetimedb_primitives::{ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId}; +use spacetimedb_primitives::{ArgId, ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId}; impl crate::Value for AlgebraicValue { type Type = AlgebraicType; @@ -63,6 +63,7 @@ macro_rules! system_id { } }; } +system_id!(ArgId); system_id!(TableId); system_id!(ViewId); system_id!(ColId); diff --git a/crates/sats/src/de/impls.rs b/crates/sats/src/de/impls.rs index cf1090253af..d0c9a92c23d 100644 --- a/crates/sats/src/de/impls.rs +++ b/crates/sats/src/de/impls.rs @@ -741,6 +741,7 @@ impl FieldNameVisitor<'_> for TupleNameVisitor<'_> { } } +impl_deserialize!([] spacetimedb_primitives::ArgId, de => u64::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::TableId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::ViewId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::SequenceId, de => u32::deserialize(de).map(Self)); diff --git a/crates/sats/src/ser/impls.rs b/crates/sats/src/ser/impls.rs index dc737c5b50c..9baac393dff 100644 --- a/crates/sats/src/ser/impls.rs +++ b/crates/sats/src/ser/impls.rs @@ -257,6 +257,7 @@ impl_serialize!([] ValueWithType<'_, ArrayValue>, (self, ser) => { } }); +impl_serialize!([] spacetimedb_primitives::ArgId, (self, ser) => ser.serialize_u64(self.0)); impl_serialize!([] spacetimedb_primitives::TableId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::ViewId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::SequenceId, (self, ser) => ser.serialize_u32(self.0)); diff --git a/crates/sats/src/typespace.rs b/crates/sats/src/typespace.rs index 36058c79195..f7d0978d660 100644 --- a/crates/sats/src/typespace.rs +++ b/crates/sats/src/typespace.rs @@ -410,6 +410,7 @@ impl_st!([T] Vec, ts => <[T]>::make_type(ts)); impl_st!([T, const N: usize] SmallVec<[T; N]>, ts => <[T]>::make_type(ts)); impl_st!([T] Option, ts => AlgebraicType::option(T::make_type(ts))); +impl_st!([] spacetimedb_primitives::ArgId, AlgebraicType::U64); impl_st!([] spacetimedb_primitives::ColId, AlgebraicType::U16); impl_st!([] spacetimedb_primitives::TableId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::ViewId, AlgebraicType::U32); diff --git a/crates/table/src/read_column.rs b/crates/table/src/read_column.rs index be58029f714..1cef02b2ff0 100644 --- a/crates/table/src/read_column.rs +++ b/crates/table/src/read_column.rs @@ -325,6 +325,7 @@ macro_rules! impl_read_column_via_from { } impl_read_column_via_from! { + u64 => spacetimedb_primitives::ArgId; u16 => spacetimedb_primitives::ColId; u32 => spacetimedb_primitives::ViewId; u32 => spacetimedb_primitives::TableId; From d58ac9fa1285667bc26a727741d25fc061cd333a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 5 Nov 2025 17:00:48 -0800 Subject: [PATCH 3/4] Refactor st_view_client to track the subscriber count --- crates/core/src/db/relational_db.rs | 13 +- crates/core/src/host/module_host.rs | 31 ++--- .../locking_tx_datastore/committed_state.rs | 6 +- .../src/locking_tx_datastore/datastore.rs | 28 ++-- .../src/locking_tx_datastore/mut_tx.rs | 125 +++++++----------- crates/datastore/src/system_tables.rs | 81 +++++++----- 6 files changed, 132 insertions(+), 152 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index d586c932fa9..2010b0cd93d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow, ST_VIEW_ID}; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -1405,16 +1405,7 @@ impl RelationalDB { /// Clear all rows from all view tables without dropping them. pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> { - for table_id in tx - .iter(ST_VIEW_ID)? - .map(StViewRow::try_from) - .collect::, _>>()? - .into_iter() - .filter_map(|row| row.table_id) - { - tx.clear_table(table_id)?; - } - Ok(()) + Ok(tx.clear_all_views()?) } pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index b3600e26e60..21304cde740 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID}; +use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; @@ -1024,7 +1024,7 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, inst: &mut Instance, - clear_view_tables: bool, + drop_view_subscribers: bool, ) -> Result<(), ReducerCallError> { let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); let reducer_name = reducer_lookup @@ -1047,16 +1047,21 @@ impl ModuleHost { let me = self.clone(); let stdb = me.module.replica_ctx().relational_db.clone(); + // Decrement the number of subscribers for each view this caller is subscribed to + let dec_view_subscribers = |tx: &mut MutTxId| { + if drop_view_subscribers { + if let Err(err) = tx.dec_st_view_subscribers(caller_identity) { + log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); + } + } + }; + // A fallback transaction that deletes the client from `st_client`. let fallback = || { let database_identity = me.info.database_identity; stdb.with_auto_commit(workload(), |mut_tx| { - if clear_view_tables { - if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) { - log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); - } - } + dec_view_subscribers(mut_tx); if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. @@ -1086,11 +1091,7 @@ impl ModuleHost { let stdb = me.module.replica_ctx().relational_db.clone(); let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); - if clear_view_tables { - if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) { - log::error!("`call_identity_disconnected`: failed to delete client view data: {err}"); - } - } + dec_view_subscribers(&mut mut_tx); if !is_client_exist(&mut_tx) { // The client is already gone. Nothing to do. @@ -1165,11 +1166,11 @@ impl ModuleHost { &self, caller_identity: Identity, caller_connection_id: ConnectionId, - clear_view_tables: bool, + drop_view_subscribers: bool, ) -> Result<(), ReducerCallError> { let me = self.clone(); self.call("call_identity_disconnected", move |inst| { - me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, clear_view_tables) + me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, drop_view_subscribers) }) .await? } @@ -1184,7 +1185,7 @@ impl ModuleHost { stdb.clear_all_views(mut_tx)?; stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; stdb.clear_table(mut_tx, ST_CLIENT_ID)?; - stdb.clear_table(mut_tx, ST_VIEW_CLIENT_ID)?; + stdb.clear_table(mut_tx, ST_VIEW_SUB_ID)?; Ok::<(), DBError>(()) }) }) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 24df23ae55f..501980284b9 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -24,8 +24,8 @@ use crate::{ use crate::{ locking_tx_datastore::mut_tx::ReadSet, system_tables::{ - ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX, - ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, + ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, + ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, }, }; use anyhow::anyhow; @@ -304,7 +304,7 @@ impl CommittedState { self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone()); self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone()); self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone()); - self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone()); + self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone()); self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone()); // Insert the sequences into `st_sequences` diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 8f5a6cc1cb6..3eaa4f65aa7 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1256,9 +1256,9 @@ mod tests { ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, - ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID, - ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, - ST_VIEW_PARAM_NAME, + ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, + ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, + ST_VIEW_SUB_NAME, }; use crate::traits::{IsolationLevel, MutTx}; use crate::Result; @@ -1272,7 +1272,7 @@ mod tests { use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; - use spacetimedb_primitives::{col_list, ColId, ScheduleId, ViewId}; + use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::layout::RowTypeLayout; @@ -1715,7 +1715,7 @@ mod tests { TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) }, TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, - TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, + TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) }, ])); @@ -1793,10 +1793,12 @@ mod tests { ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() }, - ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, - ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 }, - ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 }, - ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 4, name: "has_subscribers", ty: AlgebraicType::Bool }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 5, name: "last_called", ty: AlgebraicType::I64 }, ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 }, ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() }, @@ -1820,8 +1822,8 @@ mod tests { IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", }, IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", }, IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", }, - IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", }, - IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", }, + IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", }, + IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", }, IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, ])); @@ -2282,8 +2284,8 @@ mod tests { IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", }, IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", }, IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", }, - IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", }, - IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", }, + IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", }, + IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", }, IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", }, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 34902d3c7ae..38492bc1a6c 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -10,8 +10,8 @@ use super::{ }; use crate::system_tables::{ system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, - ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, + StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubsFields, StViewSubsRow, + ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, }; use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; use crate::{ @@ -39,7 +39,7 @@ use spacetimedb_lib::{ ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, @@ -1767,85 +1767,58 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { } impl MutTxId { - /// Delete view data and metadata for a client connection. - pub fn delete_view_data_for_client(&mut self, sender: Identity, connection_id: ConnectionId) -> Result<()> { - for row in self.delete_st_view_client_rows(sender, connection_id)? { - if !self.is_identity_subscribed_to_view_args(row.view_id, row.arg_id, sender)? { - self.delete_view_rows_for_identity(row.view_id, row.arg_id, sender)?; - } - } - Ok(()) - } - - /// Deletes and return the rows in `st_view_client` for a client connection. - fn delete_st_view_client_rows( - &mut self, - sender: Identity, - connection_id: ConnectionId, - ) -> Result> { + /// Decrements the number of subscribers in `st_view_sub` for a client identity. + pub fn dec_st_view_subscribers(&mut self, sender: Identity) -> Result<()> { let sender = IdentityViaU256(sender); - let conn_id = ConnectionIdViaU128(connection_id); - let cols = col_list![StViewClientFields::Identity, StViewClientFields::ConnectionId]; - let value = AlgebraicValue::product([sender.into(), conn_id.into()]); - self.iter_by_col_eq(ST_VIEW_CLIENT_ID, cols, &value)? - .map(|row_ref| StViewClientRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) - .collect::>>()? - .into_iter() - .map(|(row, ptr)| self.delete(ST_VIEW_CLIENT_ID, ptr).map(|_| row)) - .collect() - } + let cols = col_list![StViewSubsFields::Identity]; + let value = sender.into(); + + // Collect the rows for this identity. + // These are rows for which we will decrement the subscriber count. + let rows_to_delete = self + .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? + .map(|row_ref| StViewSubsRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) + .filter(|result| match result { + Ok((row, _)) => row.has_subscribers && row.num_subscribers > 0, + _ => true, + }) + .collect::>>()?; - /// Is anyone is subscribed to the view arguments identified by `arg_id`? - fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { - Ok(self - .iter_by_col_eq( - ST_VIEW_CLIENT_ID, - col_list![ - StViewClientFields::ViewId, - StViewClientFields::ArgId, - StViewClientFields::Identity - ], - &AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]), - )? - .next() - .is_some()) - } + // Copy the rows to delete and decrement their subscriber count. + // These are the rows that we will insert. + let rows_to_insert = rows_to_delete + .iter() + .map(|(row, _)| row.clone()) + .map(|row| StViewSubsRow { + num_subscribers: row.num_subscribers - 1, + has_subscribers: row.num_subscribers > 1, + ..row + }) + .collect::>(); - /// Looks up a row in `st_view` by its primary key. - fn st_view_row(&self, view_id: ViewId) -> Result> { - self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())? - .next() - .map(StViewRow::try_from) - .transpose() - } + // Delete the old rows + for (_, ptr) in rows_to_delete { + self.delete(ST_VIEW_SUB_ID, ptr)?; + } - /// Returns the [`TableId`] for this view's backing table by probing `st_view`. - /// Note, all views with at least one subscriber are materialized. - fn get_table_id_for_view(&self, view_id: ViewId) -> Result> { - Ok(self - .st_view_row(view_id)? - .and_then(|row| row.table_id.map(|id| (id, row.is_anonymous)))) + // Insert the new rows + for row in rows_to_insert { + self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &row)?; + } + + Ok(()) } - /// Deletes the rows of a view subscribed to by `sender`. - fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { - if let Some((table_id, is_anonymous)) = self.get_table_id_for_view(view_id)? { - let value = if is_anonymous { - let none_sender = AlgebraicValue::OptionNone(); - AlgebraicValue::product([none_sender, arg_id.into()]) - } else { - let sender = IdentityViaU256(sender); - let some_sender = AlgebraicValue::OptionSome(sender.into()); - AlgebraicValue::product([some_sender, arg_id.into()]) - }; - for row_pointer in self - .iter_by_col_eq(table_id, col_list![0, 1], &value)? - .map(|row_ref| row_ref.pointer()) - .collect::>() - .into_iter() - { - self.delete(table_id, row_pointer)?; - } + /// Clear all rows from all view tables without dropping them. + pub fn clear_all_views(&mut self) -> Result<()> { + for table_id in self + .iter(ST_VIEW_ID)? + .map(StViewRow::try_from) + .collect::>>()? + .into_iter() + .filter_map(|row| row.table_id) + { + self.clear_table(table_id)?; } Ok(()) } diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index 4001cefef60..c3a2c33d7b8 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -17,7 +17,7 @@ use spacetimedb_lib::db::raw_def::*; use spacetimedb_lib::de::{Deserialize, DeserializeOwned, Error}; use spacetimedb_lib::ser::Serialize; use spacetimedb_lib::st_var::StVarValue; -use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType}; +use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType, Timestamp}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::hash::Hash; @@ -71,8 +71,8 @@ pub const ST_VIEW_ID: TableId = TableId(12); pub const ST_VIEW_PARAM_ID: TableId = TableId(13); /// The static ID of the table that tracks view columns pub const ST_VIEW_COLUMN_ID: TableId = TableId(14); -/// The static ID of the table that tracks the clients subscribed to each view -pub const ST_VIEW_CLIENT_ID: TableId = TableId(15); +/// The static ID of the table that tracks the number of clients subscribed to each view +pub const ST_VIEW_SUB_ID: TableId = TableId(15); /// The static ID of the table that tracks view arguments pub const ST_VIEW_ARG_ID: TableId = TableId(16); @@ -90,7 +90,7 @@ pub(crate) const ST_ROW_LEVEL_SECURITY_NAME: &str = "st_row_level_security"; pub(crate) const ST_VIEW_NAME: &str = "st_view"; pub(crate) const ST_VIEW_PARAM_NAME: &str = "st_view_param"; pub(crate) const ST_VIEW_COLUMN_NAME: &str = "st_view_column"; -pub(crate) const ST_VIEW_CLIENT_NAME: &str = "st_view_client"; +pub(crate) const ST_VIEW_SUB_NAME: &str = "st_view_sub"; pub(crate) const ST_VIEW_ARG_NAME: &str = "st_view_arg"; /// Reserved range of sequence values used for system tables. /// @@ -138,7 +138,7 @@ pub fn system_tables() -> [TableSchema; 16] { st_view_schema(), st_view_param_schema(), st_view_column_schema(), - st_view_client_schema(), + st_view_sub_schema(), st_view_arg_schema(), ] } @@ -182,7 +182,7 @@ pub(crate) const ST_CONNECTION_CREDENTIALS_IDX: usize = 10; pub(crate) const ST_VIEW_IDX: usize = 11; pub(crate) const ST_VIEW_PARAM_IDX: usize = 12; pub(crate) const ST_VIEW_COLUMN_IDX: usize = 13; -pub(crate) const ST_VIEW_CLIENT_IDX: usize = 14; +pub(crate) const ST_VIEW_SUB_IDX: usize = 14; pub(crate) const ST_VIEW_ARG_IDX: usize = 15; macro_rules! st_fields_enum { @@ -250,11 +250,13 @@ st_fields_enum!(enum StViewColumnFields { "col_type", ColType = 3, }); // WARNING: For a stable schema, don't change the field names and discriminants. -st_fields_enum!(enum StViewClientFields { +st_fields_enum!(enum StViewSubsFields { "view_id", ViewId = 0, "arg_id", ArgId = 1, "identity", Identity = 2, - "connection_id", ConnectionId = 3, + "num_subscribers", NumSubscribers = 3, + "has_subscribers", HasSubscribers = 4, + "last_called", LastCalled = 5, }); // WARNING: For a stable schema, don't change the field names and discriminants. st_fields_enum!(enum StViewArgFields { @@ -398,15 +400,12 @@ fn system_module_def() -> ModuleDef { .with_unique_constraint(st_view_param_unique_cols) .with_index_no_accessor_name(btree(st_view_param_unique_cols)); - let st_view_client_type = builder.add_type::(); + let st_view_sub_type = builder.add_type::(); builder - .build_table( - ST_VIEW_CLIENT_NAME, - *st_view_client_type.as_ref().expect("should be ref"), - ) + .build_table(ST_VIEW_SUB_NAME, *st_view_sub_type.as_ref().expect("should be ref")) .with_type(TableType::System) - .with_index_no_accessor_name(btree([StViewClientFields::ViewId, StViewClientFields::ArgId])) - .with_index_no_accessor_name(btree([StViewClientFields::Identity, StViewClientFields::ConnectionId])); + .with_index_no_accessor_name(btree(StViewSubsFields::Identity)) + .with_index_no_accessor_name(btree(StViewSubsFields::HasSubscribers)); let st_view_arg_type = builder.add_type::(); builder @@ -516,7 +515,7 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_VIEW_NAME); validate_system_table::(&result, ST_VIEW_PARAM_NAME); validate_system_table::(&result, ST_VIEW_COLUMN_NAME); - validate_system_table::(&result, ST_VIEW_CLIENT_NAME); + validate_system_table::(&result, ST_VIEW_SUB_NAME); validate_system_table::(&result, ST_VIEW_ARG_NAME); result @@ -585,8 +584,8 @@ lazy_static::lazy_static! { m.insert("st_view_view_name_idx_btree", IndexId(15)); m.insert("st_view_param_view_id_param_pos_idx_btree", IndexId(16)); m.insert("st_view_column_view_id_col_pos_idx_btree", IndexId(17)); - m.insert("st_view_client_view_id_arg_id_idx_btree", IndexId(18)); - m.insert("st_view_client_identity_connection_id_idx_btree", IndexId(19)); + m.insert("st_view_sub_identity_idx_btree", IndexId(18)); + m.insert("st_view_sub_has_subscribers_idx_btree", IndexId(19)); m.insert("st_view_arg_id_idx_btree", IndexId(20)); m.insert("st_view_arg_bytes_idx_btree", IndexId(21)); m @@ -717,8 +716,8 @@ pub fn st_view_column_schema() -> TableSchema { st_schema(ST_VIEW_COLUMN_NAME, ST_VIEW_COLUMN_ID) } -pub fn st_view_client_schema() -> TableSchema { - st_schema(ST_VIEW_CLIENT_NAME, ST_VIEW_CLIENT_ID) +pub fn st_view_sub_schema() -> TableSchema { + st_schema(ST_VIEW_SUB_NAME, ST_VIEW_SUB_ID) } pub fn st_view_arg_schema() -> TableSchema { @@ -747,7 +746,7 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option { ST_VIEW_ID => Some(st_view_schema()), ST_VIEW_PARAM_ID => Some(st_view_param_schema()), ST_VIEW_COLUMN_ID => Some(st_view_column_schema()), - ST_VIEW_CLIENT_ID => Some(st_view_client_schema()), + ST_VIEW_SUB_ID => Some(st_view_sub_schema()), ST_VIEW_ARG_ID => Some(st_view_arg_schema()), _ => None, } @@ -923,21 +922,23 @@ pub struct StViewParamRow { pub param_type: AlgebraicTypeViaBytes, } -/// System table [ST_VIEW_CLIENT_NAME] +/// System table [ST_VIEW_SUB_NAME] /// -/// | view_id | arg_id | identity | connection_id | -/// |---------|--------|----------|---------------| -/// | 1 | 2 | 0x... | 0x... | +/// | view_id | arg_id | identity | num_subscribers | has_subscribers | last_called | +/// |---------|--------|----------|-----------------|-----------------|-------------| +/// | 1 | 2 | 0x... | 3 | true | | #[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)] #[sats(crate = spacetimedb_lib)] -pub struct StViewClientRow { +pub struct StViewSubsRow { pub view_id: ViewId, pub arg_id: ArgId, pub identity: IdentityViaU256, - pub connection_id: ConnectionIdViaU128, + pub num_subscribers: u64, + pub has_subscribers: bool, + pub last_called: TimestampViaI64, } -impl TryFrom> for StViewClientRow { +impl TryFrom> for StViewSubsRow { type Error = DatastoreError; fn try_from(row: RowRef<'_>) -> Result { @@ -945,12 +946,6 @@ impl TryFrom> for StViewClientRow { } } -impl From for (Identity, ConnectionId) { - fn from(value: StViewClientRow) -> Self { - (value.identity.0, value.connection_id.0) - } -} - /// System table [ST_VIEW_ARG_NAME] /// /// | id | bytes | @@ -1258,6 +1253,24 @@ impl From for AlgebraicValue { } } +/// A wrapper for [`Timestamp`] that acts like [`AlgebraicType::I64`] for serialization purposes. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimestampViaI64(pub Timestamp); +impl_serialize!([] TimestampViaI64, (self, ser) => self.0.to_micros_since_unix_epoch().serialize(ser)); +impl_deserialize!([] TimestampViaI64, de => ::deserialize(de).map(Timestamp::from_micros_since_unix_epoch).map(TimestampViaI64)); +impl_st!([] TimestampViaI64, AlgebraicType::I64); +impl From for TimestampViaI64 { + fn from(ts: Timestamp) -> Self { + Self(ts) + } +} + +impl From for AlgebraicValue { + fn from(val: TimestampViaI64) -> Self { + AlgebraicValue::I64(val.0.to_micros_since_unix_epoch()) + } +} + /// System table [ST_MODULE_NAME] /// This table holds exactly one row, describing the latest version of the /// SpacetimeDB module associated with the database: From 6c823d034b1fea722fc6094a64ec8732c3bd5d70 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 5 Nov 2025 22:54:17 -0800 Subject: [PATCH 4/4] Another index --- .../src/locking_tx_datastore/datastore.rs | 10 +-- .../src/locking_tx_datastore/mut_tx.rs | 64 +++++++++++++++++-- crates/datastore/src/system_tables.rs | 24 ++++--- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 3eaa4f65aa7..bc15c84be16 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1824,8 +1824,9 @@ mod tests { IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", }, IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", }, IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", }, - IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, - IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, + IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, + IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, ])); let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1; #[rustfmt::skip] @@ -2286,8 +2287,9 @@ mod tests { IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", }, IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", }, IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", }, - IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, - IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, + IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", }, + IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", }, + IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 38492bc1a6c..f11198a61ef 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -10,7 +10,7 @@ use super::{ }; use crate::system_tables::{ system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubsFields, StViewSubsRow, + StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubFields, StViewSubRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, }; use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; @@ -33,13 +33,13 @@ use smallvec::SmallVec; use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; -use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; +use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp}; use spacetimedb_lib::{ db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, @@ -1767,17 +1767,69 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { } impl MutTxId { + /// Does this caller have an entry for `view_id` in `st_view_sub`? + pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { + use StViewSubFields::*; + let sender = IdentityViaU256(sender); + let cols = col_list![ViewId, ArgId, Identity]; + let value = AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]); + Ok(self.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?.next().is_some()) + } + + /// Does this caller have an entry for `view_id` in `st_view_sub`? + /// If so, update the `last_called` column. + /// Otherwise insert a row into `st_view_sub` with no subscribers. + pub fn st_view_sub_update_or_insert_last_called( + &mut self, + view_id: ViewId, + arg_id: ArgId, + sender: Identity, + ) -> Result<()> { + use StViewSubFields::*; + + let identity = IdentityViaU256(sender); + let cols = col_list![ViewId, ArgId, Identity]; + let value = AlgebraicValue::product([view_id.into(), arg_id.into(), identity.into()]); + let last_called = Timestamp::now().into(); + + // Update `last_called` of `st_view_sub` row + if let Some((row, ptr)) = self + .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? + .next() + .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) + .transpose()? + { + self.delete(ST_VIEW_SUB_ID, ptr)?; + self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &StViewSubRow { last_called, ..row })?; + return Ok(()); + } + + // Insert `st_view_sub` row with 0 subscribers + self.insert_via_serialize_bsatn( + ST_VIEW_SUB_ID, + &StViewSubRow { + view_id, + arg_id, + identity, + num_subscribers: 0, + has_subscribers: false, + last_called, + }, + )?; + Ok(()) + } + /// Decrements the number of subscribers in `st_view_sub` for a client identity. pub fn dec_st_view_subscribers(&mut self, sender: Identity) -> Result<()> { let sender = IdentityViaU256(sender); - let cols = col_list![StViewSubsFields::Identity]; + let cols = col_list![StViewSubFields::Identity]; let value = sender.into(); // Collect the rows for this identity. // These are rows for which we will decrement the subscriber count. let rows_to_delete = self .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .map(|row_ref| StViewSubsRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) + .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) .filter(|result| match result { Ok((row, _)) => row.has_subscribers && row.num_subscribers > 0, _ => true, @@ -1789,7 +1841,7 @@ impl MutTxId { let rows_to_insert = rows_to_delete .iter() .map(|(row, _)| row.clone()) - .map(|row| StViewSubsRow { + .map(|row| StViewSubRow { num_subscribers: row.num_subscribers - 1, has_subscribers: row.num_subscribers > 1, ..row diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index c3a2c33d7b8..a8de22c0b99 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -250,7 +250,7 @@ st_fields_enum!(enum StViewColumnFields { "col_type", ColType = 3, }); // WARNING: For a stable schema, don't change the field names and discriminants. -st_fields_enum!(enum StViewSubsFields { +st_fields_enum!(enum StViewSubFields { "view_id", ViewId = 0, "arg_id", ArgId = 1, "identity", Identity = 2, @@ -400,12 +400,17 @@ fn system_module_def() -> ModuleDef { .with_unique_constraint(st_view_param_unique_cols) .with_index_no_accessor_name(btree(st_view_param_unique_cols)); - let st_view_sub_type = builder.add_type::(); + let st_view_sub_type = builder.add_type::(); builder .build_table(ST_VIEW_SUB_NAME, *st_view_sub_type.as_ref().expect("should be ref")) .with_type(TableType::System) - .with_index_no_accessor_name(btree(StViewSubsFields::Identity)) - .with_index_no_accessor_name(btree(StViewSubsFields::HasSubscribers)); + .with_index_no_accessor_name(btree(StViewSubFields::Identity)) + .with_index_no_accessor_name(btree(StViewSubFields::HasSubscribers)) + .with_index_no_accessor_name(btree([ + StViewSubFields::ViewId, + StViewSubFields::ArgId, + StViewSubFields::Identity, + ])); let st_view_arg_type = builder.add_type::(); builder @@ -515,7 +520,7 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_VIEW_NAME); validate_system_table::(&result, ST_VIEW_PARAM_NAME); validate_system_table::(&result, ST_VIEW_COLUMN_NAME); - validate_system_table::(&result, ST_VIEW_SUB_NAME); + validate_system_table::(&result, ST_VIEW_SUB_NAME); validate_system_table::(&result, ST_VIEW_ARG_NAME); result @@ -586,8 +591,9 @@ lazy_static::lazy_static! { m.insert("st_view_column_view_id_col_pos_idx_btree", IndexId(17)); m.insert("st_view_sub_identity_idx_btree", IndexId(18)); m.insert("st_view_sub_has_subscribers_idx_btree", IndexId(19)); - m.insert("st_view_arg_id_idx_btree", IndexId(20)); - m.insert("st_view_arg_bytes_idx_btree", IndexId(21)); + m.insert("st_view_sub_view_id_arg_id_identity_idx_btree", IndexId(20)); + m.insert("st_view_arg_id_idx_btree", IndexId(21)); + m.insert("st_view_arg_bytes_idx_btree", IndexId(22)); m }; } @@ -929,7 +935,7 @@ pub struct StViewParamRow { /// | 1 | 2 | 0x... | 3 | true | | #[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)] #[sats(crate = spacetimedb_lib)] -pub struct StViewSubsRow { +pub struct StViewSubRow { pub view_id: ViewId, pub arg_id: ArgId, pub identity: IdentityViaU256, @@ -938,7 +944,7 @@ pub struct StViewSubsRow { pub last_called: TimestampViaI64, } -impl TryFrom> for StViewSubsRow { +impl TryFrom> for StViewSubRow { type Error = DatastoreError; fn try_from(row: RowRef<'_>) -> Result {