Skip to content

Commit 0a4e9f5

Browse files
Clear view tables on disconnect
1 parent 7c4c3dd commit 0a4e9f5

File tree

7 files changed

+157
-14
lines changed

7 files changed

+157
-14
lines changed

crates/client-api/src/routes/database.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
101101
};
102102

103103
module
104-
.call_identity_disconnected(caller_identity, connection_id)
104+
// We don't clear views after reducer calls
105+
.call_identity_disconnected(caller_identity, connection_id, false)
105106
.await
106107
.map_err(client_disconnected_error_to_response)?;
107108

@@ -274,7 +275,8 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
274275
};
275276

276277
module
277-
.call_identity_disconnected(caller_identity, connection_id)
278+
// We don't clear views after procedure calls
279+
.call_identity_disconnected(caller_identity, connection_id, false)
278280
.await
279281
.map_err(client_disconnected_error_to_response)?;
280282

crates/core/src/db/relational_db.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
1919
};
2020
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
21-
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
21+
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow, ST_VIEW_ID};
2222
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2323
use spacetimedb_datastore::traits::{
2424
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
@@ -1403,6 +1403,20 @@ impl RelationalDB {
14031403
Ok(rows_deleted)
14041404
}
14051405

1406+
/// Clear all rows from all view tables without dropping them.
1407+
pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> {
1408+
for table_id in tx
1409+
.iter(ST_VIEW_ID)?
1410+
.map(StViewRow::try_from)
1411+
.collect::<Result<Vec<_>, _>>()?
1412+
.into_iter()
1413+
.filter_map(|row| row.table_id)
1414+
{
1415+
tx.clear_table(table_id)?;
1416+
}
1417+
Ok(())
1418+
}
1419+
14061420
pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
14071421
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
14081422
}

crates/core/src/host/host_controller.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,9 +847,10 @@ impl Host {
847847
} = launched;
848848

