@@ -24,7 +24,7 @@ use crate::{
2424use anyhow:: anyhow;
2525use core:: { convert:: Infallible , ops:: RangeBounds } ;
2626use itertools:: Itertools ;
27- use spacetimedb_data_structures:: map:: { HashSet , IntMap } ;
27+ use spacetimedb_data_structures:: map:: { HashSet , IntMap , IntSet } ;
2828use spacetimedb_lib:: {
2929 db:: auth:: { StAccess , StTableType } ,
3030 Identity ,
@@ -66,6 +66,11 @@ pub struct CommittedState {
6666 /// Pages are shared between all modules running on a particular host,
6767 /// not allocated per-module.
6868 pub ( super ) page_pool : PagePool ,
69+ /// Whether the table was dropped during replay.
70+ /// TODO(centril): Only used during bootstrap and is otherwise unused.
71+ /// We should split `CommittedState` into two types
72+ /// where one, e.g., `ReplayCommittedState`, has this field.
73+ table_dropped : IntSet < TableId > ,
6974}
7075
7176impl MemoryUsage for CommittedState {
@@ -76,9 +81,14 @@ impl MemoryUsage for CommittedState {
7681 blob_store,
7782 index_id_map,
7883 page_pool : _,
84+ table_dropped,
7985 } = self ;
8086 // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
81- next_tx_offset. heap_usage ( ) + tables. heap_usage ( ) + blob_store. heap_usage ( ) + index_id_map. heap_usage ( )
87+ next_tx_offset. heap_usage ( )
88+ + tables. heap_usage ( )
89+ + blob_store. heap_usage ( )
90+ + index_id_map. heap_usage ( )
91+ + table_dropped. heap_usage ( )
8292 }
8393}
8494
@@ -157,6 +167,7 @@ impl CommittedState {
157167 tables : <_ >:: default ( ) ,
158168 blob_store : <_ >:: default ( ) ,
159169 index_id_map : <_ >:: default ( ) ,
170+ table_dropped : <_ >:: default ( ) ,
160171 page_pool,
161172 }
162173 }
@@ -334,17 +345,30 @@ impl CommittedState {
334345 Ok ( ( ) )
335346 }
336347
337- pub ( super ) fn replay_delete_by_rel ( & mut self , table_id : TableId , rel : & ProductValue ) -> Result < ( ) > {
338- let table = self
339- . tables
340- . get_mut ( & table_id)
341- . ok_or ( TableError :: IdNotFoundState ( table_id) ) ?;
348+ pub ( super ) fn replay_delete_by_rel ( & mut self , table_id : TableId , row : & ProductValue ) -> Result < ( ) > {
349+ // Get the table for mutation.
350+ // If it was dropped, avoid an error and just ignore the row instead.
351+ let table = match self . tables . get_mut ( & table_id) {
352+ Some ( t) => t,
353+ None if self . table_dropped . contains ( & table_id) => return Ok ( ( ) ) ,
354+ None => return Err ( TableError :: IdNotFoundState ( table_id) . into ( ) ) ,
355+ } ;
356+
357+ // Delete the row.
342358 let blob_store = & mut self . blob_store ;
343359 table
344- . delete_equal_row ( & self . page_pool , blob_store, rel )
360+ . delete_equal_row ( & self . page_pool , blob_store, row )
345361 . map_err ( TableError :: Bflatn ) ?
346362 . ok_or_else ( || anyhow ! ( "Delete for non-existent row when replaying transaction" ) ) ?;
347363
364+ if table_id == ST_TABLE_ID {
365+ // A row was removed from `st_table`, so a table was dropped.
366+ // Remove that table from the in-memory structures.
367+ self . tables
368+ . remove ( & Self :: read_table_id ( row) )
369+ . expect ( "table to remove should exist" ) ;
370+ }
371+
348372 Ok ( ( ) )
349373 }
350374
@@ -379,8 +403,7 @@ impl CommittedState {
379403 ///
380404 /// The `row_ptr` is a pointer to `row`.
381405 fn st_column_changed ( & mut self , row : & ProductValue , row_ptr : RowPointer ) -> Result < ( ) > {
382- let target_table_id = TableId :: deserialize ( ValueDeserializer :: from_ref ( & row. elements [ 0 ] ) )
383- . expect ( "first field in `st_column` should decode to a `TableId`" ) ;
406+ let target_table_id = Self :: read_table_id ( row) ;
384407 let target_col_id = ColId :: deserialize ( ValueDeserializer :: from_ref ( & row. elements [ 1 ] ) )
385408 . expect ( "second field in `st_column` should decode to a `ColId`" ) ;
386409
@@ -411,6 +434,12 @@ impl CommittedState {
411434 Ok ( ( ) )
412435 }
413436
437+ /// Assuming that a `TableId` is stored as the first field in `row`, read it.
438+ fn read_table_id ( row : & ProductValue ) -> TableId {
439+ TableId :: deserialize ( ValueDeserializer :: from_ref ( & row. elements [ 0 ] ) )
440+ . expect ( "first field in `st_column` should decode to a `TableId`" )
441+ }
442+
414443 pub ( super ) fn build_sequence_state ( & mut self , sequence_state : & mut SequencesState ) -> Result < ( ) > {
415444 let st_sequences = self . tables . get ( & ST_SEQUENCE_ID ) . unwrap ( ) ;
416445 for row_ref in st_sequences. scan_rows ( & self . blob_store ) {
0 commit comments