Skip to content

Commit

Permalink
add tests for MutTxId::update and fix it
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Jan 21, 2025
1 parent 9e692b8 commit 6770f62
Show file tree
Hide file tree
Showing 6 changed files with 538 additions and 161 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ paste = "1.0"
petgraph = { version = "0.6.5", default-features = false }
pin-project-lite = "0.2.9"
postgres-types = "0.2.5"
pretty_assertions = "1.4"
pretty_assertions = { version = "1.4", features = ["unstable"] }
proc-macro2 = "1.0"
prometheus = "0.13.0"
proptest = "1.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEqTx,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx};
use crate::{
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
error::IndexError,
};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -132,7 +135,8 @@ fn ignore_duplicate_insert_error<T>(res: std::result::Result<T, InsertError>) ->
match res {
Ok(_) => Ok(()),
Err(InsertError::Duplicate(_)) => Ok(()),
Err(err) => Err(err.into()),
Err(InsertError::Bflatn(e)) => Err(e.into()),
Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()),
}
}

Expand Down Expand Up @@ -321,7 +325,7 @@ impl CommittedState {
let skip_index_update = true;
table
.delete_equal_row(blob_store, rel, skip_index_update)
.map_err(TableError::Insert)?
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Ok(())
}
Expand All @@ -333,8 +337,11 @@ impl CommittedState {
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
table.insert_for_replay(blob_store, row).map_err(TableError::Insert)?;
Ok(())
table.insert_for_replay(blob_store, row).map(drop).map_err(|e| match e {
InsertError::Bflatn(e) => TableError::Bflatn(e).into(),
InsertError::Duplicate(e) => TableError::Duplicate(e).into(),
InsertError::IndexError(e) => IndexError::UniqueConstraintViolation(e).into(),
})
}

pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
Expand Down Expand Up @@ -511,16 +518,16 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// First, merge index id fast-lookup map changes and delete indices.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);

// Merge index id fast-lookup map changes.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
if self.tx_consumes_offset(&tx_data, ctx) {
Expand Down Expand Up @@ -613,9 +620,21 @@ impl CommittedState {
}

fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
for index_id in index_id_map_removals.into_iter().flatten() {
self.index_id_map.remove(index_id);
// Remove indices that tx-state removed.
// It's not necessarily the case that the index already existed in the committed state.
for (index_id, table_id) in index_id_map_removals
.into_iter()
.flatten()
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
{
assert!(self
.tables
.get_mut(&table_id)
.expect("table to delete index from should exist")
.delete_index(&self.blob_store, index_id));
}

// Add the ones tx-state added.
self.index_id_map.extend(index_id_map);
}

Expand Down
Loading

0 comments on commit 6770f62

Please sign in to comment.