Skip to content

Commit

Permalink
Refactor for full split
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Dec 11, 2024
1 parent d6fa034 commit 4684925
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IterTxByColRange, StateView},
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEq,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterTxByColRange};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -69,6 +71,12 @@ impl MemoryUsage for CommittedState {
}

impl StateView for CommittedState {
type Iter<'a> = IterTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterTxByColRange<'a, R>;
type IterByColEq<'a, 'r> = IterByColEq<'a, 'r>
where
Self: 'a;

fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
self.tables.get(&table_id).map(|table| table.get_schema())
}
Expand All @@ -77,9 +85,9 @@ impl StateView for CommittedState {
self.get_table(table_id).map(|table| table.row_count)
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::tx(table_id, self));
return Ok(IterTx::new(table_id, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand All @@ -91,16 +99,25 @@ impl StateView for CommittedState {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
Ok(IterByColRange::Scan(ScanIterByColRange::new(
Ok(IterTxByColRange::Scan(ScanIterTxByColRange::new(
self.iter(table_id)?,
cols,
range,
)))
}

fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}

/// Swallow `Err(TableError::Duplicate(_))`, which signals a set-semantic collision,
Expand Down Expand Up @@ -629,32 +646,6 @@ impl CommittedState {
}
}

pub enum CommittedIndexIter<'a> {
Tx(CommittedIndexIterTx<'a>),
MutTx(CommittedIndexIterMutTx<'a>),
}

impl<'a> CommittedIndexIter<'a> {
pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self {
Self::Tx(CommittedIndexIterTx::new(committed_rows))
}

pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self {
Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows))
}
}

impl<'a> Iterator for CommittedIndexIter<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Tx(iter) => iter.next(),
Self::MutTx(iter) => iter.next(),
}
}
}

pub struct CommittedIndexIterTx<'a> {
committed_rows: IndexScanIter<'a>,
num_committed_rows_fetched: u64,
Expand Down
28 changes: 20 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use super::{
committed_state::CommittedState,
mut_tx::MutTxId,
sequence::SequencesState,
state_view::{Iter, IterByColRange, StateView},
state_view::{IterTxByColRange, StateView},
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColRange, IterTx};
use crate::execution_context::Workload;
use crate::{
db::{
Expand Down Expand Up @@ -323,15 +324,26 @@ impl Tx for Locking {

impl TxDatastore for Locking {
type Iter<'a>
= Iter<'a>
= IterTx<'a>
where
Self: 'a;
type IterByColEq<'a, 'r>
= IterByColRange<'a, &'r AlgebraicValue>
type IterMutTx<'a>= IterMutTx<'a>
where
Self: 'a;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRange<'a, R>
= IterTxByColRange<'a, R>
where
Self: 'a;
type IterMutByColRange<'a, R: RangeBounds<AlgebraicValue>>
= IterMutTxByColRange<'a, R>
where
Self: 'a;
type IterByColEq<'a, 'r>
= IterTxByColRange<'a, &'r AlgebraicValue>
where
Self: 'a;
type IterMutByColEq<'a, 'r>
= IterMutTxByColRange<'a, &'r AlgebraicValue>
where
Self: 'a;

Expand Down Expand Up @@ -492,7 +504,7 @@ impl MutTxDatastore for Locking {
tx.constraint_id_from_name(constraint_name)
}

fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::Iter<'a>> {
fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::IterMutTx<'a>> {
tx.iter(table_id)
}

Expand All @@ -502,7 +514,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
range: R,
) -> Result<Self::IterByColRange<'a, R>> {
) -> Result<Self::IterMutByColRange<'a, R>> {
tx.iter_by_col_range(table_id, cols.into(), range)
}

Expand All @@ -512,7 +524,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
) -> Result<Self::IterMutByColEq<'a, 'r>> {
tx.iter_by_col_eq(table_id, cols.into(), value)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod mut_tx;
pub use mut_tx::MutTxId;
mod sequence;
pub mod state_view;
pub use state_view::{Iter, IterByColEq, IterByColRange};
pub use state_view::{IterByColEq, IterTxByColRange};
pub(crate) mod tx;
mod tx_state;

Expand Down
37 changes: 29 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use super::{
committed_state::{CommittedIndexIter, CommittedState},
committed_state::CommittedState,
datastore::{record_metrics, Result},
sequence::{Sequence, SequencesState},
state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IndexSeekIterMutTxId, ScanIterMutTxByColRange, StateView},
tx::TxId,
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterMutTx;
use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColEq, IterMutTxByColRange};
use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use crate::db::datastore::{
system_tables::{
Expand Down Expand Up @@ -1419,6 +1421,12 @@ impl MutTxId {
}

impl StateView for MutTxId {
type Iter<'a> = IterMutTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterMutTxByColRange<'a, R>;
type IterByColEq<'a, 'r> = IterMutTxByColEq<'a, 'r>
where
Self: 'a;

fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
// TODO(bikeshedding, docs): should this also check if the schema is in the system tables,
// but the table hasn't been constructed yet?
Expand All @@ -1442,9 +1450,13 @@ impl StateView for MutTxId {
}
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock));
return Ok(IterMutTx::new(
table_id,
&self.tx_state,
&self.committed_state_write_lock,
));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand All @@ -1454,7 +1466,7 @@ impl StateView for MutTxId {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// We have to index_seek in both the committed state and the current tx state.
// First, we will check modifications in the current tx. It may be that the table
// has not been modified yet in the current tx, in which case we will only search
Expand All @@ -1468,7 +1480,7 @@ impl StateView for MutTxId {
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) {
// The current transaction has modified this table, and the table is indexed.
Ok(IterByColRange::Index(IndexSeekIterMutTxId {
Ok(IterMutTxByColRange::Index(IndexSeekIterMutTxId {
table_id,
tx_state: &self.tx_state,
inserted_rows,
Expand All @@ -1479,7 +1491,7 @@ impl StateView for MutTxId {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state_write_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx(
Some(committed_rows) => Ok(IterMutTxByColRange::CommittedIndex(CommittedIndexIterMutTx::new(
table_id,
&self.tx_state,
committed_rows,
Expand Down Expand Up @@ -1513,7 +1525,7 @@ impl StateView for MutTxId {
}
}

Ok(IterByColRange::Scan(ScanIterByColRange::new(
Ok(IterMutTxByColRange::Scan(ScanIterMutTxByColRange::new(
self.iter(table_id)?,
cols,
range,
Expand All @@ -1522,4 +1534,13 @@ impl StateView for MutTxId {
}
}
}

fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}
Loading

0 comments on commit 4684925

Please sign in to comment.