Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
};

module
.call_identity_disconnected(caller_identity, connection_id)
// We don't clear views after reducer calls
.call_identity_disconnected(caller_identity, connection_id, false)
.await
.map_err(client_disconnected_error_to_response)?;

Expand Down Expand Up @@ -274,7 +275,8 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
};

module
.call_identity_disconnected(caller_identity, connection_id)
// We don't clear views after procedure calls
.call_identity_disconnected(caller_identity, connection_id, false)
.await
.map_err(client_disconnected_error_to_response)?;

Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,11 @@ impl RelationalDB {
Ok(rows_deleted)
}

/// Clear all rows from all view tables without dropping them.
pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> {
Ok(tx.clear_all_views()?)
}

pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,10 @@ impl Host {
} = launched;

// Disconnect dangling clients.
// No need to clear view tables here since we do it in `clear_all_clients`.
for (identity, connection_id) in connected_clients {
module_host
.call_identity_disconnected(identity, connection_id)
.call_identity_disconnected(identity, connection_id, false)
.await
.with_context(|| {
format!(
Expand Down
26 changes: 22 additions & 4 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
Expand Down Expand Up @@ -903,7 +903,7 @@ impl ModuleHost {
// Call the `client_disconnected` reducer, if it exists.
// This is a no-op if the module doesn't define such a reducer.
this.subscriptions().remove_subscriber(client_id);
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst, true)
})
.await
{
Expand Down Expand Up @@ -1024,6 +1024,7 @@ impl ModuleHost {
caller_identity: Identity,
caller_connection_id: ConnectionId,
inst: &mut Instance,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
let reducer_name = reducer_lookup
Expand All @@ -1046,10 +1047,22 @@ impl ModuleHost {
let me = self.clone();
let stdb = me.module.replica_ctx().relational_db.clone();

// Decrement the number of subscribers for each view this caller is subscribed to
let dec_view_subscribers = |tx: &mut MutTxId| {
if drop_view_subscribers {
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
}
}
};

// A fallback transaction that deletes the client from `st_client`.
let fallback = || {
let database_identity = me.info.database_identity;
stdb.with_auto_commit(workload(), |mut_tx| {

dec_view_subscribers(mut_tx);

if !is_client_exist(mut_tx) {
// The client is already gone. Nothing to do.
log::debug!(
Expand All @@ -1076,7 +1089,9 @@ impl ModuleHost {

if let Some((reducer_id, reducer_def)) = reducer_lookup {
let stdb = me.module.replica_ctx().relational_db.clone();
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());

dec_view_subscribers(&mut mut_tx);

if !is_client_exist(&mut_tx) {
// The client is already gone. Nothing to do.
Expand Down Expand Up @@ -1151,10 +1166,11 @@ impl ModuleHost {
&self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
let me = self.clone();
self.call("call_identity_disconnected", move |inst| {
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, drop_view_subscribers)
})
.await?
}
Expand All @@ -1166,8 +1182,10 @@ impl ModuleHost {
let stdb = &me.module.replica_ctx().relational_db;
let workload = Workload::Internal;
stdb.with_auto_commit(workload, |mut_tx| {
stdb.clear_all_views(mut_tx)?;
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
stdb.clear_table(mut_tx, ST_VIEW_SUB_ID)?;
Ok::<(), DBError>(())
})
})
Expand Down
6 changes: 3 additions & 3 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
use crate::{
locking_tx_datastore::mut_tx::ReadSet,
system_tables::{
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX,
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
},
};
use anyhow::anyhow;
Expand Down Expand Up @@ -304,7 +304,7 @@ impl CommittedState {
self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone());
self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone());
self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone());
self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone());
self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone());
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());

// Insert the sequences into `st_sequences`
Expand Down
44 changes: 24 additions & 20 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,13 +1252,13 @@ mod tests {
use crate::system_tables::{
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME,
ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID,
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
ST_VIEW_PARAM_NAME,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
ST_VIEW_SUB_NAME,
};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
Expand All @@ -1272,7 +1272,7 @@ mod tests {
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
use spacetimedb_primitives::{col_list, ColId, ScheduleId, ViewId};
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::bsatn::ToBsatn;
use spacetimedb_sats::layout::RowTypeLayout;
Expand Down Expand Up @@ -1715,7 +1715,7 @@ mod tests {
TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) },
TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },

]));
Expand Down Expand Up @@ -1793,10 +1793,12 @@ mod tests {
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String },
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() },

ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 4, name: "has_subscribers", ty: AlgebraicType::Bool },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 5, name: "last_called", ty: AlgebraicType::I64 },

ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },
Expand All @@ -1820,10 +1822,11 @@ mod tests {
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
]));
let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1;
#[rustfmt::skip]
Expand Down Expand Up @@ -2282,10 +2285,11 @@ mod tests {
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
Expand Down
Loading
Loading