diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index d67c84520d..e44cfd13af 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1196,16 +1196,6 @@ impl MutTxId { pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result> { let commit_table = self.committed_state_write_lock.get_table(table_id); - // Check for constraint violations as early as possible, - // to ensure that `UniqueConstraintViolation` errors have precedence over other errors. - // `tx_table.insert` will later perform the same check on the tx table, - // so this method needs only to check the committed state. - if let Some(commit_table) = commit_table { - commit_table - .check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict)) - .map_err(IndexError::from)?; - } - // Get the insert table, so we can write the row into it. let (tx_table, tx_blob_store, _, delete_table) = self .tx_state @@ -1213,18 +1203,22 @@ impl MutTxId { .ok_or(TableError::IdNotFoundState(table_id))?; match tx_table.insert(tx_blob_store, row) { - Ok((hash, row_ref)) => { - // `row` not previously present in insert tables, - // but may still be a set-semantic conflict with a row - // in the committed state. - - let ptr = row_ref.pointer(); + Ok((tx_row_hash, tx_row_ref)) => { + let tx_row_ptr = tx_row_ref.pointer(); if let Some(commit_table) = commit_table { - // Safety: + // The `tx_row_ref` was not previously present in insert tables, + // but may still be a set-semantic conflict + // or may violate a unique constraint with a row in the committed state. + // We'll check the set-semantic aspect in (1) and the constraint in (2). + + // (1) Rule out a set-semantic conflict with the committed state. + // SAFETY: // - `commit_table` and `tx_table` use the same schema // because `tx_table` is derived from `commit_table`. - // - `ptr` and `hash` are correct because we just got them from `tx_table.insert`. - if let Some(committed_ptr) = unsafe { Table::find_same_row(commit_table, tx_table, ptr, hash) } { + // - `tx_row_ptr` and `tx_row_hash` are correct because we just got them from `tx_table.insert`. + if let Some(commit_ptr) = + unsafe { Table::find_same_row(commit_table, tx_table, tx_row_ptr, tx_row_hash) } + { // If `row` was already present in the committed state, // either this is a set-semantic duplicate, // or the row is marked as deleted, so we will undelete it @@ -1250,26 +1244,42 @@ impl MutTxId { // - Insert Row A // This is impossible to recover if `Running 2` elides its insert. tx_table - .delete(tx_blob_store, ptr, |_| ()) + .delete(tx_blob_store, tx_row_ptr, |_| ()) .expect("Failed to delete a row we just inserted"); // It's possible that `row` appears in the committed state, // but is marked as deleted. // In this case, undelete it, so it remains in the committed state. - delete_table.remove(&committed_ptr); + delete_table.remove(&commit_ptr); // No new row was inserted, but return `committed_ptr`. let blob_store = &self.committed_state_write_lock.blob_store; return Ok(RowRefInsertion::Existed( // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`. - unsafe { commit_table.get_row_ref_unchecked(blob_store, committed_ptr) }, + unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) }, )); } + + // Pacify the borrow checker. + // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls. + let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + + // (2) The `tx_row_ref` did not violate a unique constraint *within* the `tx_table`, + // but it could do so wrt., `commit_table`, + // assuming the conflicting row hasn't been deleted since. + // Ensure that it doesn't, or roll back the insertion. + if let Err(e) = commit_table + .check_unique_constraints(tx_row_ref, |commit_ptr| delete_table.contains(&commit_ptr)) + { + // There was a constraint violation, so undo the insertion. + tx_table.delete(tx_blob_store, tx_row_ptr, |_| {}); + return Err(IndexError::from(e).into()); + } } Ok(RowRefInsertion::Inserted(unsafe { // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls. - tx_table.get_row_ref_unchecked(tx_blob_store, ptr) + tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) })) } // `row` previously present in insert tables; do nothing but return `ptr`. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index efd37ee184..14166e137c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -376,7 +376,7 @@ impl<'a, R: RangeBounds> Iterator for ScanIterByColRange<'a, R> fn next(&mut self) -> Option { for row_ref in &mut self.scan_iter { - let value = row_ref.project_not_empty(&self.cols).unwrap(); + let value = row_ref.project(&self.cols).unwrap(); if self.range.contains(&value) { return Some(row_ref); } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index b429bc0eb3..9fa1ef92ba 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1602,7 +1602,7 @@ mod tests { fn table_indexed(is_unique: bool) -> TableSchema { table( "MyTable", - ProductType::from([("my_col", AlgebraicType::I64)]), + ProductType::from([("my_col", AlgebraicType::I64), ("other_col", AlgebraicType::I64)]), |builder| { let builder = builder.with_index( RawIndexAlgorithm::BTree { columns: 0.into() }, @@ -1908,8 +1908,8 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64])?; - stdb.insert(&mut tx, table_id, product![1i64])?; + stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; + stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]); Ok(()) @@ -1979,21 +1979,12 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64]) + stdb.insert(&mut tx, table_id, product![1i64, 0i64]) .expect("stdb.insert failed"); - match stdb.insert(&mut tx, table_id, product![1i64]) { - Ok(_) => { - panic!("Allow to insert duplicate row") - } - Err(DBError::Index(err)) => match err { - IndexError::UniqueConstraintViolation { .. } => {} - err => { - panic!("Expected error `UniqueConstraintViolation`, got {err}") - } - }, - err => { - panic!("Expected error `UniqueConstraintViolation`, got {err:?}") - } + match stdb.insert(&mut tx, table_id, product![1i64, 1i64]) { + Ok(_) => panic!("Allow to insert duplicate row"), + Err(DBError::Index(IndexError::UniqueConstraintViolation { .. })) => {} + Err(err) => panic!("Expected error `UniqueConstraintViolation`, got {err}"), } Ok(()) diff --git a/crates/sats/src/algebraic_value/de.rs b/crates/sats/src/algebraic_value/de.rs index af4d064bbb..8915167ac3 100644 --- a/crates/sats/src/algebraic_value/de.rs +++ b/crates/sats/src/algebraic_value/de.rs @@ -18,7 +18,7 @@ impl ValueDeserializer { Self { val } } - /// Converts `&AlgebraicValue` to `&ValueDeserialize`. + /// Converts `&AlgebraicValue` to `&ValueDeserializer`. pub fn from_ref(val: &AlgebraicValue) -> &Self { // SAFETY: The conversion is OK due to `repr(transparent)`. unsafe { &*(val as *const AlgebraicValue as *const ValueDeserializer) } diff --git a/crates/table/src/btree_index.rs b/crates/table/src/btree_index.rs index f4460b597a..2bc00574c8 100644 --- a/crates/table/src/btree_index.rs +++ b/crates/table/src/btree_index.rs @@ -186,13 +186,57 @@ impl TypedIndex { Self::String(this) => insert_at_type(this, cols, row_ref), Self::AlgebraicValue(this) => { - let key = row_ref.project_not_empty(cols)?; + let key = row_ref.project(cols)?; this.insert(key, row_ref.pointer()); Ok(()) } } } + /// Add the row referred to by `row_ref` to the index `self`, + /// which must be keyed at `cols`. + /// + /// If `cols` is inconsistent with `self`, + /// or the `row_ref` has a row type other than that used for `self`, + /// this will behave oddly; it may return an error, + /// or may insert a nonsense value into the index. + /// Note, however, that it will not invoke undefined behavior. + /// + /// Assumes that this is a unique constraint and returns `Ok(Some(existing_row))` if it's violated. + fn insert_unique(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result, InvalidFieldError> { + fn insert_at_type( + this: &mut Index, + cols: &ColList, + row_ref: RowRef<'_>, + ) -> Result, InvalidFieldError> { + let col_pos = cols.as_singleton().unwrap(); + let key = row_ref.read_col(col_pos).map_err(|_| col_pos)?; + Ok(this.insert_unique(key, row_ref.pointer()).copied()) + } + let unique_violation = match self { + Self::Bool(this) => insert_at_type(this, cols, row_ref), + Self::U8(this) => insert_at_type(this, cols, row_ref), + Self::I8(this) => insert_at_type(this, cols, row_ref), + Self::U16(this) => insert_at_type(this, cols, row_ref), + Self::I16(this) => insert_at_type(this, cols, row_ref), + Self::U32(this) => insert_at_type(this, cols, row_ref), + Self::I32(this) => insert_at_type(this, cols, row_ref), + Self::U64(this) => insert_at_type(this, cols, row_ref), + Self::I64(this) => insert_at_type(this, cols, row_ref), + Self::U128(this) => insert_at_type(this, cols, row_ref), + Self::I128(this) => insert_at_type(this, cols, row_ref), + Self::U256(this) => insert_at_type(this, cols, row_ref), + Self::I256(this) => insert_at_type(this, cols, row_ref), + Self::String(this) => insert_at_type(this, cols, row_ref), + + Self::AlgebraicValue(this) => { + let key = row_ref.project(cols)?; + Ok(this.insert_unique(key, row_ref.pointer()).copied()) + } + }?; + Ok(unique_violation) + } + /// Remove the row referred to by `row_ref` from the index `self`, /// which must be keyed at `cols`. /// @@ -229,7 +273,7 @@ impl TypedIndex { Self::String(this) => delete_at_type(this, cols, row_ref), Self::AlgebraicValue(this) => { - let key = row_ref.project_not_empty(cols)?; + let key = row_ref.project(cols)?; Ok(this.delete(&key, &row_ref.pointer())) } } @@ -407,12 +451,27 @@ impl BTreeIndex { /// Inserts `ptr` with the value `row` to this index. /// This index will extract the necessary values from `row` based on `self.cols`. - /// - /// Return false if `ptr` was already indexed prior to this call. pub fn insert(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result<(), InvalidFieldError> { self.idx.insert(cols, row_ref) } + /// Inserts `ptr` with the value `row` to this index. + /// This index will extract the necessary values from `row` based on `self.cols`. + /// + /// Returns `Ok(Some(existing_row))` if this insertion would violate a unique constraint. + pub fn check_and_insert( + &mut self, + cols: &ColList, + row_ref: RowRef<'_>, + ) -> Result, InvalidFieldError> { + if self.is_unique { + self.idx.insert_unique(cols, row_ref) + } else { + self.idx.insert(cols, row_ref)?; + Ok(None) + } + } + /// Deletes `ptr` with its indexed value `col_value` from this index. /// /// Returns whether `ptr` was present. @@ -420,16 +479,6 @@ impl BTreeIndex { self.idx.delete(cols, row_ref) } - /// Returns an iterator over the rows that would violate the unique constraint of this index, - /// if `row` were inserted, - /// or `None`, if this index doesn't have a unique constraint. - pub fn get_rows_that_violate_unique_constraint<'a>( - &'a self, - row: &'a AlgebraicValue, - ) -> Option> { - self.is_unique.then(|| self.seek(row)) - } - /// Returns whether `value` is in this index. pub fn contains_any(&self, value: &AlgebraicValue) -> bool { self.seek(value).next().is_some() @@ -445,12 +494,13 @@ impl BTreeIndex { } /// Extends [`BTreeIndex`] with `rows`. - /// Returns whether every element in `rows` was inserted. pub fn build_from_rows<'table>( &mut self, cols: &ColList, rows: impl IntoIterator>, ) -> Result<(), InvalidFieldError> { + // TODO(centril, correctness): consider `self.is_unique`. + // Should not be able to add an index which would cause unique constraint violations. for row_ref in rows { self.insert(cols, row_ref)?; } @@ -516,6 +566,16 @@ mod test { !index.is_unique || index.contains_any(&get_fields(cols, row)) } + /// Returns an iterator over the rows that would violate the unique constraint of this index, + /// if `row` were inserted, + /// or `None`, if this index doesn't have a unique constraint. + fn get_rows_that_violate_unique_constraint<'a>( + index: &'a BTreeIndex, + row: &'a AlgebraicValue, + ) -> Option> { + index.is_unique.then(|| index.seek(row)) + } + proptest! { #[test] fn remove_nonexistent_noop(((ty, cols, pv), is_unique) in (gen_row_and_cols(), any::())) { @@ -538,7 +598,7 @@ mod test { prop_assert_eq!(index.idx.len(), 0); prop_assert_eq!(index.contains_any(&value), false); - index.insert(&cols, row_ref).unwrap(); + prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None); prop_assert_eq!(index.idx.len(), 1); prop_assert_eq!(index.contains_any(&value), true); @@ -559,20 +619,21 @@ mod test { prop_assert_eq!(index.idx.len(), 0); prop_assert_eq!(violates_unique_constraint(&index, &cols, &pv), false); prop_assert_eq!( - index.get_rows_that_violate_unique_constraint(&value).unwrap().collect::>(), + get_rows_that_violate_unique_constraint(&index, &value).unwrap().collect::>(), [] ); // Insert. - index.insert(&cols, row_ref).unwrap(); + prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None); // Inserting again would be a problem. prop_assert_eq!(index.idx.len(), 1); prop_assert_eq!(violates_unique_constraint(&index, &cols, &pv), true); prop_assert_eq!( - index.get_rows_that_violate_unique_constraint(&value).unwrap().collect::>(), + get_rows_that_violate_unique_constraint(&index, &value).unwrap().collect::>(), [row_ref.pointer()] ); + prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), Some(row_ref.pointer())); } #[test] @@ -596,7 +657,7 @@ mod test { let row = product![x]; let row_ref = table.insert(&mut blob_store, &row).unwrap().1; val_to_ptr.insert(x, row_ref.pointer()); - index.insert(&cols, row_ref).unwrap(); + prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None); } fn test_seek(index: &BTreeIndex, val_to_ptr: &HashMap, range: impl RangeBounds, expect: impl IntoIterator) -> TestCaseResult { diff --git a/crates/table/src/btree_index/multimap.rs b/crates/table/src/btree_index/multimap.rs index 881a287455..699311e52c 100644 --- a/crates/table/src/btree_index/multimap.rs +++ b/crates/table/src/btree_index/multimap.rs @@ -37,6 +37,20 @@ impl MultiMap { self.map.entry(key).or_default().push(val); } + /// Inserts the relation `key -> val` to this multimap. + /// + /// Returns back the value if the `key` was already present in the map. + pub fn insert_unique(&mut self, key: K, val: V) -> Option<&V> { + // TODO(perf, centril): don't use a multimap at all for unique indices. + let vals = self.map.entry(key).or_default(); + if vals.is_empty() { + vals.push(val); + None + } else { + Some(&vals[0]) + } + } + /// Deletes `key -> val` from this multimap. /// /// Returns whether `key -> val` was present. diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 514cdc8b5e..5aece1dc8b 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -205,15 +205,13 @@ impl Table { /// `MutTxId::insert` will ignore rows which are listed in the delete table. pub fn check_unique_constraints( &self, - row: &ProductValue, + row: RowRef<'_>, mut is_deleted: impl FnMut(RowPointer) -> bool, ) -> Result<(), UniqueConstraintViolation> { for (cols, index) in self.indexes.iter().filter(|(_, index)| index.is_unique) { let value = row.project(cols).unwrap(); - if let Some(mut conflicts) = index.get_rows_that_violate_unique_constraint(&value) { - if conflicts.any(|ptr| !is_deleted(ptr)) { - return Err(self.build_error_unique(index, cols, value)); - } + if index.seek(&value).next().is_some_and(|ptr| !is_deleted(ptr)) { + return Err(self.build_error_unique(index, cols, value)); } } Ok(()) @@ -236,27 +234,54 @@ impl Table { blob_store: &'a mut dyn BlobStore, row: &ProductValue, ) -> Result<(RowHash, RowRef<'a>), InsertError> { - // Check unique constraints. - // This error should take precedence over any other potential failures. - self.check_unique_constraints( - row, - // No need to worry about the committed vs tx state dichotomy here; - // just treat all rows in the table as live. - |_| false, - )?; - // Insert the row into the page manager. let (hash, ptr) = self.insert_internal(blob_store, row)?; + // Insert row into indices and check unique constraints. // SAFETY: We just inserted `ptr`, so it must be present. + unsafe { + self.insert_into_indices(blob_store, ptr)?; + } + + // SAFETY: We just inserted `ptr`, + // and `insert_into_indices` didn't remove it, + // so it must be present. let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + Ok((hash, row_ref)) + } - // Insert row into indices. + /// Insert row identified by `ptr` into indices. + /// This also checks unique constraints. + /// Deletes the row if there were any violations. + /// + /// SAFETY: `self.is_row_present(row)` must hold. + /// Post-condition: If this method returns `Ok(_)`, the row still exists. + unsafe fn insert_into_indices<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + ptr: RowPointer, + ) -> Result<(), InsertError> { + let mut index_error = None; for (cols, index) in self.indexes.iter_mut() { - index.insert(cols, row_ref).unwrap(); + // SAFETY: We just inserted `ptr`, so it must be present. + let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + if index.check_and_insert(cols, row_ref).unwrap().is_some() { + let value = row_ref.project(cols).unwrap(); + let error = InsertError::IndexError(UniqueConstraintViolation::build(&self.schema, index, cols, value)); + index_error = Some(error); + break; + } } - - Ok((hash, row_ref)) + if let Some(err) = index_error { + // Found unique constraint violation. + // Undo the insertion. + // SAFETY: We just inserted `ptr`, so it must be present. + unsafe { + self.delete_unchecked(blob_store, ptr); + } + return Err(err); + } + Ok(()) } /// Insert a `row` into this table. @@ -467,8 +492,7 @@ impl Table { // Do this before the actual deletion, as `index.delete` needs a `RowRef` // so it can extract the appropriate value. for (cols, index) in self.indexes.iter_mut() { - let deleted = index.delete(cols, row_ref).unwrap(); - debug_assert!(deleted); + index.delete(cols, row_ref).unwrap(); } // SAFETY: We've checked above that `self.is_row_present(ptr)`. @@ -745,7 +769,7 @@ impl<'a> RowRef<'a> { /// /// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`]. /// If `cols` is a single column, the value of that column is returned without wrapping in a `ProductValue`. - pub fn project_not_empty(self, cols: &ColList) -> Result { + pub fn project(self, cols: &ColList) -> Result { if let Some(head) = cols.as_singleton() { return self.read_col(head).map_err(|_| head.into()); } @@ -1044,18 +1068,10 @@ pub struct UniqueConstraintViolation { pub value: AlgebraicValue, } -// Private API: -impl Table { +impl UniqueConstraintViolation { /// Returns a unique constraint violation error for the given `index` /// and the `value` that would have been duplicated. - fn build_error_unique( - &self, - index: &BTreeIndex, - cols: &ColList, - value: AlgebraicValue, - ) -> UniqueConstraintViolation { - let schema = self.get_schema(); - + fn build(schema: &TableSchema, index: &BTreeIndex, cols: &ColList, value: AlgebraicValue) -> Self { // Fetch the table name. let table_name = schema.table_name.clone(); @@ -1074,13 +1090,28 @@ impl Table { .index_name .clone(); - UniqueConstraintViolation { + Self { constraint_name, table_name, cols, value, } } +} + +// Private API: +impl Table { + /// Returns a unique constraint violation error for the given `index` + /// and the `value` that would have been duplicated. + fn build_error_unique( + &self, + index: &BTreeIndex, + cols: &ColList, + value: AlgebraicValue, + ) -> UniqueConstraintViolation { + let schema = self.get_schema(); + UniqueConstraintViolation::build(schema, index, cols, value) + } /// Returns a new empty table with the given `schema`, `row_layout`, and `static_layout`s /// and with a specified capacity for the `indexes` of the table. @@ -1287,8 +1318,9 @@ pub(crate) mod test { Err(e) => panic!("Expected UniqueConstraintViolation but found {:?}", e), } - // Second insert did not clear the hash as we had a constraint violation. - assert_eq!(hash_post_ins, *table.inner.pages[pi].unmodified_hash().unwrap()); + // Second insert did clear the hash while we had a constraint violation, + // as constraint checking is done after insertion and then rolled back. + assert_eq!(table.inner.pages[pi].unmodified_hash(), None); } fn insert_retrieve_body(ty: impl Into, val: impl Into) -> TestCaseResult { diff --git a/smoketests/tests/filtering.py b/smoketests/tests/filtering.py index 0951007fd4..6917b11467 100644 --- a/smoketests/tests/filtering.py +++ b/smoketests/tests/filtering.py @@ -10,6 +10,7 @@ class Filtering(Smoketest): id: i32, name: String, + #[unique] nick: String, } @@ -21,11 +22,13 @@ class Filtering(Smoketest): #[spacetimedb::reducer] pub fn insert_person_twice(ctx: &ReducerContext, id: i32, name: String, nick: String) { - ctx.db.person().insert(Person { id, name: name.clone(), nick: nick.clone()} ); - match ctx.db.person().try_insert(Person { id, name: name.clone(), nick: nick.clone()}) { + // We'd like to avoid an error due to a set-semantic error. + let name2 = format!("{name}2"); + ctx.db.person().insert(Person { id, name, nick: nick.clone()} ); + match ctx.db.person().try_insert(Person { id, name: name2, nick: nick.clone()}) { Ok(_) => {}, Err(_) => { - log::info!("UNIQUE CONSTRAINT VIOLATION ERROR: id {}: {}", id, name) + log::info!("UNIQUE CONSTRAINT VIOLATION ERROR: id = {}, nick = {}", id, nick) } } } @@ -245,6 +248,8 @@ def test_filtering(self): self.call("find_identified_person", 23) self.assertIn('IDENTIFIED FOUND: Alice', self.logs(2)) - # Insert row with unique columns twice should fail + # Inserting into a table with unique constraints fails + # when the second row has the same value in the constrained columns as the first row. + # In this case, the table has `#[unique] id` and `#[unique] nick` but not `#[unique] name`. self.call("insert_person_twice", 23, "Alice", "al") - self.assertIn('UNIQUE CONSTRAINT VIOLATION ERROR: id 23: Alice', self.logs(2)) + self.assertIn('UNIQUE CONSTRAINT VIOLATION ERROR: id = 23, nick = al', self.logs(2))