[release/candidate/v0.9.1]: Manually apply #1252: Atomically downgrade lock when committing tx to prevent deadlock
Zeke Foppa committed May 18, 2024
1 parent 9de3eef commit eb09f6e
Showing 6 changed files with 178 additions and 120 deletions.
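
The heart of the change: instead of committing a mutable transaction, dropping the committed-state write lock, and re-acquiring a read lock (a gap in which another writer can interleave, double-deliver subscription updates, or deadlock), the write guard is now downgraded to a read guard atomically. A minimal sketch of that pattern with plain parking_lot locks — not code from this commit; the `state` vector and the publish loop are illustrative only:

    use std::sync::Arc;
    use parking_lot::{RwLock, RwLockWriteGuard};

    /// Commit under the write lock, then publish under a read lock,
    /// without ever releasing the lock in between.
    fn commit_then_publish(state: &Arc<RwLock<Vec<u64>>>, new_row: u64) {
        let mut write = state.write();
        write.push(new_row); // "commit": mutate the shared state

        // Atomic downgrade: readers may proceed, but no writer can take
        // the lock before publishing finishes. Dropping `write` and then
        // calling `state.read()` would open exactly that gap.
        let read = RwLockWriteGuard::downgrade(write);
        for row in read.iter() {
            println!("publish row {row}"); // stand-in for broadcasting
        }
    }

    fn main() {
        let state = Arc::new(RwLock::new(vec![1, 2]));
        commit_then_publish(&state, 3);
    }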
27 changes: 27 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
@@ -485,6 +485,33 @@ impl MutTx for Locking {
}
}

impl Locking {
pub fn rollback_mut_tx_downgrade(&self, ctx: &ExecutionContext, tx: MutTxId) -> TxId {
let lock_wait_time = tx.lock_wait_time;
let timer = tx.timer;
// TODO(cloutiertyler): We should probably track the tx.rollback() time separately.
let tx = tx.rollback_downgrade();

// Record metrics for the transaction at the very end right before we drop
// the MutTx and release the lock.
record_metrics(ctx, timer, lock_wait_time, false);

tx
}

pub fn commit_mut_tx_downgrade(&self, ctx: &ExecutionContext, tx: MutTxId) -> Result<Option<(TxData, TxId)>> {
let lock_wait_time = tx.lock_wait_time;
let timer = tx.timer;
// TODO(cloutiertyler): We should probably track the tx.commit() time separately.
let res = tx.commit_downgrade(ctx);

// Record metrics for the transaction at the very end right before we drop
// the MutTx and release the lock.
record_metrics(ctx, timer, lock_wait_time, true);
Ok(Some(res))
}
}

impl Programmable for Locking {
fn program_hash(&self, tx: &TxId) -> Result<Option<spacetimedb_sats::hash::Hash>> {
tx.iter(&ExecutionContext::internal(self.database_address), &ST_MODULE_ID)?
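Both new methods hand back a `TxId` that still holds the lock, now shared, so reads immediately after the commit or rollback observe a stable snapshot. A hedged usage sketch, not code from this commit; the setup and error handling are assumed:

    // Sketch only: assumes a `Locking` datastore and an `ExecutionContext`
    // are in scope.
    fn commit_then_query(datastore: &Locking, ctx: &ExecutionContext, tx: MutTxId) -> Result<()> {
        if let Some((tx_data, read_tx)) = datastore.commit_mut_tx_downgrade(ctx, tx)? {
            // `read_tx` pins exactly the state this commit produced; any
            // would-be writer queues behind it instead of racing us here.
            let _inserted_tables = tx_data.inserts().count();
            drop(read_tx); // dropping the TxId releases the shared lock
        }
        Ok(())
    }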
25 changes: 25 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -3,6 +3,7 @@ use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView},
tx::TxId,
tx_state::TxState,
SharedMutexGuard, SharedWriteGuard,
};
@@ -689,9 +690,33 @@ impl MutTxId {
committed_state_write_lock.merge(tx_state, ctx)
}

pub fn commit_downgrade(self, ctx: &ExecutionContext) -> (TxData, TxId) {
let Self {
mut committed_state_write_lock,
tx_state,
..
} = self;
let tx_data = committed_state_write_lock.merge(tx_state, ctx);
let tx = TxId {
committed_state_shared_lock: SharedWriteGuard::downgrade(committed_state_write_lock),
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
};
(tx_data, tx)
}

pub fn rollback(self) {
// TODO: Check that no sequences exceed their allocation after the rollback.
}

pub fn rollback_downgrade(self) -> TxId {
// TODO: Check that no sequences exceed their allocation after the rollback.
TxId {
committed_state_shared_lock: SharedWriteGuard::downgrade(self.committed_state_write_lock),
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
}
}
}

/// Either a row just inserted to a table or a row that already existed in some table.
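`SharedWriteGuard::downgrade` itself is not part of this diff. Assuming the guards wrap parking_lot's `Arc`-based locks (the `arc_lock` feature), a minimal shape consistent with this call site could look as follows; the real definitions in the repo may differ:

    use std::sync::Arc;
    use parking_lot::lock_api::{ArcRwLockReadGuard, ArcRwLockWriteGuard};
    use parking_lot::{RawRwLock, RwLock};

    // Assumed shapes, for illustration only; requires parking_lot's
    // "arc_lock" feature.
    pub struct SharedWriteGuard<T>(ArcRwLockWriteGuard<RawRwLock, T>);
    pub struct SharedReadGuard<T>(ArcRwLockReadGuard<RawRwLock, T>);

    impl<T> SharedWriteGuard<T> {
        pub fn lock(lock: &Arc<RwLock<T>>) -> Self {
            Self(lock.write_arc())
        }

        /// Exchange exclusive for shared access with no window in which
        /// another writer could take the lock; this is what makes
        /// `commit_downgrade` and `rollback_downgrade` atomic.
        pub fn downgrade(this: Self) -> SharedReadGuard<T> {
            SharedReadGuard(ArcRwLockWriteGuard::downgrade(this.0))
        }
    }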
139 changes: 82 additions & 57 deletions crates/core/src/db/relational_db.rs
@@ -298,6 +298,12 @@ impl RelationalDB {
self.inner.rollback_mut_tx(ctx, tx)
}

#[tracing::instrument(skip_all)]
pub fn rollback_mut_tx_downgrade(&self, ctx: &ExecutionContext, tx: MutTx) -> Tx {
log::trace!("ROLLBACK MUT TX");
self.inner.rollback_mut_tx_downgrade(ctx, tx)
}

#[tracing::instrument(skip_all)]
pub fn release_tx(&self, ctx: &ExecutionContext, tx: Tx) {
log::trace!("ROLLBACK TX");
@@ -306,75 +312,94 @@

#[tracing::instrument(skip_all)]
pub fn commit_tx(&self, ctx: &ExecutionContext, tx: MutTx) -> Result<Option<TxData>, DBError> {
use commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};

log::trace!("COMMIT MUT TX");

let Some(tx_data) = self.inner.commit_mut_tx(ctx, tx)? else {
return Ok(None);
};

if let Some(durability) = &self.durability {
let inserts: Box<_> = tx_data
.inserts()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();
let deletes: Box<_> = tx_data
.deletes()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();

// Avoid appending transactions to the commitlog which don't modify
// any tables.
//
// An exception is connect / disconnect calls, which we always want
// paired in the log, so as to be able to disconnect clients
// automatically after a server crash. See:
// [`crate::host::ModuleHost::call_identity_connected_disconnected`]
//
// Note that this may change in the future: some analytics and/or
// timetravel queries may benefit from seeing all inputs, even if
// the database state did not change.
let is_noop = || inserts.is_empty() && deletes.is_empty();
let is_connect_disconnect = |ctx: &ReducerContext| {
matches!(
ctx.name.strip_prefix("__identity_"),
Some("connected__" | "disconnected__")
)
};
let inputs = ctx
.reducer_context()
.and_then(|rcx| (!is_noop() || is_connect_disconnect(rcx)).then(|| rcx.into()));

let txdata = Txdata {
inputs,
outputs: None,
mutations: Some(Mutations {
inserts,
deletes,
truncates: [].into(),
}),
};

if !txdata.is_empty() {
log::trace!("append {txdata:?}");
// TODO: Should measure queuing time + actual write
durability.append_tx(txdata);
}
Self::do_durability(&**durability, ctx, &tx_data)
}

Ok(Some(tx_data))
}

#[tracing::instrument(skip_all)]
pub fn commit_tx_downgrade(&self, ctx: &ExecutionContext, tx: MutTx) -> Result<Option<(TxData, Tx)>, DBError> {
log::trace!("COMMIT MUT TX");

let Some((tx_data, tx)) = self.inner.commit_mut_tx_downgrade(ctx, tx)? else {
return Ok(None);
};

if let Some(durability) = &self.durability {
Self::do_durability(&**durability, ctx, &tx_data)
}

Ok(Some((tx_data, tx)))
}

fn do_durability(durability: &dyn Durability<TxData = Txdata>, ctx: &ExecutionContext, tx_data: &TxData) {
use commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};

let inserts: Box<_> = tx_data
.inserts()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();
let deletes: Box<_> = tx_data
.deletes()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();

// Avoid appending transactions to the commitlog which don't modify
// any tables.
//
// An exception is connect / disconnect calls, which we always want
// paired in the log, so as to be able to disconnect clients
// automatically after a server crash. See:
// [`crate::host::ModuleHost::call_identity_connected_disconnected`]
//
// Note that this may change in the future: some analytics and/or
// timetravel queries may benefit from seeing all inputs, even if
// the database state did not change.
let is_noop = || inserts.is_empty() && deletes.is_empty();
let is_connect_disconnect = |ctx: &ReducerContext| {
matches!(
ctx.name.strip_prefix("__identity_"),
Some("connected__" | "disconnected__")
)
};
let inputs = ctx
.reducer_context()
.and_then(|rcx| (!is_noop() || is_connect_disconnect(rcx)).then(|| rcx.into()));

let txdata = Txdata {
inputs,
outputs: None,
mutations: Some(Mutations {
inserts,
deletes,
truncates: [].into(),
}),
};

if !txdata.is_empty() {
log::trace!("append {txdata:?}");
// TODO: Should measure queuing time + actual write
durability.append_tx(txdata);
}
}

/// Run a fallible function in a transaction.
///
/// If the supplied function returns `Ok`, the transaction is automatically
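Factoring the commitlog append out into `do_durability` lets `commit_tx` and `commit_tx_downgrade` share one durability path; only the latter returns the downgraded `Tx`. A hedged sketch of driving the new method; `fan_out` is a stand-in, not an API from this commit:

    // Sketch only: not code from this commit.
    fn commit_and_fan_out(db: &RelationalDB, ctx: &ExecutionContext, tx: MutTx) -> Result<(), DBError> {
        match db.commit_tx_downgrade(ctx, tx)? {
            Some((tx_data, read_tx)) => {
                // Evaluate subscriptions against the pinned post-commit state.
                fan_out(&tx_data, &read_tx);
                db.release_tx(ctx, read_tx); // release the shared lock when done
            }
            None => { /* write skew: nothing was committed */ }
        }
        Ok(())
    }

    // Stand-in for subscription fan-out, which lives elsewhere.
    fn fan_out(_tx_data: &TxData, _read_tx: &Tx) {}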
32 changes: 11 additions & 21 deletions crates/core/src/host/wasm_common/module_host_actor.rs
@@ -25,7 +25,7 @@ use crate::identity::Identity;
use crate::messages::control_db::Database;
use crate::module_host_context::ModuleCreationContext;
use crate::sql;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteSkew};
use crate::util::const_unwrap;
use crate::worker_metrics::WORKER_METRICS;
use spacetimedb_sats::db::def::TableDef;
@@ -573,14 +573,8 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
}
reducer_span.exit();

// Take a lock on our subscriptions now. Otherwise, we could have a race condition where we commit
// the tx, someone adds a subscription and receives this tx as an initial update, and then receives the
// update again when we broadcast_event.
let subscriptions = self.info.subscriptions.subscriptions.read();
let status = match call_result {
Err(err) => {
stdb.rollback_mut_tx(&ctx, tx);

T::log_traceback("reducer", reducer_name, &err);

WORKER_METRICS
@@ -598,23 +592,13 @@
}
}
Ok(Err(errmsg)) => {
stdb.rollback_mut_tx(&ctx, tx);

log::info!("reducer returned error: {errmsg}");

EventStatus::Failed(errmsg.into())
}
Ok(Ok(())) => {
if let Some(tx_data) = stdb.commit_tx(&ctx, tx).unwrap() {
EventStatus::Committed(DatabaseUpdate::from_writes(&tx_data))
} else {
todo!("Write skew, you need to implement retries my man, T-dawg.");
}
}
Ok(Ok(())) => EventStatus::Committed(DatabaseUpdate::default()),
};

let outcome = ReducerOutcome::from(&status);

let event = ModuleEvent {
timestamp,
caller_identity,
Expand All @@ -629,12 +613,18 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
request_id,
timer,
};
self.info
    .subscriptions
    .blocking_broadcast_event(client.as_deref(), &subscriptions, Arc::new(event));
let event = match self
    .info
    .subscriptions
    .commit_and_broadcast_event(client.as_deref(), event, &ctx, tx)
    .unwrap()
{
    Ok(ev) => ev,
    Err(WriteSkew) => todo!("Write skew, you need to implement retries my man, T-dawg."),
};

ReducerCallResult {
outcome,
outcome: ReducerOutcome::from(&event.status),
energy_used: energy.used,
execution_duration: timings.total_duration,
}
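`commit_and_broadcast_event` itself is defined in `module_subscription_actor.rs`, which this page does not show. Judging only from the call site above, its shape is plausibly along these lines; every name and detail below is inferred, not quoted from the commit:

    // Inferred sketch of ModuleSubscriptions::commit_and_broadcast_event;
    // the real implementation may differ.
    pub fn commit_and_broadcast_event(
        &self,
        client: Option<&ClientConnectionSender>,
        mut event: ModuleEvent,
        ctx: &ExecutionContext,
        tx: MutTx,
    ) -> Result<Result<Arc<ModuleEvent>, WriteSkew>, DBError> {
        // Take the subscriptions lock *before* touching the tx, so no client
        // can subscribe between commit and broadcast and see the same
        // update twice.
        let subscriptions = self.subscriptions.read();

        if let EventStatus::Committed(_) = event.status {
            match self.relational_db.commit_tx_downgrade(ctx, tx)? {
                Some((tx_data, read_tx)) => {
                    event.status = EventStatus::Committed(DatabaseUpdate::from_writes(&tx_data));
                    // ... evaluate queries against `read_tx` and broadcast
                    // under `subscriptions` (elided) ...
                    let _ = (client, read_tx, &subscriptions);
                }
                None => return Ok(Err(WriteSkew)),
            }
        } else {
            // Failed events are still broadcast (e.g. to the caller), so the
            // rollback path gets the same atomic downgrade.
            let _read_tx = self.relational_db.rollback_mut_tx_downgrade(ctx, tx);
        }

        Ok(Ok(Arc::new(event)))
    }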