From 6770f6239aeb143bd58c79ef997b94ceda67432b Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 21 Jan 2025 05:08:51 +0100 Subject: [PATCH] add tests for MutTxId::update and fix it --- Cargo.toml | 2 +- .../locking_tx_datastore/committed_state.rs | 39 +- .../locking_tx_datastore/datastore.rs | 407 +++++++++++++++--- .../datastore/locking_tx_datastore/mut_tx.rs | 196 +++++---- crates/core/src/error.rs | 24 +- crates/table/src/table.rs | 31 +- 6 files changed, 538 insertions(+), 161 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7c1109b9537..fc010eb1cf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -190,7 +190,7 @@ paste = "1.0" petgraph = { version = "0.6.5", default-features = false } pin-project-lite = "0.2.9" postgres-types = "0.2.5" -pretty_assertions = "1.4" +pretty_assertions = { version = "1.4", features = ["unstable"] } proc-macro2 = "1.0" prometheus = "0.13.0" proptest = "1.4" diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 2dc6eee58b4..2e06b296bdf 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -5,7 +5,10 @@ use super::{ tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, IterByColEqTx, }; -use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx}; +use crate::{ + db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx}, + error::IndexError, +}; use crate::{ db::{ datastore::{ @@ -132,7 +135,8 @@ fn ignore_duplicate_insert_error(res: std::result::Result) -> match res { Ok(_) => Ok(()), Err(InsertError::Duplicate(_)) => Ok(()), - Err(err) => Err(err.into()), + Err(InsertError::Bflatn(e)) => Err(e.into()), + Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()), } } @@ -321,7 +325,7 @@ impl CommittedState { let skip_index_update = true; table .delete_equal_row(blob_store, rel, skip_index_update) - .map_err(TableError::Insert)? + .map_err(TableError::Bflatn)? .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?; Ok(()) } @@ -333,8 +337,11 @@ impl CommittedState { row: &ProductValue, ) -> Result<()> { let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema); - table.insert_for_replay(blob_store, row).map_err(TableError::Insert)?; - Ok(()) + table.insert_for_replay(blob_store, row).map(drop).map_err(|e| match e { + InsertError::Bflatn(e) => TableError::Bflatn(e).into(), + InsertError::Duplicate(e) => TableError::Duplicate(e).into(), + InsertError::IndexError(e) => IndexError::UniqueConstraintViolation(e).into(), + }) } pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> { @@ -511,6 +518,9 @@ impl CommittedState { pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData { let mut tx_data = TxData::default(); + // First, merge index id fast-lookup map changes and delete indices. + self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref()); + // First, apply deletes. This will free up space in the committed tables. self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables); @@ -518,9 +528,6 @@ impl CommittedState { // before allocating new pages. self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store); - // Merge index id fast-lookup map changes. - self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref()); - // If the TX will be logged, record its projected tx offset, // then increment the counter. if self.tx_consumes_offset(&tx_data, ctx) { @@ -613,9 +620,21 @@ impl CommittedState { } fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) { - for index_id in index_id_map_removals.into_iter().flatten() { - self.index_id_map.remove(index_id); + // Remove indices that tx-state removed. + // It's not necessarily the case that the index already existed in the committed state. + for (index_id, table_id) in index_id_map_removals + .into_iter() + .flatten() + .filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x))) + { + assert!(self + .tables + .get_mut(&table_id) + .expect("table to delete index from should exist") + .delete_index(&self.blob_store, index_id)); } + + // Add the ones tx-state added. self.index_id_map.extend(index_id_map); } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index d915f542ac5..23ed78bb7b1 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -997,8 +997,9 @@ mod tests { use crate::db::datastore::Result; use crate::error::{DBError, IndexError}; use bsatn::to_vec; + use core::{fmt, mem}; use itertools::Itertools; - use pretty_assertions::assert_eq; + use pretty_assertions::{assert_eq, assert_matches}; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::resolved_type_via_v9; @@ -1260,41 +1261,47 @@ mod tests { ] } - fn basic_table_schema() -> TableSchema { - basic_table_schema_with_indices( - vec![ - IndexSchema { - index_id: IndexId::SENTINEL, - table_id: TableId::SENTINEL, - index_name: "Foo_id_idx_btree".into(), - index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: col_list![0] }), - }, - IndexSchema { - index_id: IndexId::SENTINEL, - table_id: TableId::SENTINEL, - index_name: "Foo_name_idx_btree".into(), - index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: col_list![1] }), - }, - ], - vec![ - ConstraintSchema { - table_id: TableId::SENTINEL, - constraint_id: ConstraintId::SENTINEL, - constraint_name: "Foo_id_key".into(), - data: ConstraintData::Unique(UniqueConstraintData { - columns: col_list![0].into(), - }), - }, - ConstraintSchema { - table_id: TableId::SENTINEL, - constraint_id: ConstraintId::SENTINEL, - constraint_name: "Foo_name_key".into(), - data: ConstraintData::Unique(UniqueConstraintData { - columns: col_list![1].into(), - }), - }, - ], - ) + fn basic_indices() -> Vec { + vec![ + IndexSchema { + index_id: IndexId::SENTINEL, + table_id: TableId::SENTINEL, + index_name: "Foo_id_idx_btree".into(), + index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: col_list![0] }), + }, + IndexSchema { + index_id: IndexId::SENTINEL, + table_id: TableId::SENTINEL, + index_name: "Foo_name_idx_btree".into(), + index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: col_list![1] }), + }, + ] + } + + fn extract_index_id(datastore: &Locking, tx: &MutTxId, index: &IndexSchema) -> ResultTest { + let index_id = datastore.index_id_from_name_mut_tx(tx, &index.index_name)?; + Ok(index_id.expect("the index should exist")) + } + + fn basic_constraints() -> Vec { + vec![ + ConstraintSchema { + table_id: TableId::SENTINEL, + constraint_id: ConstraintId::SENTINEL, + constraint_name: "Foo_id_key".into(), + data: ConstraintData::Unique(UniqueConstraintData { + columns: col_list![0].into(), + }), + }, + ConstraintSchema { + table_id: TableId::SENTINEL, + constraint_id: ConstraintId::SENTINEL, + constraint_name: "Foo_name_key".into(), + data: ConstraintData::Unique(UniqueConstraintData { + columns: col_list![1].into(), + }), + }, + ] } fn basic_table_schema_with_indices(indices: Vec, constraints: Vec) -> TableSchema { @@ -1349,14 +1356,30 @@ mod tests { ) } - fn setup_table() -> ResultTest<(Locking, MutTxId, TableId)> { + // TODO(centril): find-replace all occurrences of body. + fn begin_mut_tx(datastore: &Locking) -> MutTxId { + datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests) + } + + fn setup_table_with_indices( + indices: Vec, + constraints: Vec, + ) -> ResultTest<(Locking, MutTxId, TableId)> { let datastore = get_datastore()?; - let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - let schema = basic_table_schema(); + let mut tx = begin_mut_tx(&datastore); + let schema = basic_table_schema_with_indices(indices, constraints); let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; Ok((datastore, tx, table_id)) } + fn setup_table() -> ResultTest<(Locking, MutTxId, TableId)> { + setup_table_with_indices(basic_indices(), basic_constraints()) + } + + fn random_row() -> ProductValue { + u32_str_u32(42, "foo", 24) + } + fn all_rows(datastore: &Locking, tx: &MutTxId, table_id: TableId) -> Vec { datastore .iter_mut_tx(tx, table_id) @@ -1384,6 +1407,19 @@ mod tests { Ok((gen_cols, row_ref)) } + fn update<'a>( + datastore: &'a Locking, + tx: &'a mut MutTxId, + table_id: TableId, + index_id: IndexId, + row: &ProductValue, + ) -> Result<(AlgebraicValue, RowRef<'a>)> { + let row = to_vec(&row).unwrap(); + let (gen_cols, row_ref) = datastore.update_mut_tx(tx, table_id, index_id, &row)?; + let gen_cols = row_ref.project(&gen_cols)?; + Ok((gen_cols, row_ref)) + } + #[test] fn test_bootstrapping_sets_up_tables() -> ResultTest<()> { let datastore = get_datastore()?; @@ -2075,24 +2111,289 @@ mod tests { Ok(()) } + fn expect_index_err(res: Result) -> IndexError { + res.expect_err("`res` should be an error") + .into_index() + .expect("the error should be an `IndexError`") + } + + fn commit(datastore: &Locking, tx: MutTxId) -> ResultTest { + Ok(datastore.commit_mut_tx(tx)?.expect("commit should produce `TxData`")) + } + + fn test_under_tx_and_commit( + datastore: &Locking, + mut tx: MutTxId, + mut test: impl FnMut(&mut MutTxId) -> ResultTest<()>, + ) -> ResultTest<()> { + // Test the tx state. + test(&mut tx)?; + + // Test the commit state. + commit(datastore, tx)?; + test(&mut begin_mut_tx(datastore)) + } + + /// Checks that update validates the row against the row type. + #[test] + fn test_update_wrong_row_type() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_table_with_indices([].into(), [].into())?; + test_under_tx_and_commit(&datastore, tx, |tx| { + // We provide an index that doesn't exist on purpose. + let index_id = 0.into(); + // Remove the last field of the row, invalidating it. + let mut row = Vec::from(random_row().elements); + let _ = row.pop().expect("there should be an element to remove"); + let row = row.into(); + // Now attempt the update. + let err = update(&datastore, tx, table_id, index_id, &row) + .expect_err("the update should fail") + .into_table() + .expect("the error should be a `TableError`") + .into_bflatn() + .expect("the error should be a bflatn error"); + + assert_matches!(err, spacetimedb_table::bflatn_to::Error::Decode(..)); + Ok(()) + }) + } + + /// Checks that update checks if the index exists. #[test] fn test_update_no_such_index() -> ResultTest<()> { - let datastore = get_datastore()?; - let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - let schema = basic_table_schema_with_indices([].into(), [].into()); - let table_id = datastore.create_table_mut_tx(&mut tx, schema)?; + let (datastore, tx, table_id) = setup_table_with_indices([].into(), [].into())?; + test_under_tx_and_commit(&datastore, tx, |tx| { + let index_id = 0.into(); + let err = expect_index_err(update(&datastore, tx, table_id, index_id, &random_row())); + assert_eq!(err, IndexError::NotFound(index_id)); + Ok(()) + }) + } - // There are no indices attached to `table_id`. - let index_id = 0.into(); - let row = to_vec(&u32_str_u32(42, "foo", 24)).unwrap(); - let err = datastore - .update_mut_tx(&mut tx, table_id, index_id, &row) - .expect_err("update using a non-existent index should error") - .into_index() - .expect("the error should be an index error"); - assert_eq!(err, IndexError::NotFound(index_id)); + /// Checks that update checks if the index exists and that this considers tx-state index deletion. + #[test] + fn test_update_no_such_index_because_deleted() -> ResultTest<()> { + // Setup and immediately commit. + let (datastore, tx, table_id) = setup_table()?; + commit(&datastore, tx)?; - Ok(()) + // Remove index in tx state. + let mut tx = begin_mut_tx(&datastore); + let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?; + tx.drop_index(index_id)?; + + test_under_tx_and_commit(&datastore, tx, |tx: &mut _| { + let err = expect_index_err(update(&datastore, tx, table_id, index_id, &random_row())); + assert_eq!(err, IndexError::NotFound(index_id)); + Ok(()) + }) + } + + /// Checks that update ensures the index is unique. + #[test] + fn test_update_index_not_unique() -> ResultTest<()> { + let indices = basic_indices(); + let (datastore, mut tx, table_id) = setup_table_with_indices(indices.clone(), [].into())?; + let row = &random_row(); + insert(&datastore, &mut tx, table_id, row)?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + for index in &indices { + let index_id = extract_index_id(&datastore, tx, index)?; + let err = expect_index_err(update(&datastore, tx, table_id, index_id, row)); + assert_eq!(err, IndexError::NotUnique(index_id)); + } + Ok(()) + }) + } + + /// Checks that update ensures that the row-to-update exists. + #[test] + fn test_update_no_such_row() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_table()?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + let row = &random_row(); + for (index_pos, index) in basic_indices().into_iter().enumerate() { + let index_id = extract_index_id(&datastore, tx, &index)?; + let err = expect_index_err(update(&datastore, tx, table_id, index_id, row)); + let needle = row.get_field(index_pos, None).unwrap().clone(); + assert_eq!(err, IndexError::KeyNotFound(index_id, needle)); + } + Ok(()) + }) + } + + /// Checks that update ensures that the row-to-update exists and considers delete tables. + #[test] + fn test_update_no_such_row_because_deleted() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table()?; + + // Insert the row, commit, and delete it. + let row = &random_row(); + insert(&datastore, &mut tx, table_id, row)?; + commit(&datastore, tx)?; + let mut tx = begin_mut_tx(&datastore); + assert_eq!(1, datastore.delete_by_rel_mut_tx(&mut tx, table_id, [row.clone()])); + + test_under_tx_and_commit(&datastore, tx, |tx| { + for (index_pos, index) in basic_indices().into_iter().enumerate() { + let index_id = extract_index_id(&datastore, tx, &index)?; + let err = expect_index_err(update(&datastore, tx, table_id, index_id, row)); + let needle = row.get_field(index_pos, None).unwrap().clone(); + assert_eq!(err, IndexError::KeyNotFound(index_id, needle)); + } + Ok(()) + }) + } + + /// Checks that update ensures that the row-to-update exists and considers delete tables. + #[test] + fn test_update_no_such_row_because_deleted_new_index_in_tx() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table_with_indices([].into(), [].into())?; + + // Insert the row and commit. + let row = &random_row(); + insert(&datastore, &mut tx, table_id, row)?; + commit(&datastore, tx)?; + + // Now add the indices and then delete the row. + let mut tx = begin_mut_tx(&datastore); + let mut indices = basic_indices(); + for index in &mut indices { + index.table_id = table_id; + index.index_id = datastore.create_index_mut_tx(&mut tx, index.clone(), true)?; + } + assert_eq!(1, datastore.delete_by_rel_mut_tx(&mut tx, table_id, [row.clone()])); + + test_under_tx_and_commit(&datastore, tx, |tx| { + for (index_pos, index) in indices.iter().enumerate() { + let err = expect_index_err(update(&datastore, tx, table_id, index.index_id, row)); + let needle = row.get_field(index_pos, None).unwrap().clone(); + assert_eq!(err, IndexError::KeyNotFound(index.index_id, needle)); + } + Ok(()) + }) + } + + /// Checks that update ensures that the row-to-update exists and that sequences were used. + #[test] + fn test_update_no_such_row_seq_triggered() -> ResultTest<()> { + let (datastore, tx, table_id) = setup_table()?; + test_under_tx_and_commit(&datastore, tx, |tx| { + let mut row = random_row(); + let field_before = mem::replace(&mut row.elements[0], 0u32.into()); + + // Use the index on the first u32 field as it's unique auto_inc. + let index_id = extract_index_id(&datastore, tx, &basic_indices()[0])?; + + // Attempt the update. + let err = expect_index_err(update(&datastore, tx, table_id, index_id, &row)); + assert_matches!(err, IndexError::KeyNotFound(_, key) if key != field_before); + Ok(()) + }) + } + + /// Checks that update checks other unique constraints against the committed state. + #[test] + fn test_update_violates_commit_unique_constraints() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table_with_indices([].into(), [].into())?; + + // Insert two rows. + let mut row = random_row(); + insert(&datastore, &mut tx, table_id, &row)?; + row.elements[0] = 24u32.into(); + let original_string = mem::replace(&mut row.elements[1], "bar".into()); + insert(&datastore, &mut tx, table_id, &row)?; + row.elements[1] = original_string; + + // Add the index on the string field. + let mut indices = basic_indices(); + for index in &mut indices { + index.table_id = table_id; + } + datastore.create_index_mut_tx(&mut tx, indices.swap_remove(1), true)?; + // Commit. + commit(&datastore, tx)?; + + // *After committing*, add the u32 field index. + // We'll use that index to seek whilst changing the second field to the first row we added. + let mut tx = begin_mut_tx(&datastore); + let index_id = datastore.create_index_mut_tx(&mut tx, indices.swap_remove(0), true)?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + // Attempt the update. There should be a unique constraint violation on the string field. + let err = expect_index_err(update(&datastore, tx, table_id, index_id, &row)); + assert_matches!(err, IndexError::UniqueConstraintViolation(_)); + Ok(()) + }) + } + + /// Checks that update checks other unique constraints against the committed state. + #[test] + fn test_update_violates_tx_unique_constraints() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table()?; + + // Insert two rows. + let mut row = random_row(); + insert(&datastore, &mut tx, table_id, &row)?; + row.elements[0] = 24u32.into(); + let original_string = mem::replace(&mut row.elements[1], "bar".into()); + insert(&datastore, &mut tx, table_id, &row)?; + row.elements[1] = original_string; + + // Seek the index on the first u32 field. + let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + // Attempt the update. There should be a unique constraint violation on the string field. + let err = expect_index_err(update(&datastore, tx, table_id, index_id, &row)); + assert_matches!(err, IndexError::UniqueConstraintViolation(_)); + Ok(()) + }) + } + + /// Checks that update is idempotent. + #[test] + fn test_update_idempotent() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table()?; + + // Insert a row. + let row = &random_row(); + insert(&datastore, &mut tx, table_id, row)?; + // Seek the index on the first u32 field. + let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + let (_, new_row) = update(&datastore, tx, table_id, index_id, row).expect("update should have succeeded"); + assert_eq!(row, &new_row.to_product_value()); + Ok(()) + }) + } + + /// Checks that update successfully uses sequences. + #[test] + fn test_update_uses_sequences() -> ResultTest<()> { + let (datastore, mut tx, table_id) = setup_table()?; + + // Insert a row. + let mut row = random_row(); + row.elements[0] = 0u32.into(); + insert(&datastore, &mut tx, table_id, &row)?; + + // Seek the index on the string field. + let index_id = extract_index_id(&datastore, &tx, &basic_indices()[1])?; + + test_under_tx_and_commit(&datastore, tx, |tx| { + let mut row = row.clone(); + let (seq_val, new_row) = + update(&datastore, tx, table_id, index_id, &row).expect("update should have succeeded"); + let new_row = new_row.to_product_value(); + assert_eq!(&seq_val, &new_row.elements[0]); + row.elements[0] = seq_val; + assert_eq!(row, new_row); + Ok(()) + }) } #[test] diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 9f1494e5bc3..626b5909021 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -43,7 +43,7 @@ use spacetimedb_schema::{ use spacetimedb_table::{ blob_store::{BlobStore, HashMapBlobStore}, indexes::{RowPointer, SquashedOffset}, - table::{IndexScanIter, InsertError, RowRef, Table, TableAndIndex}, + table::{DuplicateError, IndexScanIter, InsertError, RowRef, Table, TableAndIndex}, }; use std::{ sync::Arc, @@ -456,13 +456,11 @@ impl MutTxId { // Remove the index in the transaction's insert table. // By altering the insert table, this gets moved over to the committed state on merge. let (table, blob_store, idx_map, ..) = self.get_or_create_insert_table_mut(table_id)?; - if table.delete_index(blob_store, index_id) { - // This likely will do a clone-write as over time? - // The schema might have found other referents. - table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id)); - } - // Remove the `index_id -> (table_id, col_list)` association. + assert!(table.delete_index(blob_store, index_id)); + // Remove the `index_id -> (table_id, col_list)` association from tx state. idx_map.remove(&index_id); + // Queue the deletion of the index in the committed state. + // Note that the index could have been added in this tx. self.tx_state .index_id_map_removals .get_or_insert_with(Default::default) @@ -1216,9 +1214,7 @@ impl MutTxId { .ok_or(TableError::IdNotFoundState(table_id))?; // 1. Insert the physical row. - let (tx_row_ref, blob_bytes) = tx_table - .insert_physically_bsatn(tx_blob_store, row) - .map_err(InsertError::Bflatn)?; + let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(tx_blob_store, row)?; // 2. Optionally: Detect, generate, write sequence values. // 3. Confirm that the insertion respects constraints and update statistics. // 4. Post condition (PC.INS.1): @@ -1363,7 +1359,7 @@ impl MutTxId { Ok((gen_cols, rri)) } // `row` previously present in insert tables; do nothing but return `ptr`. - Err(InsertError::Duplicate(ptr)) => { + Err(InsertError::Duplicate(DuplicateError(ptr))) => { let rri = RowRefInsertion::Existed( // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, @@ -1371,12 +1367,9 @@ impl MutTxId { Ok((gen_cols, rri)) } - // Index error: unbox and return `TableError::IndexError` - // rather than `TableError::Insert(InsertError::IndexError)`. + // Unwrap these error into `TableError::{IndexError, Bflatn}`: Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()), - - // Misc. insertion error; fail. - Err(e @ InsertError::Bflatn(_)) => Err(TableError::Insert(e).into()), + Err(InsertError::Bflatn(e)) => Err(TableError::Bflatn(e).into()), } } @@ -1411,9 +1404,7 @@ impl MutTxId { self.committed_state_write_lock.get_table(table_id), ) .ok_or(TableError::IdNotFoundState(table_id))?; - let (tx_row_ref, blob_bytes) = tx_table - .insert_physically_bsatn(tx_blob_store, row) - .map_err(InsertError::Bflatn)?; + let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(tx_blob_store, row)?; // 2. Detect, generate, write sequence values in the new row. //---------------------------------------------------------------------- @@ -1442,45 +1433,42 @@ impl MutTxId { // 3. Find the old row and remove it. //---------------------------------------------------------------------- - /// Projects the new row to the index and find the old row. - #[inline(always)] - fn project_and_seek<'b>( - index_id: IndexId, - tx_row_ref: RowRef<'_>, - index: TableAndIndex<'b>, - commit_table: Option<&Table>, - del_table: &DeleteTable, - ) -> Result> { - // Confirm that we have a unique index. + #[inline] + fn ensure_unique(index_id: IndexId, index: TableAndIndex<'_>) -> Result<()> { if !index.index().is_unique() { return Err(IndexError::NotUnique(index_id).into()); } - + Ok(()) + } + /// Ensure that the new row does not violate the commit table's unique constraints. + #[inline] + fn check_commit_unique_constraints( + commit_table: &Table, + del_table: &DeleteTable, + ignore_index_id: IndexId, + new_row: RowRef<'_>, + old_ptr: RowPointer, + ) -> Result<()> { + commit_table + .check_unique_constraints( + new_row, + // Don't check this index since we'll do a 1-1 old/new replacement. + |ixs| ixs.filter(|(&id, _)| id != ignore_index_id), + |commit_ptr| commit_ptr == old_ptr || del_table.contains(&commit_ptr), + ) + .map_err(IndexError::from) + .map_err(Into::into) + } + /// Projects the new row to the index to find the old row. + #[inline] + fn find_old_row(new_row: RowRef<'_>, index: TableAndIndex<'_>) -> (Option, AlgebraicValue) { + let index = index.index(); // Project the row to the index's columns/type. - let needle = tx_row_ref - .project(&index.index().indexed_columns) + let needle = new_row + .project(&index.indexed_columns) .expect("projecting a table row to one of the table's indices should never fail"); - // Find the old row. - let old_row_ref = index - .seek(&needle) - .next() - .ok_or_else(|| IndexError::KeyNotFound(index_id, needle))?; - let old_row_ptr = old_row_ref.pointer(); - - // Ensure that the new row does not violate the commit table's unique constraints. - if let Some(commit_table) = commit_table { - commit_table - .check_unique_constraints( - old_row_ref, - // Don't check this index since we'll do a 1-1 old/new replacement. - |ixs| ixs.filter(|(&id, _)| id != index_id), - |commit_ptr| commit_ptr == old_row_ptr || del_table.contains(&commit_ptr), - ) - .map_err(IndexError::from)?; - } - - Ok(old_row_ref) + (index.seek(&needle).next(), needle) } // The index we've been directed to use must exist @@ -1489,18 +1477,29 @@ impl MutTxId { // As it's unlikely that an index was added in this transaction, // we begin by checking the committed state. let err = 'failed_rev_ins: { - let tx_row_ptr = if let Some(commit_index) = self - .committed_state_write_lock - .get_index_by_id_with_table(table_id, index_id) - .filter(|_| !tx_removed_index) + let tx_row_ptr = if tx_removed_index { + break 'failed_rev_ins IndexError::NotFound(index_id).into(); + } else if let Some((commit_index, old_ptr)) = + // Find the committed state index, project the row to it, and find the old row. + // The old row must not have been deleted. + // + // If the old row wasn't found, it may still exist in the tx state, + // which inherits the index structure of the committed state, + // so we'd like to avoid an early error in that case. + self + .committed_state_write_lock + .get_index_by_id_with_table(table_id, index_id) + .and_then(|index| find_old_row(tx_row_ref, index).0.map(|ptr| (index, ptr))) + .filter(|(_, ptr)| !del_table.contains(ptr)) { - // Project the new row to the index and find the old row, - // doing various necessary checks along the way. - let commit_table = Some(commit_index.table()); - let old_row_ptr = match project_and_seek(index_id, tx_row_ref, commit_index, commit_table, del_table) { - Err(e) => break 'failed_rev_ins e, - Ok(x) => x.pointer(), - }; + // 1. Ensure the index is unique. + // 2. Ensure the new row doesn't violate any other committed state unique indices. + if let Err(e) = ensure_unique(index_id, commit_index).and_then(|_| { + check_commit_unique_constraints(commit_index.table(), del_table, index_id, tx_row_ref, old_ptr) + }) { + break 'failed_rev_ins e; + } + // Check constraints and confirm the insertion of the new row. // // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, @@ -1508,24 +1507,61 @@ impl MutTxId { // On error, `tx_row_ptr` has already been removed, so don't do it again. let (_, tx_row_ptr) = unsafe { tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes) }?; // Delete the old row. - del_table.insert(old_row_ptr); + del_table.insert(old_ptr); tx_row_ptr - } else if let Some(tx_index) = tx_table.get_index_by_id_with_table(tx_blob_store, index_id) { - // Project the new row to the index and find the old row, - // doing various necessary checks along the way. - let commit_table = self.committed_state_write_lock.get_table(table_id); - let old_row_ptr = match project_and_seek(index_id, tx_row_ref, tx_index, commit_table, del_table) { + } else if let Some(tx_index) = + // Either the row was not found in the committed state index, + // or the index was added in our tx state. + // In the latter case, committed state rows will be present in the index, + // so we must handle those specially. + tx_table.get_index_by_id_with_table(tx_blob_store, index_id) + { + // 0. Find the old row. + // 1. Ensure the index is unique. + // 2. Ensure the new row doesn't violate any other committed state unique indices. + let (old_ptr, needle) = find_old_row(tx_row_ref, tx_index); + let res = old_ptr + // If we have an old committed state row, ensure it hasn't been deleted in our tx. + .filter(|ptr| ptr.squashed_offset() == SquashedOffset::TX_STATE || !del_table.contains(ptr)) + .ok_or_else(|| IndexError::KeyNotFound(index_id, needle).into()) + .and_then(|old_ptr| { + ensure_unique(index_id, tx_index)?; + if let Some(commit_table) = self.committed_state_write_lock.get_table(table_id) { + check_commit_unique_constraints(commit_table, del_table, index_id, tx_row_ref, old_ptr)?; + } + Ok(old_ptr) + }); + let old_ptr = match res { Err(e) => break 'failed_rev_ins e, - Ok(x) => x.pointer(), + Ok(x) => x, }; - // Check constraints and confirm the update of the new row. - // This ensures that the old row is removed from the indices - // before attempting to insert the new row into the indices. - // - // SAFETY: `self.is_row_present(tx_row_ptr)` and `self.is_row_present(old_row_ptr)` both hold - // as we've deleted neither. - // In particular, the `write_gen_val_to_col` call does not remove the row. - unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_row_ptr, blob_bytes) }? + + match old_ptr.squashed_offset() { + SquashedOffset::COMMITTED_STATE => { + // Check constraints and confirm the insertion of the new row. + // + // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, + // in particular, the `write_gen_val_to_col` call does not remove the row. + // On error, `tx_row_ptr` has already been removed, so don't do it again. + let (_, tx_row_ptr) = + unsafe { tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes) }?; + // Delete the old row. + del_table.insert(old_ptr); + tx_row_ptr + } + SquashedOffset::TX_STATE => { + // Check constraints and confirm the update of the new row. + // This ensures that the old row is removed from the indices + // before attempting to insert the new row into the indices. + // + // SAFETY: `self.is_row_present(tx_row_ptr)` and `self.is_row_present(old_ptr)` both hold + // as we've deleted neither. + // In particular, the `write_gen_val_to_col` call does not remove the row. + unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) } + .map_err(IndexError::UniqueConstraintViolation)? + } + _ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", old_ptr), + } } else { break 'failed_rev_ins IndexError::NotFound(index_id).into(); }; @@ -1596,9 +1632,7 @@ impl MutTxId { // We only want to physically insert the row here to get a row pointer. // We'd like to avoid any set semantic and unique constraint checks. - let (row_ref, _) = tx_table - .insert_physically_pv(tx_blob_store, rel) - .map_err(InsertError::Bflatn)?; + let (row_ref, _) = tx_table.insert_physically_pv(tx_blob_store, rel)?; let ptr = row_ref.pointer(); // First, check if a matching row exists in the `tx_table`. diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index a11c9e3d4d3..61cdc1cba98 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -9,8 +9,8 @@ use spacetimedb_expr::errors::TypingError; use spacetimedb_sats::AlgebraicType; use spacetimedb_schema::error::ValidationErrors; use spacetimedb_snapshot::SnapshotError; -use spacetimedb_table::read_column; -use spacetimedb_table::table::{self, InsertError, ReadViaBsatnError, UniqueConstraintViolation}; +use spacetimedb_table::table::{self, ReadViaBsatnError, UniqueConstraintViolation}; +use spacetimedb_table::{bflatn_to, read_column}; use thiserror::Error; use crate::client::ClientActorId; @@ -29,7 +29,7 @@ use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_vm::errors::{ErrorKind, ErrorLang, ErrorType, ErrorVm}; use spacetimedb_vm::expr::Crud; -#[derive(Error, Debug)] +#[derive(Error, Debug, EnumAsInner)] pub enum TableError { #[error("Table with name `{0}` start with 'st_' and that is reserved for internal system tables.")] System(Box), @@ -69,7 +69,9 @@ pub enum TableError { found: String, }, #[error(transparent)] - Insert(#[from] table::InsertError), + Bflatn(#[from] bflatn_to::Error), + #[error(transparent)] + Duplicate(#[from] table::DuplicateError), #[error(transparent)] ReadColTypeError(#[from] read_column::TypeError), } @@ -230,12 +232,22 @@ pub enum DBError { TypeError(#[from] TypingError), } -impl From for DBError { - fn from(err: InsertError) -> Self { +impl From for DBError { + fn from(err: bflatn_to::Error) -> Self { Self::Table(err.into()) } } +impl From for DBError { + fn from(err: table::InsertError) -> Self { + match err { + table::InsertError::Duplicate(e) => TableError::from(e).into(), + table::InsertError::Bflatn(e) => TableError::from(e).into(), + table::InsertError::IndexError(e) => IndexError::from(e).into(), + } + } +} + impl From for DBError { fn from(e: SnapshotError) -> Self { DBError::Snapshot(Box::new(e)) diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 26d58413e60..e3e52bed484 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -31,6 +31,7 @@ use core::{ hint::unreachable_unchecked, }; use derive_more::{Add, AddAssign, From, Sub, SubAssign}; +use enum_as_inner::EnumAsInner; use smallvec::SmallVec; use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned}; use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId}; @@ -188,12 +189,17 @@ impl MemoryUsage for TableInner { } } -/// Various error that can happen on table insertion. +/// There was already a row with the same value. #[derive(Error, Debug, PartialEq, Eq)] +#[error("Duplicate insertion of row {0:?} violates set semantics")] +pub struct DuplicateError(pub RowPointer); + +/// Various error that can happen on table insertion. +#[derive(Error, Debug, PartialEq, Eq, EnumAsInner)] pub enum InsertError { /// There was already a row with the same value. - #[error("Duplicate insertion of row {0:?} violates set semantics")] - Duplicate(RowPointer), + #[error(transparent)] + Duplicate(#[from] DuplicateError), /// Couldn't write the row to the page manager. #[error(transparent)] @@ -513,7 +519,7 @@ impl Table { new_ptr: RowPointer, old_ptr: RowPointer, blob_bytes_added: BlobNumBytes, - ) -> Result { + ) -> Result { // (1) Remove old row from indices. // SAFETY: Caller promised that `self.is_row_present(old_ptr)` holds. unsafe { self.delete_from_indices(blob_store, old_ptr) }; @@ -562,7 +568,7 @@ impl Table { &'a mut self, blob_store: &'a mut dyn BlobStore, ptr: RowPointer, - ) -> Result<(), InsertError> { + ) -> Result<(), UniqueConstraintViolation> { let mut index_error = None; for (&index_id, index) in self.indexes.iter_mut() { // SAFETY: We just inserted `ptr`, so it must be present. @@ -570,8 +576,7 @@ impl Table { if index.check_and_insert(row_ref).unwrap().is_some() { let cols = &index.indexed_columns; let value = row_ref.project(cols).unwrap(); - let error = - InsertError::IndexError(UniqueConstraintViolation::build(&self.schema, index, index_id, value)); + let error = UniqueConstraintViolation::build(&self.schema, index, index_id, value); index_error = Some(error); break; } @@ -637,7 +642,7 @@ impl Table { &'a mut self, blob_store: &'a mut dyn BlobStore, ptr: RowPointer, - ) -> Result, InsertError> { + ) -> Result, DuplicateError> { if self.pointer_map.is_none() { // No pointer map? Set semantic constraint is checked by a unique index instead. return Ok(None); @@ -658,7 +663,7 @@ impl Table { .pages .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store) }; - return Err(InsertError::Duplicate(existing_row)); + return Err(DuplicateError(existing_row)); } // If the optimistic insertion was correct, @@ -929,7 +934,7 @@ impl Table { blob_store: &mut dyn BlobStore, row: &ProductValue, skip_index_update: bool, - ) -> Result, InsertError> { + ) -> Result, Error> { // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row. // This must avoid consulting and inserting to the pointer map, // as the row is already present, set-semantically. @@ -1032,6 +1037,12 @@ impl Table { self.rebuild_pointer_map(blob_store); } + // Remove index from schema. + // + // This likely will do a clone-write as over time? + // The schema might have found other referents. + self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id)); + ret }