Skip to content

Commit 45626e6

Browse files
StViewClient -> StViewSub
1 parent 90eb038 commit 45626e6

File tree

5 files changed

+109
-135
lines changed

5 files changed

+109
-135
lines changed

crates/core/src/host/module_host.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
3838
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3939
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
4040
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
41-
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID};
41+
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
4242
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4343
use spacetimedb_durability::DurableOffset;
4444
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
@@ -1024,7 +1024,7 @@ impl ModuleHost {
10241024
caller_identity: Identity,
10251025
caller_connection_id: ConnectionId,
10261026
inst: &mut Instance,
1027-
clear_view_tables: bool,
1027+
drop_subscriptions: bool,
10281028
) -> Result<(), ReducerCallError> {
10291029
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
10301030
let reducer_name = reducer_lookup
@@ -1052,8 +1052,8 @@ impl ModuleHost {
10521052
let database_identity = me.info.database_identity;
10531053
stdb.with_auto_commit(workload(), |mut_tx| {
10541054

1055-
if clear_view_tables {
1056-
if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) {
1055+
if drop_subscriptions {
1056+
if let Err(err) = mut_tx.dec_st_view_subscribers(caller_identity) {
10571057
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
10581058
}
10591059
}
@@ -1086,8 +1086,8 @@ impl ModuleHost {
10861086
let stdb = me.module.replica_ctx().relational_db.clone();
10871087
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
10881088

1089-
if clear_view_tables {
1090-
if let Err(err) = mut_tx.delete_view_data_for_client(caller_identity, caller_connection_id) {
1089+
if drop_subscriptions {
1090+
if let Err(err) = mut_tx.dec_st_view_subscribers(caller_identity) {
10911091
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
10921092
}
10931093
}
@@ -1184,7 +1184,7 @@ impl ModuleHost {
11841184
stdb.clear_all_views(mut_tx)?;
11851185
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
11861186
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
1187-
stdb.clear_table(mut_tx, ST_VIEW_CLIENT_ID)?;
1187+
stdb.clear_table(mut_tx, ST_VIEW_SUB_ID)?;
11881188
Ok::<(), DBError>(())
11891189
})
11901190
})

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use crate::{
2424
use crate::{
2525
locking_tx_datastore::mut_tx::ReadSet,
2626
system_tables::{
27-
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX,
28-
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
27+
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
28+
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
2929
},
3030
};
3131
use anyhow::anyhow;
@@ -304,7 +304,7 @@ impl CommittedState {
304304
self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone());
305305
self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone());
306306
self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone());
307-
self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone());
307+
self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone());
308308
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());
309309

310310
// Insert the sequences into `st_sequences`

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,9 +1256,9 @@ mod tests {
12561256
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
12571257
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
12581258
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
1259-
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
1260-
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
1261-
ST_VIEW_PARAM_NAME,
1259+
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
1260+
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
1261+
ST_VIEW_SUB_NAME,
12621262
};
12631263
use crate::traits::{IsolationLevel, MutTx};
12641264
use crate::Result;
@@ -1272,7 +1272,7 @@ mod tests {
12721272
use spacetimedb_lib::error::ResultTest;
12731273
use spacetimedb_lib::st_var::StVarValue;
12741274
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
1275-
use spacetimedb_primitives::{col_list, ColId, ScheduleId, ViewId};
1275+
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
12761276
use spacetimedb_sats::algebraic_value::ser::value_serialize;
12771277
use spacetimedb_sats::bsatn::ToBsatn;
12781278
use spacetimedb_sats::layout::RowTypeLayout;
@@ -1715,7 +1715,7 @@ mod tests {
17151715
TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) },
17161716
TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
17171717
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1718-
TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1718+
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
17191719
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },
17201720

17211721
]));
@@ -1793,10 +1793,12 @@ mod tests {
17931793
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String },
17941794
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() },
17951795

1796-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
1797-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 },
1798-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
1799-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 },
1796+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
1797+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() },
1798+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
1799+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 },
1800+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 4, name: "has_subscribers", ty: AlgebraicType::Bool },
1801+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 5, name: "last_called", ty: AlgebraicType::I64 },
18001802

