diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
index ea8f2ebe484..f6cc4a626a2 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
@@ -360,11 +360,36 @@ impl CommittedState {
         table.get_row_ref(&self.blob_store, row_ptr).unwrap()
     }
 
+    /// True if the transaction `(tx_data, ctx)` will be written to the commitlog,
+    /// and therefore consumes a value from `self.next_tx_offset`.
+    ///
+    /// A TX is written to the logs if any of the following holds:
+    /// - The TX inserted at least one row.
+    /// - The TX deleted at least one row.
+    /// - The TX was the result of the reducers `__identity_connected__` or `__identity_disconnected__`.
+    fn tx_consumes_offset(&self, tx_data: &TxData, ctx: &ExecutionContext) -> bool {
+        // Avoid appending transactions to the commitlog which don't modify
+        // any tables.
+        //
+        // An exception is connect / disconnect calls, which we always want
+        // paired in the log, so as to be able to disconnect clients
+        // automatically after a server crash. See:
+        // [`crate::host::ModuleHost::call_identity_connected_disconnected`]
+        //
+        // Note that this may change in the future: some analytics and/or
+        // timetravel queries may benefit from seeing all inputs, even if
+        // the database state did not change.
+        tx_data.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
+            || tx_data.deletes().any(|(_, deleted_rows)| !deleted_rows.is_empty())
+            || matches!(
+                ctx.reducer_context().map(|rcx| rcx.name.strip_prefix("__identity_")),
+                Some(Some("connected__" | "disconnected__"))
+            )
+    }
+
     pub fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
         let mut tx_data = TxData::default();
 
-        self.next_tx_offset += 1;
-
         // First, apply deletes. This will free up space in the committed tables.
         self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, ctx);
 
@@ -373,6 +398,13 @@ impl CommittedState {
         self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store, ctx);
 
+        // If the TX will be logged, record its projected tx offset,
+        // then increment the counter.
+        if self.tx_consumes_offset(&tx_data, ctx) {
+            tx_data.set_tx_offset(self.next_tx_offset);
+            self.next_tx_offset += 1;
+        }
+
         tx_data
     }
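The committed_state.rs hunks above are where the projected offset is decided and assigned. As a rough, standalone sketch of that rule, the following uses hypothetical Fake* stand-ins (not the real `TxData`, `ExecutionContext`, or `CommittedState` types): a transaction consumes a value from `next_tx_offset` only if it touched rows or came from the connect/disconnect reducers.

// Illustrative sketch only: simplified stand-ins for the datastore types,
// showing which transactions consume a commitlog offset.

#[derive(Default)]
struct FakeTxData {
    inserted_rows: usize,
    deleted_rows: usize,
    tx_offset: Option<u64>,
}

struct FakeCtx {
    reducer_name: Option<&'static str>,
}

struct FakeCommittedState {
    next_tx_offset: u64,
}

impl FakeCommittedState {
    /// Mirrors the rule above: a TX takes an offset if it touched any rows,
    /// or if it came from the connect / disconnect reducers.
    fn tx_consumes_offset(&self, tx: &FakeTxData, ctx: &FakeCtx) -> bool {
        tx.inserted_rows > 0
            || tx.deleted_rows > 0
            || matches!(
                ctx.reducer_name.map(|n| n.strip_prefix("__identity_")),
                Some(Some("connected__" | "disconnected__"))
            )
    }

    /// Mirrors the tail of `merge`: stamp the projected offset, then bump the counter.
    fn finish_merge(&mut self, tx: &mut FakeTxData, ctx: &FakeCtx) {
        if self.tx_consumes_offset(tx, ctx) {
            tx.tx_offset = Some(self.next_tx_offset);
            self.next_tx_offset += 1;
        }
    }
}

fn main() {
    let mut state = FakeCommittedState { next_tx_offset: 0 };

    // A read-only TX from an ordinary reducer: no offset consumed.
    let mut read_only = FakeTxData::default();
    state.finish_merge(&mut read_only, &FakeCtx { reducer_name: Some("my_reducer") });
    assert_eq!(read_only.tx_offset, None);

    // A connect call with no row changes still consumes an offset.
    let mut connect = FakeTxData::default();
    state.finish_merge(&mut connect, &FakeCtx { reducer_name: Some("__identity_connected__") });
    assert_eq!(connect.tx_offset, Some(0));
    assert_eq!(state.next_tx_offset, 1);
}

Because a read-only transaction leaves `next_tx_offset` untouched, the counter advances in lockstep with the offsets the commitlog will actually assign.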
diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs
index acfcc22a1c2..1f68e0fcf03 100644
--- a/crates/core/src/db/datastore/traits.rs
+++ b/crates/core/src/db/datastore/traits.rs
@@ -171,10 +171,29 @@ pub struct TxData {
     /// Map of all `TableId`s in both `inserts` and `deletes` to their
     /// corresponding table name.
     tables: IntMap<TableId, String>,
+    /// Tx offset of the transaction which performed these operations.
+    ///
+    /// `None` implies that `inserts` and `deletes` are both empty,
+    /// but `Some` does not necessarily imply that either is non-empty.
+    tx_offset: Option<u64>,
     // TODO: Store an `Arc` or equivalent instead.
 }
 
 impl TxData {
+    /// Set `tx_offset` as the expected on-disk transaction offset of this transaction.
+    pub fn set_tx_offset(&mut self, tx_offset: u64) {
+        self.tx_offset = Some(tx_offset);
+    }
+
+    /// Read the expected on-disk transaction offset of this transaction.
+    ///
+    /// `None` implies that this [`TxData`] contains zero inserted or deleted rows,
+    /// but the inverse is not necessarily true;
+    /// a [`TxData`] may have a `tx_offset` but no row operations.
+    pub fn tx_offset(&self) -> Option<u64> {
+        self.tx_offset
+    }
+
     /// Set `rows` as the inserted rows for `(table_id, table_name)`.
     pub fn set_inserts_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) {
         self.inserts.insert(table_id, rows);
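The `tx_offset` accessors added to `TxData` above carry a one-way contract: a missing offset implies the transaction had no row operations, but a present offset does not imply it had any, with connect / disconnect transactions being the counterexample. Below is a minimal sketch of that contract against a hypothetical `SketchTxData` stand-in, not the real `TxData` API.

// Illustrative sketch: the getter/setter pair and the one-way implication
// documented above. `SketchTxData` is a stand-in, not the real type.

#[derive(Default)]
struct SketchTxData {
    inserts: Vec<(u32, Vec<String>)>, // (table id, rows), a stand-in for the real maps
    deletes: Vec<(u32, Vec<String>)>,
    tx_offset: Option<u64>,
}

impl SketchTxData {
    fn set_tx_offset(&mut self, tx_offset: u64) {
        self.tx_offset = Some(tx_offset);
    }

    fn tx_offset(&self) -> Option<u64> {
        self.tx_offset
    }

    fn has_row_ops(&self) -> bool {
        self.inserts.iter().any(|(_, rows)| !rows.is_empty())
            || self.deletes.iter().any(|(_, rows)| !rows.is_empty())
    }
}

fn main() {
    // A connect-style TX with no row changes: offset set, yet no row ops.
    let mut connect = SketchTxData::default();
    connect.set_tx_offset(7);
    assert!(connect.tx_offset().is_some() && !connect.has_row_ops());

    // A read-only TX: neither an offset nor row ops.
    let read_only = SketchTxData::default();
    assert!(read_only.tx_offset().is_none() && !read_only.has_row_ops());
}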
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 91e8e676823..8d4690505ec 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -12,7 +12,7 @@ use super::db_metrics::DB_METRICS;
 use super::relational_operators::Relation;
 use crate::config::DatabaseConfig;
 use crate::error::{DBError, DatabaseError, TableError};
-use crate::execution_context::{ExecutionContext, ReducerContext};
+use crate::execution_context::ExecutionContext;
 use crate::util::slow::SlowQueryConfig;
 use fs2::FileExt;
 use parking_lot::RwLock;
@@ -365,63 +365,57 @@ impl RelationalDB {
         Ok(Some((tx_data, tx)))
     }
 
+    /// If `(tx_data, ctx)` should be appended to the commitlog, do so.
+    ///
+    /// Note that by this stage,
+    /// [`crate::db::datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
+    /// has already decided based on the reducer and operations whether the transaction should be appended;
+    /// this method is responsible only for reading its decision out of the `tx_data`
+    /// and calling `durability.append_tx`.
     fn do_durability(durability: &dyn Durability, ctx: &ExecutionContext, tx_data: &TxData) {
         use commitlog::payload::{
             txdata::{Mutations, Ops},
             Txdata,
         };
 
-        let inserts: Box<_> = tx_data
-            .inserts()
-            .map(|(table_id, rowdata)| Ops {
-                table_id: *table_id,
-                rowdata: rowdata.clone(),
-            })
-            .collect();
-        let deletes: Box<_> = tx_data
-            .deletes()
-            .map(|(table_id, rowdata)| Ops {
-                table_id: *table_id,
-                rowdata: rowdata.clone(),
-            })
-            .collect();
-
-        // Avoid appending transactions to the commitlog which don't modify
-        // any tables.
-        //
-        // An exception ar connect / disconnect calls, which we always want
-        // paired in the log, so as to be able to disconnect clients
-        // automatically after a server crash. See:
-        // [`crate::host::ModuleHost::call_identity_connected_disconnected`]
-        //
-        // Note that this may change in the future: some analytics and/or
-        // timetravel queries may benefit from seeing all inputs, even if
-        // the database state did not change.
-        let is_noop = || inserts.is_empty() && deletes.is_empty();
-        let is_connect_disconnect = |ctx: &ReducerContext| {
-            matches!(
-                ctx.name.strip_prefix("__identity_"),
-                Some("connected__" | "disconnected__")
-            )
-        };
-        let inputs = ctx
-            .reducer_context()
-            .and_then(|rcx| (!is_noop() || is_connect_disconnect(rcx)).then(|| rcx.into()));
-
-        let txdata = Txdata {
-            inputs,
-            outputs: None,
-            mutations: Some(Mutations {
-                inserts,
-                deletes,
-                truncates: [].into(),
-            }),
-        };
+        if tx_data.tx_offset().is_some() {
+            let inserts: Box<_> = tx_data
+                .inserts()
+                .map(|(table_id, rowdata)| Ops {
+                    table_id: *table_id,
+                    rowdata: rowdata.clone(),
+                })
+                .collect();
+            let deletes: Box<_> = tx_data
+                .deletes()
+                .map(|(table_id, rowdata)| Ops {
+                    table_id: *table_id,
+                    rowdata: rowdata.clone(),
+                })
+                .collect();
+
+            let inputs = ctx.reducer_context().map(|rcx| rcx.into());
+
+            let txdata = Txdata {
+                inputs,
+                outputs: None,
+                mutations: Some(Mutations {
+                    inserts,
+                    deletes,
+                    truncates: [].into(),
+                }),
+            };
 
-        if !txdata.is_empty() {
             log::trace!("append {txdata:?}");
             // TODO: Should measure queuing time + actual write
             durability.append_tx(txdata);
+        } else {
+            debug_assert!(tx_data.inserts().all(|(_, inserted_rows)| inserted_rows.is_empty()));
+            debug_assert!(tx_data.deletes().all(|(_, deleted_rows)| deleted_rows.is_empty()));
+            debug_assert!(!matches!(
+                ctx.reducer_context().map(|rcx| rcx.name.strip_prefix("__identity_")),
+                Some(Some("connected__" | "disconnected__"))
+            ));
         }
     }
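With the decision already made in `merge`, `do_durability` only reads it back out of the `TxData`. The sketch below uses hypothetical `SketchTx` and `SketchLog` types (not the real `Durability` trait or `Txdata` payload) to show the same gate: append when an offset was stamped, otherwise debug-assert that the transaction really was a no-op.

// Illustrative sketch of the append gate; offsets stand in for the real payload.

struct SketchTx {
    tx_offset: Option<u64>,
    inserted_rows: usize,
    deleted_rows: usize,
}

#[derive(Default)]
struct SketchLog {
    appended: Vec<u64>, // offsets in append order
}

impl SketchLog {
    fn append_tx(&mut self, offset: u64) {
        // In the real code the payload is a `Txdata`; an offset stands in here.
        self.appended.push(offset);
    }
}

fn do_durability(log: &mut SketchLog, tx: &SketchTx) {
    if let Some(offset) = tx.tx_offset {
        log.append_tx(offset);
    } else {
        // Mirrors the debug assertions: a TX without an offset must be a no-op.
        debug_assert!(tx.inserted_rows == 0 && tx.deleted_rows == 0);
    }
}

fn main() {
    let mut log = SketchLog::default();

    do_durability(&mut log, &SketchTx { tx_offset: Some(0), inserted_rows: 1, deleted_rows: 0 });
    do_durability(&mut log, &SketchTx { tx_offset: None, inserted_rows: 0, deleted_rows: 0 });
    do_durability(&mut log, &SketchTx { tx_offset: Some(1), inserted_rows: 0, deleted_rows: 2 });

    // Only the offset-consuming transactions reached the log, in offset order.
    assert_eq!(log.appended, vec![0, 1]);
}

Keeping the decision in one place (`merge`) means the append gate and the offset counter cannot disagree about which transactions reach the log.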