Skip to content

Commit

Permalink
Split Iter & IterByColRange types into Tx and MutTxId versions
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Dec 9, 2024
1 parent 96a3871 commit dfa00c6
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl StateView for CommittedState {

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(table_id, None, self));
return Ok(Iter::tx(table_id, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand All @@ -94,6 +94,7 @@ impl StateView for CommittedState {
) -> Result<IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
Ok(IterByColRange::Scan(ScanIterByColRange::new(
self.iter(table_id)?,
cols,
Expand Down Expand Up @@ -628,42 +629,87 @@ impl CommittedState {
}
}

pub struct CommittedIndexIter<'a> {
pub enum CommittedIndexIter<'a> {
Tx(CommittedIndexIterTx<'a>),
MutTx(CommittedIndexIterMutTx<'a>),
}

impl<'a> CommittedIndexIter<'a> {
pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self {
Self::Tx(CommittedIndexIterTx::new(committed_rows))
}

pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self {
Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows))
}
}

impl<'a> Iterator for CommittedIndexIter<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Tx(iter) => iter.next(),
Self::MutTx(iter) => iter.next(),
}
}
}

pub struct CommittedIndexIterTx<'a> {
committed_rows: IndexScanIter<'a>,
num_committed_rows_fetched: u64,
}

impl<'a> CommittedIndexIterTx<'a> {
pub(super) fn new(committed_rows: IndexScanIter<'a>) -> Self {
Self {
committed_rows,
num_committed_rows_fetched: 0,
}
}
}

impl<'a> Iterator for CommittedIndexIterTx<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.committed_rows.next() {
// TODO(metrics): This doesn't actually fetch a row.
// Move this counter to `RowRef::read_row`.
self.num_committed_rows_fetched += 1;
return Some(row_ref);
}

None
}
}

pub struct CommittedIndexIterMutTx<'a> {
table_id: TableId,
tx_state: Option<&'a TxState>,
#[allow(dead_code)]
committed_state: &'a CommittedState,
tx_state: &'a TxState,
committed_rows: IndexScanIter<'a>,
num_committed_rows_fetched: u64,
}