849849
// Disconnect dangling clients.
850+
// No need to clear view tables here since we do it in `clear_all_clients`.
850851
for (identity, connection_id) in connected_clients {
851852
module_host
852-
.call_identity_disconnected(identity, connection_id)
853+
.call_identity_disconnected(identity, connection_id, false)
853854
.await
854855
.with_context(|| {
855856
format!(

crates/core/src/host/module_host.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
3838
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3939
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
4040
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
41-
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
41+
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID};
4242
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4343
use spacetimedb_durability::DurableOffset;
4444
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
@@ -903,7 +903,7 @@ impl ModuleHost {
903903
// Call the `client_disconnected` reducer, if it exists.
904904
// This is a no-op if the module doesn't define such a reducer.
905905
this.subscriptions().remove_subscriber(client_id);
906-
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
906+
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst, true)
907907
})
908908
.await
909909
{
@@ -1024,6 +1024,7 @@ impl ModuleHost {
10241024
caller_identity: Identity,
10251025
caller_connection_id: ConnectionId,
10261026
inst: &mut Instance,
1027+
clear_view_tables: bool,
10271028
) -> Result<(), ReducerCallError> {
10281029
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
10291030
let reducer_name = reducer_lookup
@@ -1050,6 +1051,13 @@ impl ModuleHost {
10501051
let fallback = || {
10511052
let database_identity = me.info.database_identity;
10521053
stdb.with_auto_commit(workload(), |mut_tx| {
1054+
1055+
if clear_view_tables {
1056+
if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) {
1057+
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
1058+
}
1059+
}
1060+
10531061
if !is_client_exist(mut_tx) {
10541062
// The client is already gone. Nothing to do.
10551063
log::debug!(
@@ -1076,7 +1084,13 @@ impl ModuleHost {
10761084

10771085
if let Some((reducer_id, reducer_def)) = reducer_lookup {
10781086
let stdb = me.module.replica_ctx().relational_db.clone();
1079-
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
1087+
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
1088+
1089+
if clear_view_tables {
1090+
if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) {
1091+
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
1092+
}
1093+
}
10801094

10811095
if !is_client_exist(&mut_tx) {
10821096
// The client is already gone. Nothing to do.
@@ -1151,10 +1165,11 @@ impl ModuleHost {
11511165
&self,
11521166
caller_identity: Identity,
11531167
caller_connection_id: ConnectionId,
1168+
clear_view_tables: bool,
11541169
) -> Result<(), ReducerCallError> {
11551170
let me = self.clone();
11561171
self.call("call_identity_disconnected", move |inst| {
1157-
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
1172+
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, clear_view_tables)
11581173
})
11591174
.await?
11601175
}
@@ -1166,8 +1181,10 @@ impl ModuleHost {
11661181
let stdb = &me.module.replica_ctx().relational_db;
11671182
let workload = Workload::Internal;
11681183
stdb.with_auto_commit(workload, |mut_tx| {
1184+
stdb.clear_all_views(mut_tx)?;
11691185
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
11701186
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
1187+
stdb.clear_table(mut_tx, ST_VIEW_CLIENT_ID)?;
11711188
Ok::<(), DBError>(())
11721189
})
11731190
})

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,9 +1252,9 @@ mod tests {
12521252
use crate::system_tables::{
12531253
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
12541254
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
1255-
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME,
1256-
ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
1257-
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
1255+
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID,
1256+
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
1257+
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
12581258
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
12591259
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
12601260
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use super::{
99
SharedMutexGuard, SharedWriteGuard,
1010
};
1111
use crate::system_tables::{
12-
system_tables, ConnectionIdViaU128, StConnectionCredentialsFields, StConnectionCredentialsRow, StViewColumnFields,
13-
StViewFields, StViewParamFields, StViewParamRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID,
14-
ST_VIEW_PARAM_ID,
12+
system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow,
13+
StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow,
14+
ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
1515
};
1616
use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags};
1717
use crate::{
@@ -1767,6 +1767,89 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
17671767
}
17681768

17691769
impl MutTxId {
1770+
/// Delete view data and metadata for a client connection
1771+
pub fn delete_view_data_for_client(&mut self, sender: Identity, connection_id: ConnectionId) -> Result<()> {
1772+
for row in self.delete_st_view_client_rows(sender, connection_id)? {
1773+
if !self.is_identity_subscribed_to_view_args(row.view_id, row.arg_id, sender)? {
1774+
self.delete_view_rows_for_identity(row.view_id, row.arg_id, sender)?;
1775+
}
1776+
}
1777+
Ok(())
1778+
}
1779+
1780+
/// Delete and return the rows in `st_view_client` for a client connection
1781+
fn delete_st_view_client_rows(
1782+
&mut self,
1783+
sender: Identity,
1784+
connection_id: ConnectionId,
1785+
) -> Result<Vec<StViewClientRow>> {
1786+
let sender = IdentityViaU256(sender);
1787+
let conn_id = ConnectionIdViaU128(connection_id);
1788+
let cols = col_list![StViewClientFields::Identity, StViewClientFields::ConnectionId];
1789+
let value = AlgebraicValue::product([sender.into(), conn_id.into()]);
1790+
self.iter_by_col_eq(ST_VIEW_CLIENT_ID, cols, &value)?
1791+
.map(|row_ref| StViewClientRow::try_from(row_ref).map(|row| (row, row_ref.pointer())))
1792+
.collect::<Result<Vec<_>>>()?
1793+
.into_iter()
1794+
.map(|(row, ptr)| self.delete(ST_VIEW_CLIENT_ID, ptr).map(|_| row))
1795+
.collect()
1796+
}
1797+
1798+
/// Is anyone is subscribed to the view arguments identified by `arg_id`
1799+
fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result<bool> {
1800+
Ok(self
1801+
.iter_by_col_eq(
1802+
ST_VIEW_CLIENT_ID,
1803+
col_list![
1804+
StViewClientFields::ViewId,
1805+
StViewClientFields::ArgId,
1806+
StViewClientFields::Identity
1807+
],
1808+
&AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]),
1809+
)?
1810+
.next()
1811+
.is_some())
1812+
}
1813+
1814+
/// Lookup a row in `st_view` by its primary key
1815+
fn st_view_row(&self, view_id: ViewId) -> Result<Option<StViewRow>> {
1816+
self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())?
1817+
.next()
1818+
.map(StViewRow::try_from)
1819+
.transpose()
1820+
}
1821+
1822+
/// Get the [`TableId`] for this view's backing table by probing `st_view`.
1823+
/// Note, all views with at least one subscriber are materialized.
1824+
fn get_table_id_for_view(&self, view_id: ViewId) -> Result<Option<(TableId, bool)>> {
1825+
Ok(self
1826+
.st_view_row(view_id)?
1827+
.and_then(|row| row.table_id.map(|id| (id, row.is_anonymous))))
1828+
}
1829+
1830+
/// Delete the rows of a view subscribed to by `sender`
1831+
fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: u64, sender: Identity) -> Result<()> {
1832+
if let Some((table_id, is_anonymous)) = self.get_table_id_for_view(view_id)? {
1833+
let value = if is_anonymous {
1834+
let none_sender = AlgebraicValue::OptionNone();
1835+
AlgebraicValue::product([none_sender, arg_id.into()])
1836+
} else {
1837+
let sender = IdentityViaU256(sender);
1838+
let some_sender = AlgebraicValue::OptionSome(sender.into());
1839+
AlgebraicValue::product([some_sender, arg_id.into()])
1840+
};
1841+
for row_pointer in self
1842+
.iter_by_col_eq(table_id, col_list![0, 1], &value)?
1843+
.map(|row_ref| row_ref.pointer())
1844+
.collect::<Vec<_>>()
1845+
.into_iter()
1846+
{
1847+
self.delete(table_id, row_pointer)?;
1848+
}
1849+
}
1850+
Ok(())
1851+
}
1852+
17701853
pub fn insert_st_client(
17711854
&mut self,
17721855
identity: Identity,

crates/datastore/src/system_tables.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,20 @@ pub struct StViewClientRow {
937937
pub connection_id: ConnectionIdViaU128,
938938
}
939939

940+
impl TryFrom<RowRef<'_>> for StViewClientRow {
941+
type Error = DatastoreError;
942+
943+
fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
944+
read_via_bsatn(row)
945+
}
946+
}
947+
948+
impl From<StViewClientRow> for (Identity, ConnectionId) {
949+
fn from(value: StViewClientRow) -> Self {
950+
(value.identity.0, value.connection_id.0)
951+
}
952+
}
953+
940954
/// System table [ST_VIEW_ARG_NAME]
941955
///
942956
/// | id | bytes |
@@ -1238,6 +1252,12 @@ impl From<Identity> for IdentityViaU256 {
12381252
}
12391253
}
12401254

1255+
impl From<IdentityViaU256> for AlgebraicValue {
1256+
fn from(val: IdentityViaU256) -> Self {
1257+
AlgebraicValue::U256(val.0.to_u256().into())
1258+
}
1259+
}
1260+
12411261
/// System table [ST_MODULE_NAME]
12421262
/// This table holds exactly one row, describing the latest version of the
12431263
/// SpacetimeDB module associated with the database:
@@ -1348,6 +1368,12 @@ impl TryFrom<RowRef<'_>> for StClientRow {
13481368
}
13491369
}
13501370

1371+
impl From<StClientRow> for (Identity, ConnectionId) {
1372+
fn from(value: StClientRow) -> Self {
1373+
(value.identity.0, value.connection_id.0)
1374+
}
1375+
}
1376+
13511377
/// System table [ST_VAR_NAME]
13521378
///
13531379
/// | name | value |

0 commit comments

Comments
 (0)