Skip to content

Commit

Permalink
misc: refactored client state DB interface, converted some things ove…
Browse files Browse the repository at this point in the history
…r to use client state manager where feasible
  • Loading branch information
delbonis committed Jan 31, 2025
1 parent 7120ece commit bca88d8
Show file tree
Hide file tree
Showing 20 changed files with 587 additions and 411 deletions.
3 changes: 3 additions & 0 deletions bin/strata-client/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum InitError {
#[error("missing init client state")]
MissingInitClientState,

#[error("io: {0}")]
Io(#[from] io::Error),

Expand Down
20 changes: 12 additions & 8 deletions bin/strata-client/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use strata_primitives::{
use strata_rocksdb::CommonDb;
use strata_state::csm_status::CsmStatus;
use strata_status::StatusChannel;
use strata_storage::L2BlockManager;
use strata_storage::{L2BlockManager, NodeStorage};
use tokio::runtime::Handle;
use tracing::*;

Expand Down Expand Up @@ -115,13 +115,12 @@ pub fn create_bitcoin_rpc_client(config: &Config) -> anyhow::Result<Arc<BitcoinC
}

// initializes the status bundle that we can pass around cheaply for status/metrics
pub fn init_status_channel<D>(database: &D) -> anyhow::Result<StatusChannel>
where
D: Database + Send + Sync + 'static,
{
pub fn init_status_channel(storage: &NodeStorage) -> anyhow::Result<StatusChannel> {
// init client state
let cs_db = database.client_state_db().as_ref();
let (cur_state_idx, cur_state) = state_tracker::reconstruct_cur_state(cs_db)?;
let csman = storage.client_state();
let (cur_state_idx, cur_state) = csman
.get_most_recent_state_blocking()
.ok_or(InitError::MissingInitClientState)?;

// init the CsmStatus
let mut status = CsmStatus::default();
Expand All @@ -132,7 +131,12 @@ where
..Default::default()
};

Ok(StatusChannel::new(cur_state, l1_status, None))
// TODO avoid clone, change status channel to use arc
Ok(StatusChannel::new(
cur_state.as_ref().clone(),
l1_status,
None,
))
}

pub fn init_engine_controller(
Expand Down
10 changes: 5 additions & 5 deletions bin/strata-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn main_inner(args: Args) -> anyhow::Result<()> {

// Initialize core databases
let database = init_core_dbs(rbdb.clone(), ops_config);
let storage = Arc::new(create_node_storage(database.clone(), pool.clone()));
let storage = Arc::new(create_node_storage(database.clone(), pool.clone())?);

// Set up bridge messaging stuff.
// TODO move all of this into relayer task init
Expand All @@ -110,9 +110,9 @@ fn main_inner(args: Args) -> anyhow::Result<()> {
let bitcoin_client = create_bitcoin_rpc_client(&config)?;

// Check if we have to do genesis.
if genesis::check_needs_client_init(database.as_ref())? {
if genesis::check_needs_client_init(storage.as_ref())? {
info!("need to init client state!");
genesis::init_client_state(&params, database.as_ref())?;
genesis::init_client_state(&params, storage.client_state())?;
}

info!("init finished, starting main tasks");
Expand Down Expand Up @@ -309,7 +309,7 @@ fn start_core_tasks(
bitcoin_client: Arc<BitcoinClient>,
) -> anyhow::Result<CoreContext> {
// init status tasks
let status_channel = init_status_channel(database.as_ref())?;
let status_channel = init_status_channel(storage.as_ref())?;

let engine = init_engine_controller(
config,
Expand All @@ -331,7 +331,7 @@ fn start_core_tasks(
let sync_manager: Arc<_> = sync_manager::start_sync_tasks(
executor,
database.clone(),
storage.clone(),
&storage,
engine.clone(),
pool.clone(),
params.clone(),
Expand Down
49 changes: 16 additions & 33 deletions bin/strata-client/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use zkaleido::ProofReceipt;

use crate::extractor::{extract_deposit_requests, extract_withdrawal_infos};

#[deprecated]
fn fetch_l2blk<D: Database + Sync + Send + 'static>(
l2_db: &Arc<<D as Database>::L2DB>,
blkid: L2BlockId,
Expand Down Expand Up @@ -648,23 +649,12 @@ impl<D: Database + Send + Sync + 'static> StrataApiServer for StrataRpcImpl<D> {

// FIXME: possibly create a separate rpc type corresponding to ClientUpdateOutput
async fn get_client_update_output(&self, idx: u64) -> RpcResult<Option<ClientUpdateOutput>> {
let db = self.database.clone();

let res = wait_blocking("fetch_client_update_output", move || {
let client_state_db = db.client_state_db();

let writes = client_state_db.get_client_state_writes(idx)?;
let actions = client_state_db.get_client_update_actions(idx)?;

match (writes, actions) {
(Some(w), Some(a)) => Ok(Some(ClientUpdateOutput::new(w, a))),
// normally this is just that they're both missing
_ => Ok(None),
}
})
.await?;

Ok(res)
Ok(self
.storage
.client_state()
.get_update_async(idx)
.map_err(Error::Db)
.await?)
}
}

Expand Down Expand Up @@ -901,12 +891,12 @@ impl StrataSequencerApiServer for SequencerServerImpl {

pub struct StrataDebugRpcImpl<D> {
storage: Arc<NodeStorage>,
database: Arc<D>,
_database: Arc<D>,
}

impl<D: Database + Sync + Send + 'static> StrataDebugRpcImpl<D> {
pub fn new(storage: Arc<NodeStorage>, database: Arc<D>) -> Self {
Self { storage, database }
pub fn new(storage: Arc<NodeStorage>, _database: Arc<D>) -> Self {
Self { storage, _database }
}
}

Expand Down Expand Up @@ -941,19 +931,12 @@ impl<D: Database + Sync + Send + 'static> StrataDebugApiServer for StrataDebugRp
}

async fn get_clientstate_at_idx(&self, idx: u64) -> RpcResult<Option<ClientState>> {
let database = self.database.clone();
let cs = wait_blocking("clientstate_at_idx", move || {
let client_state_db = database.client_state_db();
match reconstruct_state(client_state_db.as_ref(), idx) {
Ok(client_state) => Ok(Some(client_state)),
Err(e) => {
error!(%idx, %e, "failed to reconstruct client state");
Err(Error::Other(e.to_string()))
}
}
})
.await?;
Ok(cs)
Ok(self
.storage
.client_state()
.get_state_async(idx)
.map_err(Error::Db)
.await?)
}

async fn set_bail_context(&self, _ctx: String) -> RpcResult<()> {
Expand Down
Loading

0 comments on commit bca88d8

Please sign in to comment.