impl<'a> CommittedIndexIter<'a> {
pub(super) fn new(
table_id: TableId,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
committed_rows: IndexScanIter<'a>,
) -> Self {
impl<'a> CommittedIndexIterMutTx<'a> {
pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self {
Self {
table_id,
tx_state,
committed_state,
committed_rows,
num_committed_rows_fetched: 0,
}
}
}

impl<'a> Iterator for CommittedIndexIter<'a> {
impl<'a> Iterator for CommittedIndexIterMutTx<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.committed_rows.find(|row_ref| {
!self
.tx_state
.map(|tx_state| tx_state.is_deleted(self.table_id, row_ref.pointer()))
.unwrap_or(false)
}) {
if let Some(row_ref) = self
.committed_rows
.find(|row_ref| !self.tx_state.is_deleted(self.table_id, row_ref.pointer()))
{
// TODO(metrics): This doesn't actually fetch a row.
// Move this counter to `RowRef::read_row`.
self.num_committed_rows_fetched += 1;
Expand Down
11 changes: 3 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1424,11 +1424,7 @@ impl StateView for MutTxId {

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(
table_id,
Some(&self.tx_state),
&self.committed_state_write_lock,
));
return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand Down Expand Up @@ -1463,10 +1459,9 @@ impl StateView for MutTxId {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state_write_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new(
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx(
table_id,
Some(&self.tx_state),
&self.committed_state_write_lock,
&self.tx_state,
committed_rows,
))),
None => {
Expand Down
132 changes: 88 additions & 44 deletions crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,35 +149,43 @@ pub trait StateView {
}
}

pub struct Iter<'a> {
table_id: TableId,
tx_state_del: Option<&'a DeleteTable>,
pub struct IterMutTx<'a> {
tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>,
committed_state: &'a CommittedState,
stage: ScanStage<'a>,
}

impl<'a> Iter<'a> {
pub(super) fn new(table_id: TableId, tx_state: Option<&'a TxState>, committed_state: &'a CommittedState) -> Self {
let tx_state_ins = tx_state.and_then(|tx| {
let ins = tx.insert_tables.get(&table_id)?;
let bs = &tx.blob_store;
Some((ins, bs))
});
let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id));
Self {
table_id,
tx_state_ins,
tx_state_del,
committed_state,
stage: ScanStage::Start,
}
impl<'a> IterMutTx<'a> {
pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self {
let tx_state_ins = tx_state
.insert_tables
.get(&table_id)
.map(|table| (table, &tx_state.blob_store));

let tx_state_del = tx_state.delete_tables.get(&table_id);

let stage = if let Some(table) = committed_state.tables.get(&table_id) {
// The committed state has changes for this table.
let iter = table.scan_rows(&committed_state.blob_store);
if let Some(del_tables) = tx_state_del.and_then(|del| if del.is_empty() { None } else { Some(del) }) {
// There are deletes in the tx state
// so we must exclude those (1b).
ScanStage::CommittedWithTxDeletes { iter, del_tables }
} else {
// There are no deletes in the tx state
// so we don't need to care about those (1a).
ScanStage::CommittedNoTxDeletes { iter }
}
} else {
ScanStage::Continue
};

Self { tx_state_ins, stage }
}
}

enum ScanStage<'a> {
/// We haven't decided yet where to yield from.
Start,
/// Continue to the next stage.
Continue,
/// Yielding rows from the current tx.
CurrentTx { iter: TableScanIter<'a> },
/// Yielding rows from the committed state
Expand All @@ -186,18 +194,20 @@ enum ScanStage<'a> {
/// Yielding rows from the committed state
/// but there are deleted rows in the tx state,
/// so we must check against those.
CommittedWithTxDeletes { iter: TableScanIter<'a> },
CommittedWithTxDeletes {
iter: TableScanIter<'a>,
del_tables: &'a DeleteTable,
},
}

impl<'a> Iterator for Iter<'a> {
impl<'a> Iterator for IterMutTx<'a> {
type Item = RowRef<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let table_id = self.table_id;

// The finite state machine goes:
//
// Start
// Continue
// |
// |--> CurrentTx -------------------------------\
// | ^ |
Expand All @@ -209,30 +219,15 @@ impl<'a> Iterator for Iter<'a> {

loop {
match &mut self.stage {
ScanStage::Start => {
if let Some(table) = self.committed_state.tables.get(&table_id) {
// The committed state has changes for this table.
let iter = table.scan_rows(&self.committed_state.blob_store);
self.stage = if self.tx_state_del.is_some() {
// There are no deletes in the tx state
// so we don't need to care about those (1a).
ScanStage::CommittedWithTxDeletes { iter }
} else {
// There are deletes in the tx state
// so we must exclude those (1b).
ScanStage::CommittedNoTxDeletes { iter }
};
continue;
}
}
ScanStage::Continue => {}
ScanStage::CommittedNoTxDeletes { iter } => {
// (1a) Go through the committed state for this table
// but do not consider deleted rows.
if let next @ Some(_) = iter.next() {
return next;
}
}
ScanStage::CommittedWithTxDeletes { iter } => {
ScanStage::CommittedWithTxDeletes { iter, del_tables } => {
// (1b) Check the committed row's state in the current tx.
// If it's been deleted, skip it.
// If it's still present, yield it.
Expand All @@ -259,7 +254,6 @@ impl<'a> Iterator for Iter<'a> {
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() };
if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) {
return next;
}
Expand All @@ -279,6 +273,56 @@ impl<'a> Iterator for Iter<'a> {
}
}

pub struct IterTx<'a> {
iter: TableScanIter<'a>,
}

impl<'a> IterTx<'a> {
pub(super) fn new(table_id: TableId, committed_state: &'a CommittedState) -> Self {
// The table_id was validated to exist in the committed state.
let table = committed_state
.tables
.get(&table_id)
.expect("table_id must exist in committed state");
let iter = table.scan_rows(&committed_state.blob_store);
Self { iter }
}
}

impl<'a> Iterator for IterTx<'a> {
type Item = RowRef<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

pub enum Iter<'a> {
Tx(IterTx<'a>),
MutTx(IterMutTx<'a>),
}

impl<'a> Iter<'a> {
pub(super) fn tx(table_id: TableId, committed_state: &'a CommittedState) -> Self {
Iter::Tx(IterTx::new(table_id, committed_state))
}
pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self {
Iter::MutTx(IterMutTx::new(table_id, tx_state, committed_state))
}
}

impl<'a> Iterator for Iter<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
match self {
Iter::Tx(iter) => iter.next(),
Iter::MutTx(iter) => iter.next(),
}
}
}

pub struct IndexSeekIterMutTxId<'a> {
pub(super) table_id: TableId,
pub(super) tx_state: &'a TxState,
Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,7 @@ impl StateView for TxId {
range: R,
) -> Result<IterByColRange<'_, R>> {
match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new(
table_id,
None,
&self.committed_state_shared_lock,
committed_rows,
))),
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::tx(committed_rows))),
None => self
.committed_state_shared_lock
.iter_by_col_range(table_id, cols, range),
Expand Down

0 comments on commit dfa00c6

Please sign in to comment.