Skip to content

Commit

Permalink
move unique constraint checking until after optimistic insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Dec 4, 2024
1 parent d9de1e3 commit f45b027
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 96 deletions.
53 changes: 30 additions & 23 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1196,35 +1196,27 @@ impl MutTxId {
pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result<RowRefInsertion<'_>> {
let commit_table = self.committed_state_write_lock.get_table(table_id);

// Check for constraint violations as early as possible,
// to ensure that `UniqueConstraintViolation` errors have precedence over other errors.
// `tx_table.insert` will later perform the same check on the tx table,
// so this method needs only to check the committed state.
if let Some(commit_table) = commit_table {
commit_table
.check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
.map_err(IndexError::from)?;
}

// Get the insert table, so we can write the row into it.
let (tx_table, tx_blob_store, _, delete_table) = self
.tx_state
.get_table_and_blob_store_or_maybe_create_from(table_id, commit_table)
.ok_or(TableError::IdNotFoundState(table_id))?;

match tx_table.insert(tx_blob_store, row) {
Ok((hash, row_ref)) => {
// `row` not previously present in insert tables,
// but may still be a set-semantic conflict with a row
// in the committed state.

let ptr = row_ref.pointer();
Ok((tx_row_hash, tx_row_ref)) => {
let tx_row_ptr = tx_row_ref.pointer();
if let Some(commit_table) = commit_table {
// Safety:
// The `tx_row_ref` was not previously present in insert tables,
// but may still be a set-semantic conflict with a row
// in the committed state.

// SAFETY:
// - `commit_table` and `tx_table` use the same schema
// because `tx_table` is derived from `commit_table`.
// - `ptr` and `hash` are correct because we just got them from `tx_table.insert`.
if let Some(committed_ptr) = unsafe { Table::find_same_row(commit_table, tx_table, ptr, hash) } {
// - `tx_row_ptr` and `tx_row_hash` are correct because we just got them from `tx_table.insert`.
if let Some(commit_ptr) =
unsafe { Table::find_same_row(commit_table, tx_table, tx_row_ptr, tx_row_hash) }
{
// If `row` was already present in the committed state,
// either this is a set-semantic duplicate,
// or the row is marked as deleted, so we will undelete it
Expand All @@ -1250,26 +1242,41 @@ impl MutTxId {
// - Insert Row A
// This is impossible to recover if `Running 2` elides its insert.
tx_table
.delete(tx_blob_store, ptr, |_| ())
.delete(tx_blob_store, tx_row_ptr, |_| ())
.expect("Failed to delete a row we just inserted");

// It's possible that `row` appears in the committed state,
// but is marked as deleted.
// In this case, undelete it, so it remains in the committed state.
delete_table.remove(&committed_ptr);
delete_table.remove(&commit_ptr);

// No new row was inserted, but return `committed_ptr`.
let blob_store = &self.committed_state_write_lock.blob_store;
return Ok(RowRefInsertion::Existed(
// SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`.
unsafe { commit_table.get_row_ref_unchecked(blob_store, committed_ptr) },
unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) },
));
}

// SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls.
let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) };

// The `tx_row_ref` did not violate a unique constraint *within* the `tx_table`,
// but it could do so wrt., `commit_table`,
// assuming the conflicting row hasn't been deleted since.
// Ensure that it doesn't, or roll back the insertion.
if let Err(e) = commit_table
.check_unique_constraints(tx_row_ref, |commit_ptr| delete_table.contains(&commit_ptr))
{
// There was a constraint violation, so undo the insertion.
tx_table.delete(tx_blob_store, tx_row_ptr, |_| {});
return Err(IndexError::from(e).into());
}
}

Ok(RowRefInsertion::Inserted(unsafe {
// SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls.
tx_table.get_row_ref_unchecked(tx_blob_store, ptr)
tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr)
}))
}
// `row` previously present in insert tables; do nothing but return `ptr`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl<'a, R: RangeBounds<AlgebraicValue>> Iterator for ScanIterByColRange<'a, R>