18011803
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
18021804
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },
@@ -1820,8 +1822,8 @@ mod tests {
18201822
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
18211823
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
18221824
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
1823-
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
1824-
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
1825+
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
1826+
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
18251827
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
18261828
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
18271829
]));
@@ -2282,8 +2284,8 @@ mod tests {
22822284
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
22832285
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
22842286
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
2285-
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
2286-
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
2287+
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
2288+
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
22872289
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
22882290
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
22892291
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 37 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use super::{
1010
};
1111
use crate::system_tables::{
1212
system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow,
13-
StViewClientFields, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow,
14-
ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
13+
StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubsFields, StViewSubsRow,
14+
ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID,
1515
};
1616
use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags};
1717
use crate::{
@@ -39,7 +39,7 @@ use spacetimedb_lib::{
3939
ConnectionId, Identity,
4040
};
4141
use spacetimedb_primitives::{
42-
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId,
42+
col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId,
4343
};
4444
use spacetimedb_sats::{
4545
bsatn::{self, to_writer, DecodeError, Deserializer},
@@ -1767,86 +1767,45 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
17671767
}
17681768

17691769
impl MutTxId {
1770-
/// Delete view data and metadata for a client connection.
1771-
pub fn delete_view_data_for_client(&mut self, sender: Identity, connection_id: ConnectionId) -> Result<()> {
1772-
for row in self.delete_st_view_client_rows(sender, connection_id)? {
1773-
if !self.is_identity_subscribed_to_view_args(row.view_id, row.arg_id, sender)? {
1774-
self.delete_view_rows_for_identity(row.view_id, row.arg_id, sender)?;
1775-
}
1776-
}
1777-
Ok(())
1778-
}
1779-
1780-
/// Deletes and return the rows in `st_view_client` for a client connection.
1781-
fn delete_st_view_client_rows(
1782-
&mut self,
1783-
sender: Identity,
1784-
connection_id: ConnectionId,
1785-
) -> Result<Vec<StViewClientRow>> {
1770+
/// Decrements the number of subscribers in `st_view_sub` for a client identity.
1771+
pub fn dec_st_view_subscribers(&mut self, sender: Identity) -> Result<()> {
17861772
let sender = IdentityViaU256(sender);
1787-
let conn_id = ConnectionIdViaU128(connection_id);
1788-
let cols = col_list![StViewClientFields::Identity, StViewClientFields::ConnectionId];
1789-
let value = AlgebraicValue::product([sender.into(), conn_id.into()]);
1790-
self.iter_by_col_eq(ST_VIEW_CLIENT_ID, cols, &value)?
1791-
.map(|row_ref| StViewClientRow::try_from(row_ref).map(|row| (row, row_ref.pointer())))
1792-
.collect::<Result<Vec<_>>>()?
1793-
.into_iter()
1794-
.map(|(row, ptr)| self.delete(ST_VIEW_CLIENT_ID, ptr).map(|_| row))
1795-
.collect()
1796-
}
1797-
1798-
/// Is anyone is subscribed to the view arguments identified by `arg_id`?
1799-
fn is_identity_subscribed_to_view_args(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<bool> {
1800-
Ok(self
1801-
.iter_by_col_eq(
1802-
ST_VIEW_CLIENT_ID,
1803-
col_list![
1804-
StViewClientFields::ViewId,
1805-
StViewClientFields::ArgId,
1806-
StViewClientFields::Identity
1807-
],
1808-
&AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]),
1809-
)?
1810-
.next()
1811-
.is_some())
1812-
}
1773+
let cols = col_list![StViewSubsFields::Identity];
1774+
let value = sender.into();
1775+
1776+
// Collect the rows for this identity.
1777+
// These are rows for which we will decrement the subscriber count.
1778+
let rows_to_delete = self
1779+
.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?
1780+
.map(|row_ref| StViewSubsRow::try_from(row_ref).map(|row| (row, row_ref.pointer())))
1781+
.filter(|result| match result {
1782+
Ok((row, _)) => row.has_subscribers && row.num_subscribers > 0,
1783+
_ => true,
1784+
})
1785+
.collect::<Result<Vec<_>>>()?;
18131786

1814-
/// Looks up a row in `st_view` by its primary key.
1815-
fn st_view_row(&self, view_id: ViewId) -> Result<Option<StViewRow>> {
1816-
self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())?
1817-
.next()
1818-
.map(StViewRow::try_from)
1819-
.transpose()
1820-
}
1787+
// Copy the rows to delete and decrement their subscriber count.
1788+
// These are the rows that we will insert.
1789+
let rows_to_insert = rows_to_delete
1790+
.iter()
1791+
.map(|(row, _)| row.clone())
1792+
.map(|row| StViewSubsRow {
1793+
num_subscribers: row.num_subscribers - 1,
1794+
has_subscribers: row.num_subscribers > 1,
1795+
..row
1796+
})
1797+
.collect::<Vec<_>>();
18211798

1822-
/// Returns the [`TableId`] for this view's backing table by probing `st_view`.
1823-
/// Note, all views with at least one subscriber are materialized.
1824-
fn get_table_id_for_view(&self, view_id: ViewId) -> Result<Option<(TableId, bool)>> {
1825-
Ok(self
1826-
.st_view_row(view_id)?
1827-
.and_then(|row| row.table_id.map(|id| (id, row.is_anonymous))))
1828-
}
1799+
// Delete the old rows
1800+
for (_, ptr) in rows_to_delete {
1801+
self.delete(ST_VIEW_SUB_ID, ptr)?;
1802+
}
18291803

1830-
/// Deletes the rows of a view subscribed to by `sender`.
1831-
fn delete_view_rows_for_identity(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> {
1832-
if let Some((table_id, is_anonymous)) = self.get_table_id_for_view(view_id)? {
1833-
let value = if is_anonymous {
1834-
let none_sender = AlgebraicValue::OptionNone();
1835-
AlgebraicValue::product([none_sender, arg_id.into()])
1836-
} else {
1837-
let sender = IdentityViaU256(sender);
1838-
let some_sender = AlgebraicValue::OptionSome(sender.into());
1839-
AlgebraicValue::product([some_sender, arg_id.into()])
1840-
};
1841-
for row_pointer in self
1842-
.iter_by_col_eq(table_id, col_list![0, 1], &value)?
1843-
.map(|row_ref| row_ref.pointer())
1844-
.collect::<Vec<_>>()
1845-
.into_iter()
1846-
{
1847-
self.delete(table_id, row_pointer)?;
1848-
}
1804+
// Insert the new rows
1805+
for row in rows_to_insert {
1806+
self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &row)?;
18491807
}
1808+
18501809
Ok(())
18511810
}
18521811

0 commit comments

Comments
 (0)