From dfa00c6b5bd6bfc710cd45242c8ae6ca2e21d34f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Mon, 9 Dec 2024 13:04:26 -0500 Subject: [PATCH] Split Iter & IterByColRange types into Tx and MutTxId versions --- .../locking_tx_datastore/committed_state.rs | 86 +++++++++--- .../datastore/locking_tx_datastore/mut_tx.rs | 11 +- .../locking_tx_datastore/state_view.rs | 132 ++++++++++++------ .../db/datastore/locking_tx_datastore/tx.rs | 7 +- 4 files changed, 158 insertions(+), 78 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 71f133ffb8e..0930dd2ed39 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -79,7 +79,7 @@ impl StateView for CommittedState { fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::new(table_id, None, self)); + return Ok(Iter::tx(table_id, self)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -94,6 +94,7 @@ impl StateView for CommittedState { ) -> Result> { // TODO: Why does this unconditionally return a `Scan` iter, // instead of trying to return a `CommittedIndex` iter? + // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen Ok(IterByColRange::Scan(ScanIterByColRange::new( self.iter(table_id)?, cols, @@ -628,42 +629,87 @@ impl CommittedState { } } -pub struct CommittedIndexIter<'a> { +pub enum CommittedIndexIter<'a> { + Tx(CommittedIndexIterTx<'a>), + MutTx(CommittedIndexIterMutTx<'a>), +} + +impl<'a> CommittedIndexIter<'a> { + pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self { + Self::Tx(CommittedIndexIterTx::new(committed_rows)) + } + + pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { + Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows)) + } +} + +impl<'a> Iterator for CommittedIndexIter<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + Self::Tx(iter) => iter.next(), + Self::MutTx(iter) => iter.next(), + } + } +} + +pub struct CommittedIndexIterTx<'a> { + committed_rows: IndexScanIter<'a>, + num_committed_rows_fetched: u64, +} + +impl<'a> CommittedIndexIterTx<'a> { + pub(super) fn new(committed_rows: IndexScanIter<'a>) -> Self { + Self { + committed_rows, + num_committed_rows_fetched: 0, + } + } +} + +impl<'a> Iterator for CommittedIndexIterTx<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + if let Some(row_ref) = self.committed_rows.next() { + // TODO(metrics): This doesn't actually fetch a row. + // Move this counter to `RowRef::read_row`. + self.num_committed_rows_fetched += 1; + return Some(row_ref); + } + + None + } +} + +pub struct CommittedIndexIterMutTx<'a> { table_id: TableId, - tx_state: Option<&'a TxState>, - #[allow(dead_code)] - committed_state: &'a CommittedState, + tx_state: &'a TxState, committed_rows: IndexScanIter<'a>, num_committed_rows_fetched: u64, } -impl<'a> CommittedIndexIter<'a> { - pub(super) fn new( - table_id: TableId, - tx_state: Option<&'a TxState>, - committed_state: &'a CommittedState, - committed_rows: IndexScanIter<'a>, - ) -> Self { +impl<'a> CommittedIndexIterMutTx<'a> { + pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { Self { table_id, tx_state, - committed_state, committed_rows, num_committed_rows_fetched: 0, } } } -impl<'a> Iterator for CommittedIndexIter<'a> { +impl<'a> Iterator for CommittedIndexIterMutTx<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - if let Some(row_ref) = self.committed_rows.find(|row_ref| { - !self - .tx_state - .map(|tx_state| tx_state.is_deleted(self.table_id, row_ref.pointer())) - .unwrap_or(false) - }) { + if let Some(row_ref) = self + .committed_rows + .find(|row_ref| !self.tx_state.is_deleted(self.table_id, row_ref.pointer())) + { // TODO(metrics): This doesn't actually fetch a row. // Move this counter to `RowRef::read_row`. self.num_committed_rows_fetched += 1; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 54c5c2080a7..45d095e57cd 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1424,11 +1424,7 @@ impl StateView for MutTxId { fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::new( - table_id, - Some(&self.tx_state), - &self.committed_state_write_lock, - )); + return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -1463,10 +1459,9 @@ impl StateView for MutTxId { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new( + Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx( table_id, - Some(&self.tx_state), - &self.committed_state_write_lock, + &self.tx_state, committed_rows, ))), None => { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 14166e137ca..c73d1db7abb 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -149,35 +149,43 @@ pub trait StateView { } } -pub struct Iter<'a> { - table_id: TableId, - tx_state_del: Option<&'a DeleteTable>, +pub struct IterMutTx<'a> { tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>, - committed_state: &'a CommittedState, stage: ScanStage<'a>, } -impl<'a> Iter<'a> { - pub(super) fn new(table_id: TableId, tx_state: Option<&'a TxState>, committed_state: &'a CommittedState) -> Self { - let tx_state_ins = tx_state.and_then(|tx| { - let ins = tx.insert_tables.get(&table_id)?; - let bs = &tx.blob_store; - Some((ins, bs)) - }); - let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id)); - Self { - table_id, - tx_state_ins, - tx_state_del, - committed_state, - stage: ScanStage::Start, - } +impl<'a> IterMutTx<'a> { + pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { + let tx_state_ins = tx_state + .insert_tables + .get(&table_id) + .map(|table| (table, &tx_state.blob_store)); + + let tx_state_del = tx_state.delete_tables.get(&table_id); + + let stage = if let Some(table) = committed_state.tables.get(&table_id) { + // The committed state has changes for this table. + let iter = table.scan_rows(&committed_state.blob_store); + if let Some(del_tables) = tx_state_del.and_then(|del| if del.is_empty() { None } else { Some(del) }) { + // There are deletes in the tx state + // so we must exclude those (1b). + ScanStage::CommittedWithTxDeletes { iter, del_tables } + } else { + // There are no deletes in the tx state + // so we don't need to care about those (1a). + ScanStage::CommittedNoTxDeletes { iter } + } + } else { + ScanStage::Continue + }; + + Self { tx_state_ins, stage } } } enum ScanStage<'a> { - /// We haven't decided yet where to yield from. - Start, + /// Continue to the next stage. + Continue, /// Yielding rows from the current tx. CurrentTx { iter: TableScanIter<'a> }, /// Yielding rows from the committed state @@ -186,18 +194,20 @@ enum ScanStage<'a> { /// Yielding rows from the committed state /// but there are deleted rows in the tx state, /// so we must check against those. - CommittedWithTxDeletes { iter: TableScanIter<'a> }, + CommittedWithTxDeletes { + iter: TableScanIter<'a>, + del_tables: &'a DeleteTable, + }, } -impl<'a> Iterator for Iter<'a> { +impl<'a> Iterator for IterMutTx<'a> { type Item = RowRef<'a>; + #[inline] fn next(&mut self) -> Option { - let table_id = self.table_id; - // The finite state machine goes: // - // Start + // Continue // | // |--> CurrentTx -------------------------------\ // | ^ | @@ -209,22 +219,7 @@ impl<'a> Iterator for Iter<'a> { loop { match &mut self.stage { - ScanStage::Start => { - if let Some(table) = self.committed_state.tables.get(&table_id) { - // The committed state has changes for this table. - let iter = table.scan_rows(&self.committed_state.blob_store); - self.stage = if self.tx_state_del.is_some() { - // There are no deletes in the tx state - // so we don't need to care about those (1a). - ScanStage::CommittedWithTxDeletes { iter } - } else { - // There are deletes in the tx state - // so we must exclude those (1b). - ScanStage::CommittedNoTxDeletes { iter } - }; - continue; - } - } + ScanStage::Continue => {} ScanStage::CommittedNoTxDeletes { iter } => { // (1a) Go through the committed state for this table // but do not consider deleted rows. @@ -232,7 +227,7 @@ impl<'a> Iterator for Iter<'a> { return next; } } - ScanStage::CommittedWithTxDeletes { iter } => { + ScanStage::CommittedWithTxDeletes { iter, del_tables } => { // (1b) Check the committed row's state in the current tx. // If it's been deleted, skip it. // If it's still present, yield it. @@ -259,7 +254,6 @@ impl<'a> Iterator for Iter<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() }; if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) { return next; } @@ -279,6 +273,56 @@ impl<'a> Iterator for Iter<'a> { } } +pub struct IterTx<'a> { + iter: TableScanIter<'a>, +} + +impl<'a> IterTx<'a> { + pub(super) fn new(table_id: TableId, committed_state: &'a CommittedState) -> Self { + // The table_id was validated to exist in the committed state. + let table = committed_state + .tables + .get(&table_id) + .expect("table_id must exist in committed state"); + let iter = table.scan_rows(&committed_state.blob_store); + Self { iter } + } +} + +impl<'a> Iterator for IterTx<'a> { + type Item = RowRef<'a>; + + #[inline] + fn next(&mut self) -> Option { + self.iter.next() + } +} + +pub enum Iter<'a> { + Tx(IterTx<'a>), + MutTx(IterMutTx<'a>), +} + +impl<'a> Iter<'a> { + pub(super) fn tx(table_id: TableId, committed_state: &'a CommittedState) -> Self { + Iter::Tx(IterTx::new(table_id, committed_state)) + } + pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { + Iter::MutTx(IterMutTx::new(table_id, tx_state, committed_state)) + } +} + +impl<'a> Iterator for Iter<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + Iter::Tx(iter) => iter.next(), + Iter::MutTx(iter) => iter.next(), + } + } +} + pub struct IndexSeekIterMutTxId<'a> { pub(super) table_id: TableId, pub(super) tx_state: &'a TxState, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index b9be047a55c..b785d7c36a6 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -46,12 +46,7 @@ impl StateView for TxId { range: R, ) -> Result> { match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new( - table_id, - None, - &self.committed_state_shared_lock, - committed_rows, - ))), + Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::tx(committed_rows))), None => self .committed_state_shared_lock .iter_by_col_range(table_id, cols, range),