fn next(&mut self) -> Option<Self::Item> {
for row_ref in &mut self.scan_iter {
let value = row_ref.project_not_empty(&self.cols).unwrap();
let value = row_ref.project(&self.cols).unwrap();
if self.range.contains(&value) {
return Some(row_ref);
}
Expand Down
25 changes: 8 additions & 17 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1601,7 +1601,7 @@ mod tests {
fn table_indexed(is_unique: bool) -> TableSchema {
table(
"MyTable",
ProductType::from([("my_col", AlgebraicType::I64)]),
ProductType::from([("my_col", AlgebraicType::I64), ("other_col", AlgebraicType::I64)]),
|builder| {
let builder = builder.with_index(
RawIndexAlgorithm::BTree { columns: 0.into() },
Expand Down Expand Up @@ -1907,8 +1907,8 @@ mod tests {
"Index not created"
);

stdb.insert(&mut tx, table_id, product![1i64])?;
stdb.insert(&mut tx, table_id, product![1i64])?;
stdb.insert(&mut tx, table_id, product![1i64, 1i64])?;
stdb.insert(&mut tx, table_id, product![1i64, 1i64])?;

assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]);
Ok(())
Expand Down Expand Up @@ -1978,21 +1978,12 @@ mod tests {
"Index not created"
);

stdb.insert(&mut tx, table_id, product![1i64])
stdb.insert(&mut tx, table_id, product![1i64, 0i64])
.expect("stdb.insert failed");
match stdb.insert(&mut tx, table_id, product![1i64]) {
Ok(_) => {
panic!("Allow to insert duplicate row")
}
Err(DBError::Index(err)) => match err {
IndexError::UniqueConstraintViolation { .. } => {}
err => {
panic!("Expected error `UniqueConstraintViolation`, got {err}")
}
},
err => {
panic!("Expected error `UniqueConstraintViolation`, got {err:?}")
}
match stdb.insert(&mut tx, table_id, product![1i64, 1i64]) {
Ok(_) => panic!("Allow to insert duplicate row"),
Err(DBError::Index(IndexError::UniqueConstraintViolation { .. })) => {}
Err(err) => panic!("Expected error `UniqueConstraintViolation`, got {err}"),
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/sats/src/algebraic_value/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl ValueDeserializer {
Self { val }
}

/// Converts `&AlgebraicValue` to `&ValueDeserialize`.
/// Converts `&AlgebraicValue` to `&ValueDeserializer`.
pub fn from_ref(val: &AlgebraicValue) -> &Self {
// SAFETY: The conversion is OK due to `repr(transparent)`.
unsafe { &*(val as *const AlgebraicValue as *const ValueDeserializer) }
Expand Down
101 changes: 81 additions & 20 deletions crates/table/src/btree_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,57 @@ impl TypedIndex {
Self::String(this) => insert_at_type(this, cols, row_ref),

Self::AlgebraicValue(this) => {
let key = row_ref.project_not_empty(cols)?;
let key = row_ref.project(cols)?;
this.insert(key, row_ref.pointer());
Ok(())
}
}
}

/// Add the row referred to by `row_ref` to the index `self`,
/// which must be keyed at `cols`.
///
/// If `cols` is inconsistent with `self`,
/// or the `row_ref` has a row type other than that used for `self`,
/// this will behave oddly; it may return an error,
/// or may insert a nonsense value into the index.
/// Note, however, that it will not invoke undefined behavior.
///
/// Assumes that this is a unique constraint and returns `Ok(Some(existing_row))` if it's violated.
fn insert_unique(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result<Option<RowPointer>, InvalidFieldError> {
fn insert_at_type<T: Ord + ReadColumn>(
this: &mut Index<T>,
cols: &ColList,
row_ref: RowRef<'_>,
) -> Result<Option<RowPointer>, InvalidFieldError> {
let col_pos = cols.as_singleton().unwrap();
let key = row_ref.read_col(col_pos).map_err(|_| col_pos)?;
Ok(this.insert_unique(key, row_ref.pointer()).copied())
}
let unique_violation = match self {
Self::Bool(this) => insert_at_type(this, cols, row_ref),
Self::U8(this) => insert_at_type(this, cols, row_ref),
Self::I8(this) => insert_at_type(this, cols, row_ref),
Self::U16(this) => insert_at_type(this, cols, row_ref),
Self::I16(this) => insert_at_type(this, cols, row_ref),
Self::U32(this) => insert_at_type(this, cols, row_ref),
Self::I32(this) => insert_at_type(this, cols, row_ref),
Self::U64(this) => insert_at_type(this, cols, row_ref),
Self::I64(this) => insert_at_type(this, cols, row_ref),
Self::U128(this) => insert_at_type(this, cols, row_ref),
Self::I128(this) => insert_at_type(this, cols, row_ref),
Self::U256(this) => insert_at_type(this, cols, row_ref),
Self::I256(this) => insert_at_type(this, cols, row_ref),
Self::String(this) => insert_at_type(this, cols, row_ref),

Self::AlgebraicValue(this) => {
let key = row_ref.project(cols)?;
Ok(this.insert_unique(key, row_ref.pointer()).copied())
}
}?;
Ok(unique_violation)
}

/// Remove the row referred to by `row_ref` from the index `self`,
/// which must be keyed at `cols`.
///
Expand Down Expand Up @@ -229,7 +273,7 @@ impl TypedIndex {
Self::String(this) => delete_at_type(this, cols, row_ref),

Self::AlgebraicValue(this) => {
let key = row_ref.project_not_empty(cols)?;
let key = row_ref.project(cols)?;
Ok(this.delete(&key, &row_ref.pointer()))
}
}
Expand Down Expand Up @@ -407,29 +451,34 @@ impl BTreeIndex {

/// Inserts `ptr` with the value `row` to this index.
/// This index will extract the necessary values from `row` based on `self.cols`.
///
/// Return false if `ptr` was already indexed prior to this call.
pub fn insert(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result<(), InvalidFieldError> {
self.idx.insert(cols, row_ref)
}

/// Inserts `ptr` with the value `row` to this index.
/// This index will extract the necessary values from `row` based on `self.cols`.
///
/// Returns `Ok(Some(existing_row))` if this insertion would violate a unique constraint.
pub fn check_and_insert(
&mut self,
cols: &ColList,
row_ref: RowRef<'_>,
) -> Result<Option<RowPointer>, InvalidFieldError> {
if self.is_unique {
self.idx.insert_unique(cols, row_ref)
} else {
self.idx.insert(cols, row_ref)?;
Ok(None)
}
}

/// Deletes `ptr` with its indexed value `col_value` from this index.
///
/// Returns whether `ptr` was present.
pub fn delete(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result<bool, InvalidFieldError> {
self.idx.delete(cols, row_ref)
}

/// Returns an iterator over the rows that would violate the unique constraint of this index,
/// if `row` were inserted,
/// or `None`, if this index doesn't have a unique constraint.
pub fn get_rows_that_violate_unique_constraint<'a>(
&'a self,
row: &'a AlgebraicValue,
) -> Option<BTreeIndexRangeIter<'a>> {
self.is_unique.then(|| self.seek(row))
}

/// Returns whether `value` is in this index.
pub fn contains_any(&self, value: &AlgebraicValue) -> bool {
self.seek(value).next().is_some()
Expand All @@ -445,12 +494,13 @@ impl BTreeIndex {
}

/// Extends [`BTreeIndex`] with `rows`.
/// Returns whether every element in `rows` was inserted.
pub fn build_from_rows<'table>(
&mut self,
cols: &ColList,
rows: impl IntoIterator<Item = RowRef<'table>>,
) -> Result<(), InvalidFieldError> {
// TODO(centril, correctness): consider `self.is_unique`.
// Should not be able to add an index which would cause unique constraint violations.
for row_ref in rows {
self.insert(cols, row_ref)?;
}
Expand Down Expand Up @@ -516,6 +566,16 @@ mod test {
!index.is_unique || index.contains_any(&get_fields(cols, row))
}

/// Returns an iterator over the rows that would violate the unique constraint of this index,
/// if `row` were inserted,
/// or `None`, if this index doesn't have a unique constraint.
fn get_rows_that_violate_unique_constraint<'a>(
index: &'a BTreeIndex,
row: &'a AlgebraicValue,
) -> Option<BTreeIndexRangeIter<'a>> {
index.is_unique.then(|| index.seek(row))
}

proptest! {
#[test]
fn remove_nonexistent_noop(((ty, cols, pv), is_unique) in (gen_row_and_cols(), any::<bool>())) {
Expand All @@ -538,7 +598,7 @@ mod test {
prop_assert_eq!(index.idx.len(), 0);
prop_assert_eq!(index.contains_any(&value), false);

index.insert(&cols, row_ref).unwrap();
prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None);
prop_assert_eq!(index.idx.len(), 1);
prop_assert_eq!(index.contains_any(&value), true);

Expand All @@ -559,20 +619,21 @@ mod test {
prop_assert_eq!(index.idx.len(), 0);
prop_assert_eq!(violates_unique_constraint(&index, &cols, &pv), false);
prop_assert_eq!(
index.get_rows_that_violate_unique_constraint(&value).unwrap().collect::<Vec<_>>(),
get_rows_that_violate_unique_constraint(&index, &value).unwrap().collect::<Vec<_>>(),
[]
);

// Insert.
index.insert(&cols, row_ref).unwrap();
prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None);

// Inserting again would be a problem.
prop_assert_eq!(index.idx.len(), 1);
prop_assert_eq!(violates_unique_constraint(&index, &cols, &pv), true);
prop_assert_eq!(
index.get_rows_that_violate_unique_constraint(&value).unwrap().collect::<Vec<_>>(),
get_rows_that_violate_unique_constraint(&index, &value).unwrap().collect::<Vec<_>>(),
[row_ref.pointer()]
);
prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), Some(row_ref.pointer()));
}

#[test]
Expand All @@ -596,7 +657,7 @@ mod test {
let row = product![x];
let row_ref = table.insert(&mut blob_store, &row).unwrap().1;
val_to_ptr.insert(x, row_ref.pointer());
index.insert(&cols, row_ref).unwrap();
prop_assert_eq!(index.check_and_insert(&cols, row_ref).unwrap(), None);
}

fn test_seek(index: &BTreeIndex, val_to_ptr: &HashMap<u64, RowPointer>, range: impl RangeBounds<AlgebraicValue>, expect: impl IntoIterator<Item = u64>) -> TestCaseResult {
Expand Down
14 changes: 14 additions & 0 deletions crates/table/src/btree_index/multimap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ impl<K: Ord, V: Ord> MultiMap<K, V> {
self.map.entry(key).or_default().push(val);
}

/// Inserts the relation `key -> val` to this multimap.
///
/// Returns back the value if the `key` was already present in the map.
pub fn insert_unique(&mut self, key: K, val: V) -> Option<&V> {
// TODO(perf, centril): don't use a multimap at all for unique indices.
let vals = self.map.entry(key).or_default();
if vals.is_empty() {
vals.push(val);
None
} else {
Some(&vals[0])
}
}

/// Deletes `key -> val` from this multimap.
///
/// Returns whether `key -> val` was present.
Expand Down
Loading

0 comments on commit f45b027

Please sign in to comment.