diff --git a/bin/strata-client/src/errors.rs b/bin/strata-client/src/errors.rs
index ad1285e16e..41292bb887 100644
--- a/bin/strata-client/src/errors.rs
+++ b/bin/strata-client/src/errors.rs
@@ -7,6 +7,9 @@ use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum InitError {
+    #[error("missing init client state")]
+    MissingInitClientState,
+
     #[error("io: {0}")]
     Io(#[from] io::Error),
 
diff --git a/bin/strata-client/src/helpers.rs b/bin/strata-client/src/helpers.rs
index 6b6e272d18..0d30e7a61c 100644
--- a/bin/strata-client/src/helpers.rs
+++ b/bin/strata-client/src/helpers.rs
@@ -15,7 +15,7 @@ use strata_primitives::{
 use strata_rocksdb::CommonDb;
 use strata_state::csm_status::CsmStatus;
 use strata_status::StatusChannel;
-use strata_storage::L2BlockManager;
+use strata_storage::{L2BlockManager, NodeStorage};
 use tokio::runtime::Handle;
 use tracing::*;
 
@@ -115,13 +115,12 @@ pub fn create_bitcoin_rpc_client(config: &Config) -> anyhow::Result
-pub fn init_status_channel<D>(database: &D) -> anyhow::Result<StatusChannel>
-where
-    D: Database + Send + Sync + 'static,
-{
+pub fn init_status_channel(storage: &NodeStorage) -> anyhow::Result<StatusChannel> {
     // init client state
-    let cs_db = database.client_state_db().as_ref();
-    let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(cs_db)?;
+    let csman = storage.client_state();
+    let (cur_state_idx, cur_state) = csman
+        .get_most_recent_state_blocking()
+        .ok_or(InitError::MissingInitClientState)?;
 
     // init the CsmStatus
     let mut status = CsmStatus::default();
@@ -132,7 +131,12 @@
         ..Default::default()
     };
 
-    Ok(StatusChannel::new(cur_state, l1_status, None))
+    // TODO avoid clone, change status channel to use arc
+    Ok(StatusChannel::new(
+        cur_state.as_ref().clone(),
+        l1_status,
+        None,
+    ))
 }
 
 pub fn init_engine_controller(
diff --git a/bin/strata-client/src/main.rs b/bin/strata-client/src/main.rs
index 585f8a9f28..16c18e3164 100644
--- a/bin/strata-client/src/main.rs
+++ b/bin/strata-client/src/main.rs
@@ -98,7 +98,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> {
 
     // Initialize core databases
     let database = init_core_dbs(rbdb.clone(), ops_config);
-    let storage = Arc::new(create_node_storage(database.clone(), pool.clone()));
+    let storage = Arc::new(create_node_storage(database.clone(), pool.clone())?);
 
     // Set up bridge messaging stuff.
     // TODO move all of this into relayer task init
@@ -110,9 +110,9 @@ fn main_inner(args: Args) -> anyhow::Result<()> {
     let bitcoin_client = create_bitcoin_rpc_client(&config)?;
 
     // Check if we have to do genesis.
-    if genesis::check_needs_client_init(database.as_ref())? {
+    if genesis::check_needs_client_init(storage.as_ref())? {
         info!("need to init client state!");
-        genesis::init_client_state(&params, database.as_ref())?;
+        genesis::init_client_state(&params, storage.client_state())?;
     }
 
     info!("init finished, starting main tasks");
@@ -309,7 +309,7 @@ fn start_core_tasks(
     bitcoin_client: Arc<BitcoinClient>,
 ) -> anyhow::Result {
     // init status tasks
-    let status_channel = init_status_channel(database.as_ref())?;
+    let status_channel = init_status_channel(storage.as_ref())?;
 
     let engine = init_engine_controller(
         config,
@@ -331,7 +331,7 @@ fn start_core_tasks(
     let sync_manager: Arc<_> = sync_manager::start_sync_tasks(
         executor,
         database.clone(),
-        storage.clone(),
+        &storage,
         engine.clone(),
         pool.clone(),
         params.clone(),
diff --git a/bin/strata-client/src/rpc_server.rs b/bin/strata-client/src/rpc_server.rs
index 858d094888..281de0dba6 100644
--- a/bin/strata-client/src/rpc_server.rs
+++ b/bin/strata-client/src/rpc_server.rs
@@ -66,6 +66,7 @@ use zkaleido::ProofReceipt;
 
 use crate::extractor::{extract_deposit_requests, extract_withdrawal_infos};
 
+#[deprecated]
 fn fetch_l2blk<D: Database>(
     l2_db: &Arc<<D as Database>::L2DB>,
     blkid: L2BlockId,
@@ -648,23 +649,12 @@ impl StrataApiServer for StrataRpcImpl {
 
     // FIXME: possibly create a separate rpc type corresponding to ClientUpdateOutput
     async fn get_client_update_output(&self, idx: u64) -> RpcResult<Option<ClientUpdateOutput>> {
-        let db = self.database.clone();
-
-        let res = wait_blocking("fetch_client_update_output", move || {
-            let client_state_db = db.client_state_db();
-
-            let writes = client_state_db.get_client_state_writes(idx)?;
-            let actions = client_state_db.get_client_update_actions(idx)?;
-
-            match (writes, actions) {
-                (Some(w), Some(a)) => Ok(Some(ClientUpdateOutput::new(w, a))),
-                // normally this is just that they're both missing
-                _ => Ok(None),
-            }
-        })
-        .await?;
-
-        Ok(res)
+        Ok(self
+            .storage
+            .client_state()
+            .get_update_async(idx)
+            .map_err(Error::Db)
+            .await?)
     }
 }
 
@@ -901,12 +891,12 @@ impl StrataSequencerApiServer for SequencerServerImpl {
 
 pub struct StrataDebugRpcImpl<D> {
     storage: Arc<NodeStorage>,
-    database: Arc<D>,
+    _database: Arc<D>,
 }
 
 impl<D> StrataDebugRpcImpl<D> {
-    pub fn new(storage: Arc<NodeStorage>, database: Arc<D>) -> Self {
-        Self { storage, database }
+    pub fn new(storage: Arc<NodeStorage>, _database: Arc<D>) -> Self {
+        Self { storage, _database }
     }
 }
 
@@ -941,19 +931,12 @@ impl StrataDebugApiServer for StrataDebugRpcImpl {
     }
 
     async fn get_clientstate_at_idx(&self, idx: u64) -> RpcResult<Option<ClientState>> {
-        let database = self.database.clone();
-        let cs = wait_blocking("clientstate_at_idx", move || {
-            let client_state_db = database.client_state_db();
-            match reconstruct_state(client_state_db.as_ref(), idx) {
-                Ok(client_state) => Ok(Some(client_state)),
-                Err(e) => {
-                    error!(%idx, %e, "failed to reconstruct client state");
-                    Err(Error::Other(e.to_string()))
-                }
-            }
-        })
-        .await?;
-        Ok(cs)
+        Ok(self
+            .storage
+            .client_state()
+            .get_state_async(idx)
+            .map_err(Error::Db)
+            .await?)
     }
 
     async fn set_bail_context(&self, _ctx: String) -> RpcResult<()> {
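
Review note (illustrative sketch, not part of the patch): the rewritten handlers chain `.map_err(..)` onto a future *before* `.await`ing it, which only compiles with a `TryFutureExt`-style adapter in scope. A minimal standalone version of the pattern, assuming an `Error::Db(DbError)` variant on the RPC error enum as the handlers above imply:

```rust
use futures::future::TryFutureExt; // assumed import providing map_err on futures

// Hypothetical free-standing version of the handler body above.
async fn clientstate_at_idx(
    storage: &strata_storage::NodeStorage,
    idx: u64,
) -> Result<Option<ClientState>, Error> {
    storage
        .client_state()
        .get_state_async(idx) // future resolving to DbResult<Option<ClientState>>
        .map_err(Error::Db)   // adapt the DB error before awaiting
        .await
}
```
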
diff --git a/crates/consensus-logic/src/csm/client_transition.rs b/crates/consensus-logic/src/csm/client_transition.rs
index f142003010..fe44a9b197 100644
--- a/crates/consensus-logic/src/csm/client_transition.rs
+++ b/crates/consensus-logic/src/csm/client_transition.rs
@@ -67,14 +67,11 @@ impl EventContext for StorageEventContext<'_> {
 
 /// Processes the event given the current consensus state, producing some
 /// output. This can return database errors.
 pub fn process_event(
-    state: &ClientState,
+    state: &mut ClientStateMut,
     ev: &SyncEvent,
     context: &impl EventContext,
     params: &Params,
-) -> Result<ClientUpdateOutput, Error> {
-    let mut writes = Vec::new();
-    let mut actions = Vec::new();
-
+) -> Result<(), Error> {
     match ev {
         SyncEvent::L1Block(height, l1blkid) => {
             // If the block is before the horizon we don't care about it.
@@ -83,33 +80,33 @@ pub fn process_event(
                 eprintln!("early L1 block at h={height}, you may have set up the test env wrong");
                 warn!(%height, "ignoring unexpected L1Block event before horizon");
-                return Ok(ClientUpdateOutput::new(writes, actions));
+                return Ok(());
             }
 
             // FIXME this doesn't do any SPV checks to make sure we only go to
             // a longer chain, it just does it unconditionally
             let block_mf = context.get_l1_block_manifest(*height)?;
 
-            let l1v = state.l1_view();
-            let l1_vs = state.l1_view().tip_verification_state();
+            let l1v = state.state().l1_view();
+            let l1_vs = l1v.tip_verification_state();
+            let cur_seen_tip_height = l1v.tip_height();
+            let next_exp_height = l1v.next_expected_block();
 
             // Do the consensus checks
             if let Some(l1_vs) = l1_vs {
                 let l1_vs_height = l1_vs.last_verified_block_num as u64;
                 let mut updated_l1vs = l1_vs.clone();
 
-                for height in (l1_vs_height + 1..l1v.tip_height()) {
+                for height in (l1_vs_height + 1..cur_seen_tip_height) {
                     let block_mf = context.get_l1_block_manifest(height)?;
                     let header: Header =
                         bitcoin::consensus::deserialize(block_mf.header()).unwrap();
                     updated_l1vs =
                         updated_l1vs.check_and_update_continuity_new(&header, &get_btc_params());
                 }
 
-                writes.push(ClientStateWrite::UpdateVerificationState(updated_l1vs))
+                state.update_verification_state(updated_l1vs);
             }
 
             // Only accept the block if it's the next block in the chain we expect to accept.
-            let cur_seen_tip_height = l1v.tip_height();
-            let next_exp_height = l1v.next_expected_block();
             if next_exp_height > params.rollup().horizon_l1_height {
                 // TODO check that the new block we're trying to add has the same parent as the tip
                 // block
@@ -117,7 +114,7 @@ pub fn process_event(
             }
 
             if *height == next_exp_height {
-                writes.push(ClientStateWrite::AcceptL1Block(*l1blkid));
+                state.accept_l1_block(*l1blkid);
             } else {
                 #[cfg(test)]
                 eprintln!("not sure what to do here h={height} exp={next_exp_height}");
@@ -128,10 +125,10 @@ pub fn process_event(
             let safe_depth = params.rollup().l1_reorg_safe_depth as u64;
             let maturable_height = next_exp_height.saturating_sub(safe_depth);
 
-            if maturable_height > params.rollup().horizon_l1_height && state.is_chain_active() {
-                let (wrs, acts) = handle_mature_l1_height(maturable_height, state, context);
-                writes.extend(wrs);
-                actions.extend(acts);
+            if maturable_height > params.rollup().horizon_l1_height
+                && state.state().is_chain_active()
+            {
+                handle_mature_l1_height(state, maturable_height, context)?;
             }
         }
 
@@ -152,21 +149,21 @@ pub fn process_event(
             let threshold = params.rollup.l1_reorg_safe_depth;
             let genesis_threshold = genesis_ht + threshold as u64;
 
-            debug!(%genesis_threshold, %genesis_ht, active=%state.is_chain_active(), "Inside activate chain");
+            let active = state.state().is_chain_active();
+            debug!(%genesis_threshold, %genesis_ht, %active, "Inside activate chain");
 
             // If necessary, activate the chain!
-            if !state.is_chain_active() && *height >= genesis_threshold {
+            if !active && *height >= genesis_threshold {
                 debug!("emitting chain activation");
                 let genesis_block = make_genesis_block(params);
 
-                writes.push(ClientStateWrite::ActivateChain);
-                writes.push(ClientStateWrite::UpdateVerificationState(
-                    l1_verification_state.clone(),
+                state.activate_chain();
+                state.update_verification_state(l1_verification_state.clone());
+                state.set_sync_state(SyncState::from_genesis_blkid(
+                    genesis_block.header().get_blockid(),
                 ));
-                writes.push(ClientStateWrite::ReplaceSync(Box::new(
-                    SyncState::from_genesis_blkid(genesis_block.header().get_blockid()),
-                )));
-                actions.push(SyncAction::L2Genesis(
+
+                state.push_action(SyncAction::L2Genesis(
                     l1_verification_state.last_verified_block_hash,
                 ));
             }
         }
 
@@ -176,47 +173,48 @@ pub fn process_event(
         SyncEvent::L1Revert(to_height) => {
             // TODO not sure why this was here
             //let l1_db = database.l1_db();
 
-            let buried = state.l1_view().buried_l1_height();
+            let buried = state.state().l1_view().buried_l1_height();
             if *to_height < buried {
                 error!(%to_height, %buried, "got L1 revert below buried height");
                 return Err(Error::ReorgTooDeep(*to_height, buried));
             }
 
-            writes.push(ClientStateWrite::RollbackL1BlocksTo(*to_height));
+            state.rollback_l1_blocks(*to_height);
         }
 
         SyncEvent::L1DABatch(height, checkpoints) => {
             debug!(%height, "received L1DABatch");
 
-            if let Some(ss) = state.sync() {
+            if let Some(ss) = state.state().sync() {
                 // TODO load it up and figure out what's there, see if we have to
                 // load the state updates from L1 or something
 
                 // TODO not sure why this was here
                 //let l2_db = database.l2_db();
 
                 let proof_verified_checkpoints =
-                    filter_verified_checkpoints(state, checkpoints, params.rollup());
+                    filter_verified_checkpoints(state.state(), checkpoints, params.rollup());
 
                 // When DABatch appears, it is only confirmed at the moment. These will be finalized
                 // only when the corresponding L1 block is buried enough
                 if !proof_verified_checkpoints.is_empty() {
-                    writes.push(ClientStateWrite::CheckpointsReceived(
-                        proof_verified_checkpoints
-                            .iter()
-                            .map(|batch_checkpoint_with_commitment| {
-                                let batch_checkpoint =
-                                    &batch_checkpoint_with_commitment.batch_checkpoint;
-                                L1Checkpoint::new(
-                                    batch_checkpoint.batch_info().clone(),
-                                    batch_checkpoint.bootstrap_state().clone(),
-                                    !batch_checkpoint.proof().is_empty(),
-                                    *height,
-                                )
-                            })
-                            .collect(),
-                    ));
-
-                    actions.push(SyncAction::WriteCheckpoints(
+                    // Copy out all the basic checkpoint data into dedicated
+                    // structures for it.
+                    let ckpts = proof_verified_checkpoints
+                        .iter()
+                        .map(|batch_checkpoint_with_commitment| {
+                            let batch_checkpoint =
+                                &batch_checkpoint_with_commitment.batch_checkpoint;
+                            L1Checkpoint::new(
+                                batch_checkpoint.batch_info().clone(),
+                                batch_checkpoint.bootstrap_state().clone(),
+                                !batch_checkpoint.proof().is_empty(),
+                                *height,
+                            )
+                        })
+                        .collect::<Vec<_>>();
+                    state.accept_checkpoints(&ckpts);
+
+                    state.push_action(SyncAction::WriteCheckpoints(
                         *height,
                         proof_verified_checkpoints,
                     ));
@@ -240,7 +238,7 @@ pub fn process_event(
                 // height of last matured L1 block in chain state
                 let chs_last_buried = chainstate.l1_view().safe_height().saturating_sub(1);
                 // buried height in client state
-                let cls_last_buried = state.l1_view().buried_l1_height();
+                let cls_last_buried = state.state().l1_view().buried_l1_height();
 
                 if chs_last_buried > cls_last_buried {
                     // can bury till last matured block in chainstate
@@ -249,25 +247,21 @@ pub fn process_event(
                     let client_state_bury_height = min(
                         chs_last_buried,
                         // keep at least 1 item
-                        state.l1_view().tip_height().saturating_sub(1),
+                        state.state().l1_view().tip_height().saturating_sub(1),
                     );
-                    writes.push(ClientStateWrite::UpdateBuried(client_state_bury_height));
+
+                    state.update_buried(client_state_bury_height);
                 }
 
                 // TODO better checks here
-                writes.push(ClientStateWrite::AcceptL2Block(
-                    *blkid,
-                    block.block().header().blockidx(),
-                ));
-                actions.push(SyncAction::UpdateTip(*blkid));
-
-                let (wrs, acts) = handle_checkpoint_finalization(state, blkid, params, context);
-                writes.extend(wrs);
-                actions.extend(acts);
+                state.accept_l2_block(*blkid, block.block().header().blockidx());
+                state.push_action(SyncAction::UpdateTip(*blkid));
+
+                handle_checkpoint_finalization(state, blkid, params, context)?;
             }
         }
     }
 
-    Ok(ClientUpdateOutput::new(writes, actions))
+    Ok(())
 }
 
 /// Handles the maturation of L1 height by finalizing checkpoints and emitting
@@ -282,8 +276,8 @@ pub fn process_event(
 ///
 /// # Arguments
 ///
-/// * `maturable_height` - The height at which L1 blocks are considered mature.
 /// * `state` - A reference to the current client state.
+/// * `maturable_height` - The height at which L1 blocks are considered mature.
 /// * `database` - A reference to the database interface.
 ///
 /// # Returns
 ///
@@ -292,49 +286,60 @@ pub fn process_event(
 /// * A vector of [`ClientStateWrite`] representing the state changes to be written.
 /// * A vector of [`SyncAction`] representing the actions to be synchronized.
 fn handle_mature_l1_height(
+    state: &mut ClientStateMut,
     maturable_height: u64,
-    state: &ClientState,
     context: &impl EventContext,
-) -> (Vec<ClientStateWrite>, Vec<SyncAction>) {
-    let mut writes = Vec::new();
-    let mut actions = Vec::new();
-
-    // If there are checkpoints at or before the maturable height, mark them as finalized
-    if state
+) -> Result<(), Error> {
+    // If there are no checkpoints then return early.
+    if !state
+        .state()
         .l1_view()
         .has_verified_checkpoint_before(maturable_height)
     {
-        if let Some(checkpt) = state
-            .l1_view()
-            .get_last_verified_checkpoint_before(maturable_height)
-        {
-            // FinalizeBlock Should only be applied when l2_block is actually
-            // available in l2_db
-            // If l2 blocks is not in db then finalization will happen when
-            // l2Block is fetched from the network and the corresponding
-            //checkpoint is already finalized.
-            let l2_blkid = checkpt.batch_info.l2_blockid;
-
-            match context.get_l2_block_data(&l2_blkid) {
-                Ok(_) => {
-                    debug!(%maturable_height, "Writing CheckpointFinalized");
-                    writes.push(ClientStateWrite::CheckpointFinalized(maturable_height));
-                    // Emit sync action for finalizing a l2 block
-                    info!(%maturable_height, %l2_blkid, "L1 block found in db, push FinalizeBlock SyncAction");
-                    actions.push(SyncAction::FinalizeBlock(l2_blkid));
-                }
-                Err(e) => {
-                    error!(%maturable_height, %l2_blkid, %e, "error while fetching block data");
-                }
+        return Ok(());
+    }
+
+    // If there *are* checkpoints at or before the maturable height, mark them
+    // as finalized
+    if let Some(checkpt) = state
+        .state()
+        .l1_view()
+        .get_last_verified_checkpoint_before(maturable_height)
+    {
+        // FinalizeBlock should only be applied when the L2 block is actually
+        // available in the L2 db. If it's not there yet, finalization will
+        // happen when the block is fetched from the network and the
+        // corresponding checkpoint is already finalized.
+        let blkid = checkpt.batch_info.l2_blockid;
+
+        match context.get_l2_block_data(&blkid) {
+            Ok(_) => {
+                // Emit sync action for finalizing a l2 block
+                info!(%maturable_height, %blkid, "l2 block found in db, push FinalizeBlock SyncAction");
+
+                state.finalize_checkpoint(maturable_height);
+                state.push_action(SyncAction::FinalizeBlock(blkid));
+            }
+
+            Err(Error::MissingL2Block(_)) => {
+                warn!(
+                    %maturable_height, %blkid, "l2 block not in db yet, skipping finalize"
+                );
+            }
+
+            Err(e) => {
+                error!(err = %e, "error while fetching block data from l2_db");
             }
-        } else {
-            warn!(
-                %maturable_height,
-                "expected to find blockid corresponding to buried l1 height in confirmed_blocks but could not find"
-            );
         }
+    } else {
+        warn!(
+            %maturable_height,
+            "expected to find blockid corresponding to buried l1 height in confirmed_blocks but could not find"
+        );
     }
-    (writes, actions)
+
+    Ok(())
 }
 
 /// Handles the finalization of a checkpoint by processing the corresponding L2
@@ -359,14 +364,12 @@ fn handle_mature_l1_height(
 /// * A vector of [`ClientStateWrite`] representing the state changes to be written.
 /// * A vector of [`SyncAction`] representing the actions to be synchronized.
 fn handle_checkpoint_finalization(
-    state: &ClientState,
+    state: &mut ClientStateMut,
     blkid: &L2BlockId,
     params: &Params,
     context: &impl EventContext,
-) -> (Vec<ClientStateWrite>, Vec<SyncAction>) {
-    let mut writes = Vec::new();
-    let mut actions = Vec::new();
-    let verified_checkpoints: &[L1Checkpoint] = state.l1_view().verified_checkpoints();
+) -> Result<(), Error> {
+    let verified_checkpoints: &[L1Checkpoint] = state.state().l1_view().verified_checkpoints();
     match find_l1_height_for_l2_blockid(verified_checkpoints, blkid) {
         Some(l1_height) => {
             let safe_depth = params.rollup().l1_reorg_safe_depth as u64;
 
             // Maturable height is the height at which l1 blocks are sufficiently buried
             // and have negligible chance of reorg.
             let maturable_height = state
+                .state()
                 .l1_view()
                 .next_expected_block()
                 .saturating_sub(safe_depth);
 
             // The l1 height should be handled only if it is less than maturable height
             if l1_height < maturable_height {
-                let (wrs, acts) = handle_mature_l1_height(l1_height, state, context);
-                writes.extend(wrs);
-                actions.extend(acts);
+                handle_mature_l1_height(state, l1_height, context)?;
             }
         }
         None => {
             debug!(%blkid, "L2 block not found in verified checkpoints, possibly not a last block in the checkpoint.");
         }
     }
-    (writes, actions)
+
+    Ok(())
 }
 
 /// Searches for a given [`L2BlockId`] within a slice of [`L1Checkpoint`] structs
diff --git a/crates/consensus-logic/src/csm/state_tracker.rs b/crates/consensus-logic/src/csm/state_tracker.rs
index ab19ebd58b..7f75be0055 100644
--- a/crates/consensus-logic/src/csm/state_tracker.rs
+++ b/crates/consensus-logic/src/csm/state_tracker.rs
@@ -8,10 +8,10 @@ use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT};
 use strata_db::traits::*;
 use strata_primitives::params::Params;
 use strata_state::{
-    client_state::ClientState,
+    client_state::{ClientState, ClientStateMut},
     operation::{self, ClientUpdateOutput},
 };
-use strata_storage::NodeStorage;
+use strata_storage::{ClientStateManager, NodeStorage};
 use tracing::*;
 
 use super::client_transition;
@@ -23,7 +23,6 @@ pub struct StateTracker<D: Database> {
     storage: Arc<NodeStorage>,
 
     cur_state_idx: u64,
-
     cur_state: Arc<ClientState>,
 }
 
@@ -54,6 +53,7 @@ impl<D: Database> StateTracker<D> {
 
     /// Given the next event index, computes the state application if the
    /// requisite data is available. Returns the output and the new state.
+    // TODO maybe remove output return value
     pub fn advance_consensus_state(
         &mut self,
         ev_idx: u64,
@@ -66,41 +66,41 @@ impl<D: Database> StateTracker<D> {
         // Load the event from the database.
         let db = self.database.as_ref();
         let sync_event_db = db.sync_event_db();
-        let client_state_db = db.client_state_db();
         let ev = sync_event_db
             .get_sync_event(ev_idx)?
             .ok_or(Error::MissingSyncEvent(ev_idx))?;
 
-        debug!(?ev, "Processing event");
+        debug!(?ev, "processing sync event");
 
         #[cfg(feature = "debug-utils")]
         check_bail_trigger(BAIL_SYNC_EVENT);
 
         // Compute the state transition.
         let context = client_transition::StorageEventContext::new(&self.storage);
-        let outp = client_transition::process_event(&self.cur_state, &ev, &context, &self.params)?;
+        let mut state_mut = ClientStateMut::new(self.cur_state.as_ref().clone());
+        client_transition::process_event(&mut state_mut, &ev, &context, &self.params)?;
 
-        // Clone the state and apply the operations to it.
-        let mut new_state = self.cur_state.as_ref().clone();
-        operation::apply_writes_to_state(&mut new_state, outp.writes().iter().cloned());
+        let outp = state_mut.into_update();
+
+        // Store the outputs.
+        let state = self
+            .storage
+            .client_state()
+            .put_update_blocking(ev_idx, outp.clone())?;
 
         // Update bookkeeping.
-        self.cur_state = Arc::new(new_state);
+        self.cur_state = state;
         self.cur_state_idx = ev_idx;
         debug!(%ev_idx, "computed new consensus state");
 
-        // Store the outputs.
-        // TODO ideally avoid clone
-        client_state_db.write_client_update_output(ev_idx, outp.clone())?;
-
         Ok((outp, self.cur_state.clone()))
     }
 
-    /// Writes the current state to the database as a new checkpoint.
+    /// Does nothing.
+    // TODO remove this function
     pub fn store_checkpoint(&self) -> anyhow::Result<()> {
-        let client_state_db = self.database.client_state_db();
-        let state = self.cur_state.as_ref().clone(); // TODO avoid clone
-        client_state_db.write_client_state_checkpoint(self.cur_state_idx, state)?;
+        warn!("tried to store client state checkpoint, we don't have this anymore");
         Ok(())
     }
 }
@@ -118,71 +118,28 @@
 /// - `cs_db`: An implementation of the [`ClientStateDatabase`] trait, used for retrieving
 ///   checkpoint and state data.
 /// - `idx`: The index from which to replay state writes, starting from the last checkpoint.
-pub fn reconstruct_cur_state(
-    cs_db: &impl ClientStateDatabase,
-) -> anyhow::Result<(u64, ClientState)> {
-    let last_ckpt_idx = cs_db.get_last_checkpoint_idx()?;
+pub fn reconstruct_cur_state(csman: &ClientStateManager) -> anyhow::Result<(u64, ClientState)> {
+    let last_state_idx = csman.get_last_state_idx_blocking()?;
 
-    // genesis state.
-    if last_ckpt_idx == 0 {
+    // We used to do something here, but now we just print a log.
+    if last_state_idx == 0 {
         debug!("starting from init state");
-        let state = cs_db
-            .get_state_checkpoint(0)?
-            .ok_or(Error::MissingCheckpoint(0))?;
-        return Ok((0, state));
     }
 
-    // If we're not in genesis, then we probably have to replay some writes.
-    let last_write_idx = cs_db.get_last_write_idx()?;
-
-    let state = reconstruct_state(cs_db, last_write_idx)?;
-
-    Ok((last_write_idx, state))
+    let state = csman
+        .get_state_blocking(last_state_idx)?
+        .ok_or(Error::MissingConsensusWrites(last_state_idx))?;
+    Ok((last_state_idx, state))
 }
 
-/// Reconstructs the
-/// [`ClientStateWrite`](strata_state::operation::ClientStateWrite)
-///
-/// Under the hood fetches the nearest checkpoint before the reuested idx
-/// and then replays all the [`ClientStateWrite`](strata_state::operation::ClientStateWrite)s
-/// from that checkpoint up to the requested index `idx`
-/// such that we have accurate [`ClientState`].
-///
-/// # Parameters
-///
-/// - `cs_db`: anything that implements the [`ClientStateDatabase`] trait.
-/// - `idx`: index to look ahead from.
-pub fn reconstruct_state(
-    cs_db: &impl ClientStateDatabase,
-    idx: u64,
-) -> anyhow::Result<ClientState> {
-    match cs_db.get_state_checkpoint(idx)? {
-        Some(cl) => {
-            // if the checkpoint was created at the idx itself, return the checkpoint
-            debug!(%idx, "no writes to replay");
-            Ok(cl)
-        }
+/// Fetches the client state at some idx from the database.
+// TODO remove this
+pub fn reconstruct_state(csman: &ClientStateManager, idx: u64) -> anyhow::Result<ClientState> {
+    match csman.get_state_blocking(idx)? {
+        Some(cl) => Ok(cl),
         None => {
-            // get the previously written checkpoint
-            let prev_ckpt_idx = cs_db.get_prev_checkpoint_at(idx)?;
-
-            // get the previous checkpoint Client State
-            let mut state = cs_db
-                .get_state_checkpoint(prev_ckpt_idx)?
-                .ok_or(Error::MissingCheckpoint(idx))?;
-
-            // write the client state
-            let write_replay_start = prev_ckpt_idx + 1;
-            debug!(%prev_ckpt_idx, %idx, "reconstructing state from checkpoint");
-
-            for i in write_replay_start..=idx {
-                let writes = cs_db
-                    .get_client_state_writes(i)?
-                    .ok_or(Error::MissingConsensusWrites(i))?;
-                operation::apply_writes_to_state(&mut state, writes.into_iter());
-            }
-
-            Ok(state)
+            error!("we don't support state reconstruction anymore");
+            return Err(Error::MissingConsensusWrites(idx).into());
         }
     }
 }
@@ -236,6 +193,7 @@ mod tests {
             let _ = client_state_db.write_client_state_checkpoint(idx, state);
         }
     }
+
     // for the 13th, 14th, 15th state, we require fetching the 12th index ClientState and
     // applying the writes.
     for i in 13..17 {
diff --git a/crates/consensus-logic/src/csm/worker.rs b/crates/consensus-logic/src/csm/worker.rs
index 3b22315c40..bef0eea0b6 100644
--- a/crates/consensus-logic/src/csm/worker.rs
+++ b/crates/consensus-logic/src/csm/worker.rs
@@ -12,7 +12,7 @@ use strata_eectl::engine::ExecEngineCtl;
 use strata_primitives::prelude::*;
 use strata_state::{client_state::ClientState, csm_status::CsmStatus, operation::SyncAction};
 use strata_status::StatusChannel;
-use strata_storage::{CheckpointDbManager, NodeStorage};
+use strata_storage::{CheckpointDbManager, ClientStateManager, L2BlockManager, NodeStorage};
 use strata_tasks::ShutdownGuard;
 use tokio::{
     sync::{broadcast, mpsc},
@@ -43,7 +43,7 @@ pub struct WorkerState<D: Database> {
     // TODO should we move this out?
     database: Arc<D>,
 
-    /// L2 block manager.
+    /// Node storage handle.
     storage: Arc<NodeStorage>,
 
     /// Checkpoint manager.
@@ -66,8 +66,8 @@ impl<D: Database> WorkerState<D> {
         cupdate_tx: broadcast::Sender,
         checkpoint_manager: Arc<CheckpointDbManager>,
     ) -> anyhow::Result<Self> {
-        let client_state_db = database.client_state_db().as_ref();
-        let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(client_state_db)?;
+        let (cur_state_idx, cur_state) =
+            state_tracker::reconstruct_cur_state(storage.client_state())?;
         let state_tracker = state_tracker::StateTracker::new(
             params.clone(),
             database.clone(),
@@ -241,17 +241,10 @@
     // Make sure that the new state index is set as expected.
     assert_eq!(state.state_tracker.cur_state_idx(), ev_idx);
 
-    // Write the client state checkpoint periodically based on the event idx..
-    if ev_idx % state.params.run.client_checkpoint_interval as u64 == 0 {
-        let client_state_db = state.database.client_state_db();
-        client_state_db.write_client_state_checkpoint(ev_idx, new_state.as_ref().clone())?;
-    }
-
-    // FIXME clean this up
+    // FIXME clean this up and make them take Arcs
     let mut status = CsmStatus::default();
     status.set_last_sync_ev_idx(ev_idx);
     status.update_from_client_state(new_state.as_ref());
-
     status_channel.update_client_state(new_state.as_ref().clone());
 
     trace!(?new_state, "sending client update notif");
diff --git a/crates/consensus-logic/src/genesis.rs b/crates/consensus-logic/src/genesis.rs
index 46c8545292..be3d42eb7f 100644
--- a/crates/consensus-logic/src/genesis.rs
+++ b/crates/consensus-logic/src/genesis.rs
@@ -15,16 +15,17 @@ use strata_state::{
     genesis::GenesisStateData,
     header::L2BlockHeader,
     l1::{L1HeaderRecord, L1ViewState},
+    operation::ClientUpdateOutput,
     prelude::*,
 };
-use strata_storage::{L1BlockManager, L2BlockManager, NodeStorage};
+use strata_storage::{ClientStateManager, L1BlockManager, L2BlockManager, NodeStorage};
 use tracing::*;
 
 use crate::errors::Error;
 
 /// Inserts into the database an initial basic client state that we can begin
 /// waiting for genesis with.
-pub fn init_client_state(params: &Params, database: &impl Database) -> anyhow::Result<()> {
+pub fn init_client_state(params: &Params, csman: &ClientStateManager) -> anyhow::Result<()> {
     debug!("initializing client state in database!");
 
     let init_state = ClientState::from_genesis_params(
@@ -33,8 +34,7 @@
     );
 
     // Write the state into the database.
-    let cs_db = database.client_state_db();
-    cs_db.write_client_state_checkpoint(0, init_state)?;
+    csman.put_update_blocking(0, ClientUpdateOutput::new_state(init_state))?;
 
     Ok(())
 }
@@ -159,12 +159,10 @@ pub fn make_genesis_chainstate(
 }
 
 /// Check if the database needs to have client init done to it.
-pub fn check_needs_client_init(database: &impl Database) -> anyhow::Result<bool> {
-    let cs_db = database.client_state_db();
-
+pub fn check_needs_client_init(storage: &NodeStorage) -> anyhow::Result<bool> {
     // Check if we've written any genesis state checkpoint. These we perform a
     // bit more carefully and check errors more granularly.
-    match cs_db.get_last_checkpoint_idx() {
+    match storage.client_state().get_last_state_idx_blocking() {
         Ok(_) => {}
         Err(DbError::NotBootstrapped) => return Ok(true),
 
@@ -175,6 +173,7 @@
     Ok(false)
 }
 
+/// Checks if we have a genesis block written to the L2 block database.
 pub fn check_needs_genesis(l2man: &L2BlockManager) -> anyhow::Result<bool> {
     // Check if there's any genesis block written.
     match l2man.get_blocks_at_height_blocking(0) {
diff --git a/crates/consensus-logic/src/sync_manager.rs b/crates/consensus-logic/src/sync_manager.rs
index 6a76d17fcb..eb302423d6 100644
--- a/crates/consensus-logic/src/sync_manager.rs
+++ b/crates/consensus-logic/src/sync_manager.rs
@@ -79,7 +79,7 @@ pub fn start_sync_tasks<
 >(
     executor: &TaskExecutor,
     database: Arc<D>,
-    storage: Arc<NodeStorage>,
+    storage: &Arc<NodeStorage>,
     engine: Arc<E>,
     pool: threadpool::ThreadPool,
     params: Arc<Params>,
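
Review note (illustrative sketch, not part of the patch): taken together, the changes above reduce startup to "create storage, maybe seed index 0, read back the tip state". A condensed sketch of that sequence using only functions introduced or reworked in this diff:

```rust
// Condensed from main_inner and the consensus-logic changes above.
let storage = Arc::new(create_node_storage(database.clone(), pool.clone())?);

// The genesis check now goes through the client state manager instead of
// the raw database handle.
if genesis::check_needs_client_init(storage.as_ref())? {
    // Seeds index 0 with ClientUpdateOutput::new_state(..).
    genesis::init_client_state(&params, storage.client_state())?;
}

// The CSM then recovers its tip state with a single lookup instead of
// replaying writes from the last stored checkpoint.
let (cur_state_idx, cur_state) =
    state_tracker::reconstruct_cur_state(storage.client_state())?;
```
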
diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs
index 77d030be30..1159fa801a 100644
--- a/crates/db/src/traits.rs
+++ b/crates/db/src/traits.rs
@@ -12,7 +12,8 @@ use strata_primitives::{
 };
 use strata_state::{
     block::L2BlockBundle, bridge_duties::BridgeDutyStatus, chain_state::Chainstate,
-    client_state::ClientState, l1::L1Tx, operation::*, state_op::WriteBatch, sync_event::SyncEvent,
+    client_state::ClientState, l1::L1Tx, operation::*, prelude::*, state_op::WriteBatch,
+    sync_event::SyncEvent,
 };
 use zkaleido::ProofReceipt;
 
@@ -123,33 +124,14 @@ pub trait ClientStateDatabase {
     /// [``SyncEventDatabase``]. Will error if `idx - 1` does not exist (unless
     /// `idx` is 0) or if trying to overwrite a state, as this is almost
     /// certainly a bug.
-    fn write_client_update_output(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>;
+    fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()>;
 
-    /// Writes a new consensus checkpoint that we can cheaply resume from. Will
-    /// error if trying to overwrite a state.
-    fn write_client_state_checkpoint(&self, idx: u64, state: ClientState) -> DbResult<()>;
+    /// Gets the client update output for some input index.
+    fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>>;
 
     /// Gets the idx of the last written state. Or returns error if a bootstrap
     /// state has not been written yet.
-    fn get_last_write_idx(&self) -> DbResult<u64>;
-
-    /// Gets the output client state writes for some input index.
-    fn get_client_state_writes(&self, idx: u64) -> DbResult<Option<Vec<ClientStateWrite>>>;
-
-    /// Gets the actions output from a client state transition.
-    fn get_client_update_actions(&self, idx: u64) -> DbResult<Option<Vec<SyncAction>>>;
-
-    /// Gets the last consensus checkpoint idx.
-    fn get_last_checkpoint_idx(&self) -> DbResult<u64>;
-
-    /// Gets the idx of the last checkpoint up to the given input idx. This is
-    /// the idx we should resume at when playing out consensus writes since the
-    /// saved checkpoint, which may be the same as the given idx (if we didn't
-    /// receive any sync events since the last checkpoint.
-    fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult<u64>;
-
-    /// Gets a state checkpoint at a previously written index, if it exists.
-    fn get_state_checkpoint(&self, idx: u64) -> DbResult<Option<ClientState>>;
+    fn get_last_state_idx(&self) -> DbResult<u64>;
 }
 
 /// L2 data store for CL blocks. Does not store anything about what we think
diff --git a/crates/evmexec/src/fork_choice_state.rs b/crates/evmexec/src/fork_choice_state.rs
index f1c98e3492..da796360f7 100644
--- a/crates/evmexec/src/fork_choice_state.rs
+++ b/crates/evmexec/src/fork_choice_state.rs
@@ -50,7 +50,7 @@ fn get_block_hash(l2_block: L2BlockBundle) -> Result
 }
 
 fn get_last_checkpoint_state<D: Database>(db: &D) -> Result<Option<ClientState>> {
-    let last_checkpoint_idx = db.client_state_db().get_last_checkpoint_idx();
+    let last_checkpoint_idx = db.client_state_db().get_last_state_idx();
 
     if let Err(DbError::NotBootstrapped) = last_checkpoint_idx {
         // before genesis block ready; use hardcoded genesis state
@@ -58,7 +58,8 @@
     }
 
     last_checkpoint_idx
-        .and_then(|ckpt_idx| db.client_state_db().get_state_checkpoint(ckpt_idx))
+        .and_then(|ckpt_idx| db.client_state_db().get_client_update(ckpt_idx))
+        .map(|res| res.map(|update| update.into_state()))
        .context("Failed to get last checkpoint state")
 }
 
diff --git a/crates/rocksdb-store/src/client_state/db.rs b/crates/rocksdb-store/src/client_state/db.rs
index 5781237325..21b9c7803d 100644
--- a/crates/rocksdb-store/src/client_state/db.rs
+++ b/crates/rocksdb-store/src/client_state/db.rs
@@ -4,7 +4,7 @@ use rockbound::{OptimisticTransactionDB, Schema, SchemaDBOperationsExt};
 use strata_db::{errors::*, traits::*, DbResult};
 use strata_state::operation::*;
 
-use super::schemas::{ClientStateSchema, ClientUpdateOutputSchema};
+use super::schemas::ClientUpdateOutputSchema;
 use crate::DbOpsConfig;
 
 pub struct ClientStateDb {
@@ -38,7 +38,7 @@ impl ClientStateDb {
 }
 
 impl ClientStateDatabase for ClientStateDb {
-    fn write_client_update_output(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> {
+    fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> {
         let expected_idx = match self.get_last_idx::<ClientUpdateOutputSchema>()? {
             Some(last_idx) => last_idx + 1,
             None => 1,
@@ -50,75 +50,16 @@
         Ok(())
     }
 
-    fn write_client_state_checkpoint(
-        &self,
-        idx: u64,
-        state: strata_state::client_state::ClientState,
-    ) -> DbResult<()> {
-        // FIXME this should probably be a transaction
-        if self.db.get::<ClientStateSchema>(&idx)?.is_some() {
-            return Err(DbError::OverwriteConsensusCheckpoint(idx));
-        }
-        self.db.put::<ClientStateSchema>(&idx, &state)?;
-        Ok(())
+    fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>> {
+        Ok(self.db.get::<ClientUpdateOutputSchema>(&idx)?)
     }
 
-    fn get_last_write_idx(&self) -> DbResult<u64> {
+    fn get_last_state_idx(&self) -> DbResult<u64> {
         match self.get_last_idx::<ClientUpdateOutputSchema>()? {
             Some(idx) => Ok(idx),
             None => Err(DbError::NotBootstrapped),
         }
     }
-
-    fn get_client_state_writes(&self, idx: u64) -> DbResult<Option<Vec<ClientStateWrite>>> {
-        let output = self.db.get::<ClientUpdateOutputSchema>(&idx)?;
-        match output {
-            Some(out) => Ok(Some(out.writes().to_owned())),
-            None => Ok(None),
-        }
-    }
-
-    fn get_client_update_actions(&self, idx: u64) -> DbResult<Option<Vec<SyncAction>>> {
-        let output = self.db.get::<ClientUpdateOutputSchema>(&idx)?;
-        match output {
-            Some(out) => Ok(Some(out.actions().to_owned())),
-            None => Ok(None),
-        }
-    }
-
-    fn get_last_checkpoint_idx(&self) -> DbResult<u64> {
-        match self.get_last_idx::<ClientStateSchema>()? {
-            Some(idx) => Ok(idx),
-            None => Err(DbError::NotBootstrapped),
-        }
-    }
-
-    fn get_prev_checkpoint_at(&self, idx: u64) -> DbResult<u64> {
-        let mut iterator = self.db.iter::<ClientStateSchema>()?;
-        iterator.seek_to_last();
-        let rev_iterator = iterator.rev();
-
-        for res in rev_iterator {
-            match res {
-                Ok(item) => {
-                    let (tip, _) = item.into_tuple();
-                    if tip <= idx {
-                        return Ok(tip);
-                    }
-                }
-                Err(e) => return Err(DbError::Other(e.to_string())),
-            }
-        }
-
-        Err(DbError::NotBootstrapped)
-    }
-
-    fn get_state_checkpoint(
-        &self,
-        idx: u64,
-    ) -> DbResult<Option<strata_state::client_state::ClientState>> {
-        Ok(self.db.get::<ClientStateSchema>(&idx)?)
-    }
 }
 
 #[cfg(test)]
diff --git a/crates/rocksdb-store/src/client_state/schemas.rs b/crates/rocksdb-store/src/client_state/schemas.rs
index fb060bc226..2331518f24 100644
--- a/crates/rocksdb-store/src/client_state/schemas.rs
+++ b/crates/rocksdb-store/src/client_state/schemas.rs
@@ -1,4 +1,4 @@
-use strata_state::{client_state::ClientState, operation::ClientUpdateOutput};
+use strata_state::operation::ClientUpdateOutput;
 
 use crate::{define_table_with_seek_key_codec, define_table_without_codec, impl_borsh_value_codec};
 
@@ -7,9 +7,3 @@ define_table_with_seek_key_codec!(
     /// Table to store client state updates.
     (ClientUpdateOutputSchema) u64 => ClientUpdateOutput
 );
-
-// Consensus State Schema and corresponding codecs implementation
-define_table_with_seek_key_codec!(
-    /// Table to store client states.
-    (ClientStateSchema) u64 => ClientState
-);
diff --git a/crates/rocksdb-store/src/lib.rs b/crates/rocksdb-store/src/lib.rs
index c03b0da684..85919a3d98 100644
--- a/crates/rocksdb-store/src/lib.rs
+++ b/crates/rocksdb-store/src/lib.rs
@@ -20,43 +20,6 @@ use strata_db::database::CommonDatabase;
 #[cfg(feature = "test_utils")]
 pub mod test_utils;
 
-pub const ROCKSDB_NAME: &str = "strata-client";
-
-pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[
-    SequenceSchema::COLUMN_FAMILY_NAME,
-    ClientUpdateOutputSchema::COLUMN_FAMILY_NAME,
-    ClientStateSchema::COLUMN_FAMILY_NAME,
-    L1BlockSchema::COLUMN_FAMILY_NAME,
-    MmrSchema::COLUMN_FAMILY_NAME,
-    SyncEventSchema::COLUMN_FAMILY_NAME,
-    TxnSchema::COLUMN_FAMILY_NAME,
-    L2BlockSchema::COLUMN_FAMILY_NAME,
-    L2BlockStatusSchema::COLUMN_FAMILY_NAME,
-    L2BlockHeightSchema::COLUMN_FAMILY_NAME,
-    WriteBatchSchema::COLUMN_FAMILY_NAME,
-    // Payload/intent schemas
-    PayloadSchema::COLUMN_FAMILY_NAME,
-    IntentSchema::COLUMN_FAMILY_NAME,
-    IntentIdxSchema::COLUMN_FAMILY_NAME,
-    // Bcast schemas
-    BcastL1TxIdSchema::COLUMN_FAMILY_NAME,
-    BcastL1TxSchema::COLUMN_FAMILY_NAME,
-    // Bridge relay schemas
-    BridgeMsgIdSchema::COLUMN_FAMILY_NAME,
-    ScopeMsgIdSchema::COLUMN_FAMILY_NAME,
-    // Bridge signature schemas
-    BridgeTxStateTxidSchema::COLUMN_FAMILY_NAME,
-    BridgeTxStateSchema::COLUMN_FAMILY_NAME,
-    // Bridge duty schemas
-    BridgeDutyTxidSchema::COLUMN_FAMILY_NAME,
-    BridgeDutyStatusSchema::COLUMN_FAMILY_NAME,
-    // Bridge duty checkpoint
-    BridgeDutyCheckpointSchema::COLUMN_FAMILY_NAME,
-    // Checkpoint schemas
-    BatchCheckpointSchema::COLUMN_FAMILY_NAME,
-    // TODO add col families for other store types
-];
-
 use std::{fs, path::Path, sync::Arc};
 
 use bridge::schemas::{
@@ -93,12 +56,56 @@ use writer::schemas::{IntentIdxSchema, IntentSchema, PayloadSchema};
 
 use crate::{
     chain_state::schemas::WriteBatchSchema,
-    client_state::schemas::{ClientStateSchema, ClientUpdateOutputSchema},
+    client_state::schemas::ClientUpdateOutputSchema,
     l1::schemas::{L1BlockSchema, MmrSchema, TxnSchema},
     sequence::SequenceSchema,
     sync_event::schemas::SyncEventSchema,
 };
 
+pub const ROCKSDB_NAME: &str = "strata-client";
+
+#[rustfmt::skip]
+pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[
+    // Core
+    SequenceSchema::COLUMN_FAMILY_NAME,
+    ClientUpdateOutputSchema::COLUMN_FAMILY_NAME,
+    L1BlockSchema::COLUMN_FAMILY_NAME,
+    MmrSchema::COLUMN_FAMILY_NAME,
+    SyncEventSchema::COLUMN_FAMILY_NAME,
+    TxnSchema::COLUMN_FAMILY_NAME,
+    L2BlockSchema::COLUMN_FAMILY_NAME,
+    L2BlockStatusSchema::COLUMN_FAMILY_NAME,
+    L2BlockHeightSchema::COLUMN_FAMILY_NAME,
+    WriteBatchSchema::COLUMN_FAMILY_NAME,
+
+    // Payload/intent schemas
+    PayloadSchema::COLUMN_FAMILY_NAME,
+    IntentSchema::COLUMN_FAMILY_NAME,
+    IntentIdxSchema::COLUMN_FAMILY_NAME,
+
+    // Bcast schemas
+    BcastL1TxIdSchema::COLUMN_FAMILY_NAME,
+    BcastL1TxSchema::COLUMN_FAMILY_NAME,
+
+    // Bridge relay schemas
+    BridgeMsgIdSchema::COLUMN_FAMILY_NAME,
+    ScopeMsgIdSchema::COLUMN_FAMILY_NAME,
+
+    // Bridge signature schemas
+    BridgeTxStateTxidSchema::COLUMN_FAMILY_NAME,
+    BridgeTxStateSchema::COLUMN_FAMILY_NAME,
+
+    // Bridge duty schemas
+    BridgeDutyTxidSchema::COLUMN_FAMILY_NAME,
+    BridgeDutyStatusSchema::COLUMN_FAMILY_NAME,
+
+    // Bridge duty checkpoint
+    BridgeDutyCheckpointSchema::COLUMN_FAMILY_NAME,
+
+    // Checkpoint schemas
+    BatchCheckpointSchema::COLUMN_FAMILY_NAME,
+];
+
 /// database operations configuration
 #[derive(Clone, Copy, Debug)]
 pub struct DbOpsConfig {
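
Review note (illustrative sketch, not part of the patch): with the checkpoint machinery gone, `ClientStateDatabase` is small enough that a toy in-memory implementation fits in a few lines, which may be handy for unit tests. A sketch under that assumption (`MemClientStateDb` is a hypothetical name, not from this codebase):

```rust
use std::{collections::BTreeMap, sync::Mutex};

use strata_db::{traits::ClientStateDatabase, DbError, DbResult};
use strata_state::operation::ClientUpdateOutput;

/// Hypothetical in-memory store keyed by update index.
struct MemClientStateDb(Mutex<BTreeMap<u64, ClientUpdateOutput>>);

impl ClientStateDatabase for MemClientStateDb {
    fn put_client_update(&self, idx: u64, output: ClientUpdateOutput) -> DbResult<()> {
        // Real implementations enforce index continuity; elided here.
        self.0.lock().unwrap().insert(idx, output);
        Ok(())
    }

    fn get_client_update(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>> {
        Ok(self.0.lock().unwrap().get(&idx).cloned())
    }

    fn get_last_state_idx(&self) -> DbResult<u64> {
        let map = self.0.lock().unwrap();
        map.keys().next_back().copied().ok_or(DbError::NotBootstrapped)
    }
}
```
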
diff --git a/crates/state/src/client_state.rs b/crates/state/src/client_state.rs
index b6d4a30a1a..1aa81bb4cf 100644
--- a/crates/state/src/client_state.rs
+++ b/crates/state/src/client_state.rs
@@ -7,11 +7,13 @@ use arbitrary::Arbitrary;
 use borsh::{BorshDeserialize, BorshSerialize};
 use serde::{Deserialize, Serialize};
 use strata_primitives::buf::Buf32;
+use tracing::*;
 
 use crate::{
     batch::{BatchInfo, BootstrapState},
     id::L2BlockId,
     l1::{HeaderVerificationState, L1BlockId},
+    operation::{ClientUpdateOutput, SyncAction},
 };
 
 /// High level client's state of the network. This is local to the client, not
@@ -327,3 +329,171 @@ impl L1Checkpoint {
         }
     }
 }
+
+/// Wrapper around [`ClientState`] used for modifying it and producing sync
+/// actions.
+pub struct ClientStateMut {
+    state: ClientState,
+    actions: Vec<SyncAction>,
+}
+
+impl ClientStateMut {
+    pub fn new(state: ClientState) -> Self {
+        Self {
+            state,
+            actions: Vec::new(),
+        }
+    }
+
+    pub fn state(&self) -> &ClientState {
+        &self.state
+    }
+
+    pub fn into_update(self) -> ClientUpdateOutput {
+        ClientUpdateOutput::new(self.state, self.actions)
+    }
+
+    pub fn push_action(&mut self, a: SyncAction) {
+        self.actions.push(a);
+    }
+
+    pub fn push_actions(&mut self, a: impl Iterator<Item = SyncAction>) {
+        self.actions.extend(a);
+    }
+
+    // Semantical mutation fns.
+    // TODO remove logs from this, break down into simpler logical units
+
+    // TODO remove sync state
+    pub fn set_sync_state(&mut self, ss: SyncState) {
+        self.state.set_sync_state(ss);
+    }
+
+    pub fn activate_chain(&mut self) {
+        self.state.chain_active = true;
+    }
+
+    pub fn update_verification_state(&mut self, l1_vs: HeaderVerificationState) {
+        debug!(?l1_vs, "received HeaderVerificationState");
+
+        if self.state.genesis_verification_hash().is_none() {
+            info!(?l1_vs, "Setting genesis L1 verification state");
+            self.state.genesis_l1_verification_state_hash = Some(l1_vs.compute_hash().unwrap());
+        }
+
+        self.state.l1_view_mut().header_verification_state = Some(l1_vs);
+    }
+
+    pub fn rollback_l1_blocks(&mut self, height: u64) {
+        let l1v = self.state.l1_view_mut();
+        let buried_height = l1v.buried_l1_height();
+
+        if height < buried_height {
+            error!(%height, %buried_height, "unable to roll back past buried height");
+            panic!("operation: emitted invalid write");
+        }
+
+        let new_unacc_len = (height - buried_height) as usize;
+        let l1_vs = l1v.tip_verification_state();
+        if let Some(l1_vs) = l1_vs {
+            // TODO: handle other things
+            let mut rollbacked_l1_vs = l1_vs.clone();
+            rollbacked_l1_vs.last_verified_block_num = height as u32;
+            rollbacked_l1_vs.last_verified_block_hash = l1v.local_unaccepted_blocks[new_unacc_len];
+        }
+        l1v.local_unaccepted_blocks.truncate(new_unacc_len);
+
+        // Keep pending checkpoints whose l1 height is less than or equal to rollback height
+        l1v.verified_checkpoints
+            .retain(|ckpt| ckpt.height <= height);
+    }
+
+    // TODO convert to L1BlockCommitment?
+    pub fn accept_l1_block(&mut self, l1blkid: L1BlockId) {
+        debug!(?l1blkid, "received AcceptL1Block");
+        // TODO make this also do something
+        let l1v = self.state.l1_view_mut();
+        l1v.local_unaccepted_blocks.push(l1blkid);
+        l1v.next_expected_block += 1;
+    }
+
+    // TODO convert to L2BlockCommitment?
+    pub fn accept_l2_block(&mut self, blkid: L2BlockId, height: u64) {
+        // TODO do any other bookkeeping
+        debug!(%height, %blkid, "received AcceptL2Block");
+        let ss = self.state.expect_sync_mut();
+        ss.tip_blkid = blkid;
+        ss.tip_height = height;
+    }
+
+    pub fn update_buried(&mut self, new_idx: u64) {
+        let l1v = self.state.l1_view_mut();
+
+        // Check that it's increasing.
+        let old_idx = l1v.buried_l1_height();
+
+        if new_idx < old_idx {
+            panic!("operation: emitted non-greater buried height");
+        }
+
+        // Check that it's not higher than what we know about.
+        if new_idx > l1v.tip_height() {
+            panic!("operation: new buried height above known L1 tip");
+        }
+
+        // If everything checks out we can just remove them.
+        let diff = (new_idx - old_idx) as usize;
+        let _blocks = l1v
+            .local_unaccepted_blocks
+            .drain(..diff)
+            .collect::<Vec<_>>();
+
+        // TODO merge these blocks into the L1 MMR in the client state if
+        // we haven't already
+    }
+
+    pub fn accept_checkpoints(&mut self, ckpts: &[L1Checkpoint]) {
+        // Extend the pending checkpoints
+        self.state
+            .l1_view_mut()
+            .verified_checkpoints
+            .extend(ckpts.iter().cloned());
+    }
+
+    pub fn finalize_checkpoint(&mut self, l1height: u64) {
+        let l1v = self.state.l1_view_mut();
+
+        let finalized_checkpts: Vec<_> = l1v
+            .verified_checkpoints
+            .iter()
+            .take_while(|ckpt| ckpt.height <= l1height)
+            .collect();
+
+        let new_finalized = finalized_checkpts.last().cloned().cloned();
+        let total_finalized = finalized_checkpts.len();
+        debug!(?new_finalized, ?total_finalized, "Finalized checkpoints");
+
+        // Remove the finalized from pending and then mark the last one as last_finalized
+        // checkpoint
+        l1v.verified_checkpoints.drain(..total_finalized);
+
+        if let Some(ckpt) = new_finalized {
+            // Check if heights match accordingly
+            if !l1v
+                .last_finalized_checkpoint
+                .as_ref()
+                .is_none_or(|prev_ckpt| ckpt.batch_info.idx() == prev_ckpt.batch_info.idx() + 1)
+            {
+                panic!("operation: mismatched indices of pending checkpoint");
+            }
+
+            let fin_blockid = *ckpt.batch_info.l2_blockid();
+            l1v.last_finalized_checkpoint = Some(ckpt);
+
+            // Update finalized blockid in StateSync
+            self.state.expect_sync_mut().finalized_blkid = fin_blockid;
+        }
+    }
+
+    // TODO add operation stuff
+}
diff --git a/crates/state/src/operation.rs b/crates/state/src/operation.rs
index 3394487029..dab8c84657 100644
--- a/crates/state/src/operation.rs
+++ b/crates/state/src/operation.rs
@@ -19,25 +19,33 @@ use crate::{
     Clone, Debug, Eq, PartialEq, Arbitrary, BorshDeserialize, BorshSerialize, Deserialize, Serialize,
 )]
 pub struct ClientUpdateOutput {
-    writes: Vec<ClientStateWrite>,
+    state: ClientState,
     actions: Vec<SyncAction>,
 }
 
 impl ClientUpdateOutput {
-    pub fn new(writes: Vec<ClientStateWrite>, actions: Vec<SyncAction>) -> Self {
-        Self { writes, actions }
+    pub fn new(state: ClientState, actions: Vec<SyncAction>) -> Self {
+        Self { state, actions }
     }
 
-    pub fn writes(&self) -> &[ClientStateWrite] {
-        &self.writes
+    pub fn new_state(state: ClientState) -> Self {
+        Self::new(state, Vec::new())
+    }
+
+    pub fn state(&self) -> &ClientState {
+        &self.state
     }
 
     pub fn actions(&self) -> &[SyncAction] {
         &self.actions
     }
 
-    pub fn into_parts(self) -> (Vec<ClientStateWrite>, Vec<SyncAction>) {
-        (self.writes, self.actions)
+    pub fn into_state(self) -> ClientState {
+        self.state
+    }
+
+    pub fn into_parts(self) -> (ClientState, Vec<SyncAction>) {
+        (self.state, self.actions)
     }
 }
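
Review note (illustrative sketch, not part of the patch): `ClientStateMut` replaces the old write-list plumbing; a transition now mutates a scratch copy of the state and the entire post-state is persisted alongside the queued actions. The lifecycle, condensed from `StateTracker::advance_consensus_state` earlier in this diff:

```rust
// Condensed from StateTracker::advance_consensus_state above.
let mut state_mut = ClientStateMut::new(cur_state.as_ref().clone());

// Transition logic mutates the wrapper and queues sync actions...
client_transition::process_event(&mut state_mut, &ev, &context, &params)?;

// ...and the full post-state plus actions become one stored record.
let outp: ClientUpdateOutput = state_mut.into_update();
let (new_state, actions) = outp.into_parts();
```
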
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 675d370fb5..1de1551caa 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -10,14 +10,16 @@ pub use managers::{
     client_state::ClientStateManager, l1::L1BlockManager, l2::L2BlockManager,
 };
 pub use ops::l1tx_broadcast::BroadcastDbOps;
-use strata_db::traits::Database;
+use strata_db::{traits::Database, DbResult};
 
 /// A consolidation of database managers.
+// TODO move this to its own module
 #[derive(Clone)]
 pub struct NodeStorage {
     l1_block_manager: Arc<L1BlockManager>,
     l2_block_manager: Arc<L2BlockManager>,
     chainstate_manager: Arc<ChainstateManager>,
+    client_state_manager: Arc<ClientStateManager>,
 
     // TODO maybe move this into a different one?
     checkpoint_manager: Arc<CheckpointDbManager>,
@@ -36,24 +38,36 @@ impl NodeStorage {
         &self.chainstate_manager
     }
 
+    pub fn client_state(&self) -> &Arc<ClientStateManager> {
+        &self.client_state_manager
+    }
+
     pub fn checkpoint(&self) -> &Arc<CheckpointDbManager> {
         &self.checkpoint_manager
     }
 }
 
-pub fn create_node_storage<D>(db: Arc<D>, pool: threadpool::ThreadPool) -> NodeStorage
+/// Given a raw database, creates storage managers and returns a [`NodeStorage`]
+/// instance around the underlying raw database.
+pub fn create_node_storage<D>(db: Arc<D>, pool: threadpool::ThreadPool) -> DbResult<NodeStorage>
 where
     D: Database + Sync + Send + 'static,
 {
     let l1_block_manager = Arc::new(L1BlockManager::new(pool.clone(), db.clone()));
     let l2_block_manager = Arc::new(L2BlockManager::new(pool.clone(), db.clone()));
+    let client_state_manager = Arc::new(ClientStateManager::new(pool.clone(), db.clone())?);
     let chainstate_manager = Arc::new(ChainstateManager::new(pool.clone(), db.clone()));
+
+    // (see above)
     let checkpoint_manager = Arc::new(CheckpointDbManager::new(pool.clone(), db.clone()));
 
-    NodeStorage {
+    Ok(NodeStorage {
         l1_block_manager,
         l2_block_manager,
+        client_state_manager,
         chainstate_manager,
+
+        // (see above)
         checkpoint_manager,
-    }
+    })
 }
diff --git a/crates/storage/src/managers/client_state.rs b/crates/storage/src/managers/client_state.rs
index 2e1cee2410..74275115c2 100644
--- a/crates/storage/src/managers/client_state.rs
+++ b/crates/storage/src/managers/client_state.rs
@@ -3,21 +3,146 @@
 
 use std::sync::Arc;
 
-use strata_db::traits::Database;
-use strata_state::operation::ClientUpdateOutput;
+use strata_db::{traits::Database, DbError, DbResult};
+use strata_state::{client_state::ClientState, operation::ClientUpdateOutput};
 use threadpool::ThreadPool;
+use tokio::sync::Mutex;
 
 use crate::{cache, ops};
 
 pub struct ClientStateManager {
     ops: ops::client_state::ClientStateOps,
-    state_cache: cache::CacheTable<u64, Option<ClientUpdateOutput>>,
+
+    // TODO actually use caches
+    update_cache: cache::CacheTable<u64, Option<ClientUpdateOutput>>,
+    state_cache: cache::CacheTable<u64, Arc<ClientState>>,
+
+    cur_state: Mutex<CurStateTracker>,
 }
 
 impl ClientStateManager {
-    pub fn new<D: Database + Sync + Send + 'static>(pool: ThreadPool, db: Arc<D>) -> Self {
+    pub fn new<D: Database + Sync + Send + 'static>(
+        pool: ThreadPool,
+        db: Arc<D>,
+    ) -> DbResult<Self> {
         let ops = ops::client_state::Context::new(db.client_state_db().clone()).into_ops(pool);
+        let update_cache = cache::CacheTable::new(64.try_into().unwrap());
         let state_cache = cache::CacheTable::new(64.try_into().unwrap());
-        Self { ops, state_cache }
+
+        // Figure out the current state so we can access it.
+        let mut cur_state = CurStateTracker::new_empty();
+        let last_idx = ops.get_last_state_idx_blocking()?;
+        let last_state = ops
+            .get_client_update_blocking(last_idx)?
+            .ok_or(DbError::UnknownIdx(last_idx))?
+            .into_state();
+        cur_state.set(last_idx, Arc::new(last_state));
+
+        Ok(Self {
+            ops,
+            update_cache,
+            state_cache,
+            cur_state: Mutex::new(cur_state),
+        })
+    }
+
+    pub fn get_last_state_idx_blocking(&self) -> DbResult<u64> {
+        self.ops.get_last_state_idx_blocking()
+    }
+
+    // TODO convert to managing these with Arcs
+    pub async fn get_state_async(&self, idx: u64) -> DbResult<Option<ClientState>> {
+        self.ops
+            .get_client_update_async(idx)
+            .await
+            .map(|res| res.map(|update| update.into_state()))
+    }
+
+    pub fn get_state_blocking(&self, idx: u64) -> DbResult<Option<ClientState>> {
+        self.ops
+            .get_client_update_blocking(idx)
+            .map(|res| res.map(|update| update.into_state()))
+    }
+
+    pub async fn get_update_async(&self, idx: u64) -> DbResult<Option<ClientUpdateOutput>> {
+        self.ops.get_client_update_async(idx).await
+    }
+
+    pub fn put_update_blocking(
+        &self,
+        idx: u64,
+        update: ClientUpdateOutput,
+    ) -> DbResult<Arc<ClientState>> {
+        // FIXME this is a lot of cloning, good thing the type isn't gigantic,
+        // still feels bad though
+        let state = Arc::new(update.state().clone());
+        self.ops.put_client_update_blocking(idx, update.clone())?;
+        self.maybe_update_cur_state_blocking(idx, &state);
+        self.update_cache.insert(idx, Some(update));
+        self.state_cache.insert(idx, state.clone());
+        Ok(state)
+    }
+
+    // TODO rollback and whatnot
+
+    // Internal functions.
+
+    fn maybe_update_cur_state_blocking(&self, idx: u64, state: &Arc<ClientState>) -> bool {
+        let mut cur = self.cur_state.blocking_lock();
+        cur.maybe_update(idx, state)
+    }
+
+    // Convenience functions.
+
+    /// Gets the highest known state and its idx.
+    pub async fn get_most_recent_state(&self) -> Option<(u64, Arc<ClientState>)> {
+        let cur = self.cur_state.lock().await;
+        cur.get_clone().map(|state| (cur.get_idx(), state))
+    }
+
+    /// Gets the highest known state and its idx.
+    pub fn get_most_recent_state_blocking(&self) -> Option<(u64, Arc<ClientState>)> {
+        let cur = self.cur_state.blocking_lock();
+        cur.get_clone().map(|state| (cur.get_idx(), state))
+    }
+}
+
+/// Internally tracks the current state so we can fetch it as needed.
+struct CurStateTracker {
+    last_idx: Option<u64>,
+    state: Option<Arc<ClientState>>,
+}
+
+impl CurStateTracker {
+    pub fn new_empty() -> Self {
+        Self {
+            last_idx: None,
+            state: None,
+        }
+    }
+
+    pub fn get_idx(&self) -> u64 {
+        self.last_idx.unwrap_or_default()
+    }
+
+    pub fn get_clone(&self) -> Option<Arc<ClientState>> {
+        self.state.clone()
+    }
+
+    pub fn set(&mut self, idx: u64, state: Arc<ClientState>) {
+        self.last_idx = Some(idx);
+        self.state = Some(state);
+    }
+
+    pub fn is_idx_better(&self, idx: u64) -> bool {
+        self.last_idx.is_none_or(|v| idx >= v)
+    }
+
+    pub fn maybe_update(&mut self, idx: u64, state: &Arc<ClientState>) -> bool {
+        let should = self.is_idx_better(idx);
+        if should {
+            self.set(idx, state.clone());
+        }
+        should
    }
 }
diff --git a/crates/storage/src/ops/client_state.rs b/crates/storage/src/ops/client_state.rs
index e0287fc7c2..be4b137970 100644
--- a/crates/storage/src/ops/client_state.rs
+++ b/crates/storage/src/ops/client_state.rs
@@ -3,22 +3,14 @@
 
 use std::sync::Arc;
 
 use strata_db::traits::*;
-use strata_state::{
-    client_state::ClientState,
-    operation::{ClientStateWrite, ClientUpdateOutput, SyncAction},
-};
+use strata_state::operation::ClientUpdateOutput;
 
 use crate::exec::*;
 
 inst_ops_simple! {
     (<D: Database> => ClientStateOps) {
-        write_client_update_output(idx: u64, output: ClientUpdateOutput) => ();
-        write_client_state_checkpoint(idx: u64, state: ClientState) => ();
-        get_last_write_idx() => u64;
-        get_client_state_writes(idx: u64) => Option<Vec<ClientStateWrite>>;
-        get_client_update_actions(idx: u64) => Option<Vec<SyncAction>>;
-        get_last_checkpoint_idx() => u64;
-        get_prev_checkpoint_at(idx: u64) => u64;
-        get_state_checkpoint(idx: u64) => Option<ClientState>;
+        put_client_update(idx: u64, output: ClientUpdateOutput) => ();
+        get_client_update(idx: u64) => Option<ClientUpdateOutput>;
+        get_last_state_idx() => u64;
     }
 }
diff --git a/crates/storage/src/ops/mod.rs b/crates/storage/src/ops/mod.rs
index 1ce8c4d02d..4faac250a3 100644
--- a/crates/storage/src/ops/mod.rs
+++ b/crates/storage/src/ops/mod.rs
@@ -5,7 +5,6 @@ pub mod bridge_relay;
 pub mod chainstate;
 pub mod checkpoint;
 pub mod client_state;
-pub mod envelope;
 pub mod l1;
 pub mod l1tx_broadcast;
 pub mod l2;
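
Review note (illustrative sketch, not part of the patch): how a caller interacts with the new manager-level API, using only methods introduced above. `idx` and `output` are placeholders for a real event index and a computed update, and the error conversion assumes `DbError` converts into `anyhow::Error` as elsewhere in this diff:

```rust
use std::sync::Arc;

use strata_state::{client_state::ClientState, operation::ClientUpdateOutput};
use strata_storage::NodeStorage;

// Mirrors what StateTracker::advance_consensus_state does with the manager.
fn store_and_read(storage: &NodeStorage, idx: u64, output: ClientUpdateOutput) -> anyhow::Result<()> {
    let csman = storage.client_state();

    // Writing returns the Arc'd post-state and refreshes the manager's
    // in-memory "most recent state" tracker plus both caches.
    let state: Arc<ClientState> = csman.put_update_blocking(idx, output)?;

    // Readers can then grab the tip state without touching the database.
    if let Some((best_idx, best_state)) = csman.get_most_recent_state_blocking() {
        assert!(best_idx >= idx);
        let _ = (best_state, state);
    }

    Ok(())
}
```
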