diff --git a/.github/workflows/functional.yml b/.github/workflows/functional.yml
index 71d427be2..44b87a1f6 100644
--- a/.github/workflows/functional.yml
+++ b/.github/workflows/functional.yml
@@ -118,20 +118,54 @@ jobs:
       - name: Run functional tests (1)
         id: funcTestsRun1
         continue-on-error: true
+        env:
+          NO_COLOR: "1"
         run: |
           NEWPATH="$(realpath target/debug/)"
           export PATH="${NEWPATH}:${PATH}"
           which strata-client
           cd functional-tests && poetry run python entry.py

-      # Run again just to be sure as some tests are flaky
+      # Run again just to be sure as some tests are flaky, with more logging
       - name: Run functional tests (2)
+        id: funcTestsRun2
         if: steps.funcTestsRun1.outcome == 'failure'
+        continue-on-error: true
+        env:
+          RUST_LOG: "trace,hyper=warn,soketto=warn,jsonrpsee-server=warn,mio=warn"
+          NO_COLOR: "1"
+          RUST_BACKTRACE: "1"
+          LOG_LEVEL: "debug"
         run: |
           NEWPATH="$(realpath target/debug/)"
           export PATH="${NEWPATH}:${PATH}"
           which strata-client
           cd functional-tests && poetry run python entry.py

+      - name: Zip log files on failure
+        if: steps.funcTestsRun2.outcome == 'failure'
+        # Create a zip archive (logs.zip) that includes only service.log files,
+        # preserving the folder structure starting from functional-tests/_dd
+        # NOTE: Entire _dd ends up hundreds of MBs, so keep only logs (still huge...).
+        run: |
+          # service logs
+          find functional-tests/_dd -type f -name "service.log" | zip -@ logs.zip
+          # functional test logs
+          zip -r logs.zip functional-tests/_dd -i '*/logs/*.log'
+
+      - name: Upload logs as build artifact on failure
+        if: steps.funcTestsRun2.outcome == 'failure'
+        uses: actions/upload-artifact@v4
+        with:
+          name: fntest_dd
+          path: logs.zip
+          retention-days: 30
+          if-no-files-found: error
+          compression-level: 0 # already compressed
+
+      - name: Fail job if functional tests fail
+        if: steps.funcTestsRun2.outcome == 'failure'
+        run: |
+          echo "Functional tests failed"
+          exit 1
+
   functional-tests-success:
     name: Check that all checks pass
diff --git a/bin/strata-client/src/el_sync.rs b/bin/strata-client/src/el_sync.rs
new file mode 100644
index 000000000..6398bfdb1
--- /dev/null
+++ b/bin/strata-client/src/el_sync.rs
@@ -0,0 +1,118 @@
+use strata_db::DbError;
+use strata_eectl::{engine::ExecEngineCtl, errors::EngineError, messages::ExecPayloadData};
+use strata_state::id::L2BlockId;
+use strata_storage::NodeStorage;
+use thiserror::Error;
+use tracing::{debug, info};
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("missing chainstate for slot {0}")]
+    MissingChainstate(u64),
+    #[error("missing l2block {0}")]
+    MissingL2Block(L2BlockId),
+    #[error("db: {0}")]
+    Db(#[from] DbError),
+    #[error("engine: {0}")]
+    Engine(#[from] EngineError),
+}
+
+/// Sync missing blocks in EL using payloads stored in L2 block database.
+///
+/// TODO: retry on network errors
+pub fn sync_chainstate_to_el(
+    storage: &NodeStorage,
+    engine: &impl ExecEngineCtl,
+) -> Result<(), Error> {
+    let chainstate_manager = storage.chainstate();
+    let l2_block_manager = storage.l2();
+    let earliest_idx = chainstate_manager.get_earliest_write_idx_blocking()?;
+    let latest_idx = chainstate_manager.get_last_write_idx_blocking()?;
+
+    info!(%earliest_idx, %latest_idx, "search for last known idx");
+
+    // last idx of chainstate whose corresponding block is present in el
+    let sync_from_idx = find_last_match((earliest_idx, latest_idx), |idx| {
+        let Some(chain_state) = chainstate_manager.get_toplevel_chainstate_blocking(idx)? else {
+            return Err(Error::MissingChainstate(idx));
+        };
+
+        let block_id = chain_state.chain_tip_blkid();
+
+        Ok(engine.check_block_exists(*block_id)?)
+    })?
+    .map(|idx| idx + 1) // sync from next index
+    .unwrap_or(0); // sync from genesis
+
+    info!(%sync_from_idx, "last known index in EL");
+
+    for idx in sync_from_idx..=latest_idx {
+        debug!(?idx, "Syncing chainstate");
+        let Some(chain_state) = chainstate_manager.get_toplevel_chainstate_blocking(idx)? else {
+            return Err(Error::MissingChainstate(idx));
+        };
+
+        let block_id = chain_state.chain_tip_blkid();
+
+        let Some(l2block) = l2_block_manager.get_block_data_blocking(block_id)? else {
+            return Err(Error::MissingL2Block(*block_id));
+        };
+
+        let payload = ExecPayloadData::from_l2_block_bundle(&l2block);
+
+        engine.submit_payload(payload)?;
+        engine.update_head_block(*block_id)?;
+    }
+
+    Ok(())
+}
+
+fn find_last_match(
+    range: (u64, u64),
+    predicate: impl Fn(u64) -> Result<bool, Error>,
+) -> Result<Option<u64>, Error> {
+    let (mut left, mut right) = range;
+
+    // Check the leftmost value first
+    if !predicate(left)? {
+        return Ok(None); // If the leftmost value is false, no values can be true
+    }
+
+    let mut best_match = None;
+
+    // Proceed with binary search
+    while left <= right {
+        let mid = left + (right - left) / 2;
+
+        if predicate(mid)? {
+            best_match = Some(mid); // Update best match
+            left = mid + 1; // Continue searching in the right half
+        } else {
+            right = mid - 1; // Search in the left half
+        }
+    }
+
+    Ok(best_match)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_find_last_match() {
+        // find match
+        assert!(matches!(
+            find_last_match((0, 5), |idx| Ok(idx < 3)),
+            Ok(Some(2))
+        ));
+        // found no match
+        assert!(matches!(find_last_match((0, 5), |_| Ok(false)), Ok(None)));
+        // got error
+        let error_message = "intentional error for test";
+        assert!(matches!(
+            find_last_match((0, 5), |_| Err(EngineError::Other(error_message.into()))?),
+            Err(err) if err.to_string().contains(error_message)
+        ));
+    }
+}
diff --git a/bin/strata-client/src/main.rs b/bin/strata-client/src/main.rs
index 1086aacf2..1ca3a9c34 100644
--- a/bin/strata-client/src/main.rs
+++ b/bin/strata-client/src/main.rs
@@ -1,6 +1,7 @@
 use std::{sync::Arc, time::Duration};
 
 use bitcoin::{hashes::Hash, BlockHash};
+use el_sync::sync_chainstate_to_el;
 use jsonrpsee::Methods;
 use parking_lot::lock_api::RwLock;
 use rpc_client::sync_client;
@@ -45,6 +46,7 @@ use tracing::*;
 use crate::{args::Args, helpers::*};
 
 mod args;
+mod el_sync;
 mod errors;
 mod extractor;
 mod helpers;
@@ -284,8 +286,8 @@ fn do_startup_checks(
         }
         Ok(false) => {
             // Current chain tip block is not known by the EL.
-            // TODO: Try to sync EL using existing block payloads from DB.
-            anyhow::bail!("missing expected evm block, block_id = {}", chain_tip);
+            warn!(%chain_tip, "missing expected EVM block");
+            sync_chainstate_to_el(storage, engine)?;
         }
         Err(error) => {
             // Likely network issue
diff --git a/bin/strata-sequencer-client/src/duty_executor.rs b/bin/strata-sequencer-client/src/duty_executor.rs
index dfadd5180..042d2ff81 100644
--- a/bin/strata-sequencer-client/src/duty_executor.rs
+++ b/bin/strata-sequencer-client/src/duty_executor.rs
@@ -1,16 +1,15 @@
 use std::{collections::HashSet, sync::Arc};
 
-use strata_primitives::buf::Buf32;
 use strata_rpc_api::StrataSequencerApiClient;
 use strata_rpc_types::HexBytes64;
 use strata_sequencer::{
     block_template::{BlockCompletionData, BlockGenerationConfig},
-    duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, IdentityData},
+    duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, DutyId, IdentityData},
     utils::now_millis,
 };
 use thiserror::Error;
 use tokio::{runtime::Handle, select, sync::mpsc};
-use tracing::{debug, error};
+use tracing::{debug, error, info, warn};
 
 use crate::helpers::{sign_checkpoint, sign_header};
 
@@ -36,7 +35,7 @@ where
     // Keep track of seen duties to avoid processing the same duty multiple times.
     // Does not need to be persisted, as new duties are generated based on current chain state.
     let mut seen_duties = HashSet::new();
-    let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<Buf32>(8);
+    let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<DutyId>(8);
 
     loop {
         select! {
@@ -44,10 +43,10 @@
                 if let Some(duty) = duty {
                     let duty_id = duty.id();
                     if seen_duties.contains(&duty_id) {
-                        debug!("skipping already seen duty: {:?}", duty);
+                        debug!(%duty_id, "skipping already seen duty");
                         continue;
                     }
-                    seen_duties.insert(duty.id());
+                    seen_duties.insert(duty_id);
                     handle.spawn(handle_duty(rpc.clone(), duty, idata.clone(), failed_duties_tx.clone()));
                 } else {
                     // tx is closed, we are done
@@ -55,9 +54,10 @@
                 }
             }
             failed_duty = failed_duties_rx.recv() => {
-                if let Some(failed_duty_id) = failed_duty {
+                if let Some(duty_id) = failed_duty {
                     // remove from seen duties, so we can retry if the duty is seen again
-                    seen_duties.remove(&failed_duty_id);
+                    warn!(%duty_id, "removing failed duty");
+                    seen_duties.remove(&duty_id);
                 }
             }
         }
@@ -68,18 +68,19 @@ async fn handle_duty<R>(
     rpc: Arc<R>,
     duty: Duty,
     idata: IdentityData,
-    failed_duties_tx: mpsc::Sender<Buf32>,
+    failed_duties_tx: mpsc::Sender<DutyId>,
 ) where
     R: StrataSequencerApiClient + Send + Sync,
 {
-    debug!("handle_duty: {:?}", duty);
+    let duty_id = duty.id();
+    debug!(%duty_id, ?duty, "handle_duty");
     let duty_result = match duty.clone() {
-        Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, idata).await,
-        Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, idata).await,
+        Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, duty_id, idata).await,
+        Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, duty_id, idata).await,
     };
 
-    if let Err(e) = duty_result {
-        error!(?duty, "duty failed: {}", e);
+    if let Err(error) = duty_result {
+        error!(%duty_id, %error, "duty failed");
         let _ = failed_duties_tx.send(duty.id()).await;
     }
 }
 
@@ -87,14 +88,17 @@ async fn handle_sign_block_duty<R>(
     rpc: Arc<R>,
     duty: BlockSigningDuty,
+    duty_id: DutyId,
     idata: IdentityData,
 ) -> Result<(), DutyExecError>
 where
     R: StrataSequencerApiClient + Send + Sync,
 {
-    if now_millis() < duty.target_ts() {
+    let now = now_millis();
+    if now < duty.target_ts() {
         // wait until target time
         // TODO: ensure duration is within some bounds
+        warn!(%duty_id, %now, target = duty.target_ts(), "got duty too early; sleeping until target time");
         tokio::time::sleep(tokio::time::Duration::from_millis(
             duty.target_ts() - now_millis(),
         ))
@@ -107,6 +111,10 @@ where
         .await
         .map_err(DutyExecError::GenerateTemplate)?;
 
+    let id = template.template_id();
+
+    info!(%duty_id, block_id = %id, "got block template");
+
     let signature = sign_header(template.header(), &idata.key);
     let completion = BlockCompletionData::from_signature(signature);
 
@@ -114,12 +122,15 @@ where
         .await
         .map_err(DutyExecError::CompleteTemplate)?;
 
+    info!(%duty_id, block_id = %id, "block signing complete");
+
     Ok(())
 }
 
 async fn handle_commit_batch_duty<R>(
     rpc: Arc<R>,
     duty: BatchCheckpointDuty,
+    duty_id: DutyId,
     idata: IdentityData,
 ) -> Result<(), DutyExecError>
 where
@@ -127,6 +138,8 @@ where
 {
     let sig = sign_checkpoint(duty.inner(), &idata.key);
 
+    debug!(%duty_id, %sig, "checkpoint signature");
+
     rpc.complete_checkpoint_signature(duty.inner().batch_info().epoch(), HexBytes64(sig.0))
         .await
         .map_err(DutyExecError::CompleteCheckpoint)?;
diff --git a/bin/strata-sequencer-client/src/duty_fetcher.rs b/bin/strata-sequencer-client/src/duty_fetcher.rs
index baa49e818..15f265f9b 100644
--- a/bin/strata-sequencer-client/src/duty_fetcher.rs
+++ b/bin/strata-sequencer-client/src/duty_fetcher.rs
@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
 use strata_rpc_api::StrataSequencerApiClient;
 use strata_sequencer::duty::types::Duty;
 use tokio::sync::mpsc;
-use tracing::{debug, error, warn};
+use tracing::{error, info, warn};
 
 pub(crate) async fn duty_fetcher_worker<R>(
     rpc: Arc<R>,
     duty_tx: mpsc::Sender<Duty>,
@@ -25,7 +25,7 @@
             }
         };
 
-        debug!("got {} duties", duties.len());
+        info!(count = %duties.len(), "got new duties");
 
         for duty in duties {
             if duty_tx.send(duty).await.is_err() {
diff --git a/crates/common/src/bail_manager.rs b/crates/common/src/bail_manager.rs
index 65ee10ac3..0edde7d34 100644
--- a/crates/common/src/bail_manager.rs
+++ b/crates/common/src/bail_manager.rs
@@ -5,6 +5,7 @@ use tokio::sync::watch;
 pub static BAIL_DUTY_SIGN_BLOCK: &str = "duty_sign_block";
 pub static BAIL_ADVANCE_CONSENSUS_STATE: &str = "advance_consensus_state";
 pub static BAIL_SYNC_EVENT: &str = "sync_event";
+pub static BAIL_SYNC_EVENT_NEW_TIP: &str = "sync_event_new_tip";
 
 struct BailWatch {
     sender: watch::Sender<Option<String>>,
diff --git a/crates/consensus-logic/src/csm/worker.rs b/crates/consensus-logic/src/csm/worker.rs
index 3ffa89ba6..4b3003138 100644
--- a/crates/consensus-logic/src/csm/worker.rs
+++ b/crates/consensus-logic/src/csm/worker.rs
@@ -5,7 +5,7 @@ use std::{sync::Arc, thread};
 
 #[cfg(feature = "debug-utils")]
-use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT};
+use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT, BAIL_SYNC_EVENT_NEW_TIP};
 use strata_db::{
     traits::*,
     types::{CheckpointConfStatus, CheckpointEntry, CheckpointProvingStatus},
@@ -143,7 +143,12 @@ impl WorkerState {
         debug!(?ev, "processing sync event");
 
         #[cfg(feature = "debug-utils")]
-        check_bail_trigger(BAIL_SYNC_EVENT);
+        {
+            check_bail_trigger(BAIL_SYNC_EVENT);
+            if matches!(ev, strata_state::sync_event::SyncEvent::NewTipBlock(_)) {
+                check_bail_trigger(BAIL_SYNC_EVENT_NEW_TIP);
+            }
+        }
 
         // Compute the state transition.
         let context = client_transition::StorageEventContext::new(&self.storage);
diff --git a/crates/consensus-logic/src/fork_choice_manager.rs b/crates/consensus-logic/src/fork_choice_manager.rs
index eddbef5ad..de90a54c6 100644
--- a/crates/consensus-logic/src/fork_choice_manager.rs
+++ b/crates/consensus-logic/src/fork_choice_manager.rs
@@ -110,9 +110,11 @@ pub fn init_forkchoice_manager(
 ) -> anyhow::Result<ForkChoiceManager> {
     // Load data about the last finalized block so we can use that to initialize
     // the finalized tracker.
-    let sync_state = init_csm_state.sync().expect("csm state should be init");
-    let chain_tip_height = sync_state.chain_tip_height();
+    // TODO: get finalized block id without depending on client state
+    // or ensure client state and chain state are in-sync during startup
+    let sync_state = init_csm_state.sync().expect("csm state should be init");
+    let chain_tip_height = storage.chainstate().get_last_write_idx_blocking()?;
 
     let finalized_blockid = *sync_state.finalized_blkid();
     let finalized_block = storage
         .l2()
@@ -211,7 +213,7 @@
     // Now that we have the database state in order, we can actually init the
     // FCM.
-    let fcm = match init_forkchoice_manager(&storage, &params, init_state) {
+    let mut fcm = match init_forkchoice_manager(&storage, &params, init_state) {
         Ok(fcm) => fcm,
         Err(e) => {
             error!(err = %e, "failed to init forkchoice manager!");
@@ -220,6 +222,14 @@
     };
     info!(%finalized_blockid, "forkchoice manager started");
 
+    handle_unprocessed_blocks(
+        &mut fcm,
+        &storage,
+        engine.as_ref(),
+        &csm_ctl,
+        &status_channel,
+    )?;
+
     if let Err(e) = forkchoice_manager_task_inner(
         &shutdown,
         handle,
@@ -236,6 +246,46 @@
     Ok(())
 }
 
+/// Check if there are unprocessed L2 blocks in db.
+/// If there are, pass them to fcm.
+fn handle_unprocessed_blocks(
+    fcm: &mut ForkChoiceManager,
+    storage: &NodeStorage,
+    engine: &impl ExecEngineCtl,
+    csm_ctl: &CsmController,
+    status_channel: &StatusChannel,
+) -> anyhow::Result<()> {
+    info!("check for unprocessed l2blocks");
+
+    let l2_block_manager = storage.l2();
+    let mut slot = fcm.cur_index;
+    loop {
+        let blocksids = l2_block_manager.get_blocks_at_height_blocking(slot)?;
+        if blocksids.is_empty() {
+            break;
+        }
+        warn!(?blocksids, ?slot, "found extra l2blocks");
+        for blockid in blocksids {
+            let status = l2_block_manager.get_block_status_blocking(&blockid)?;
+            if let Some(BlockStatus::Invalid) = status {
+                continue;
+            }
+            warn!(?blockid, "processing l2block");
+            process_fc_message(
+                ForkChoiceMessage::NewBlock(blockid),
+                fcm,
+                engine,
+                csm_ctl,
+                status_channel,
+            )?;
+        }
+        slot += 1;
+    }
+    info!("completed check for unprocessed l2blocks");
+
+    Ok(())
+}
+
 #[allow(clippy::large_enum_variant)]
 enum FcmEvent {
     NewFcmMsg(ForkChoiceMessage),
diff --git a/crates/consensus-logic/src/unfinalized_tracker.rs b/crates/consensus-logic/src/unfinalized_tracker.rs
index 72a229c58..2e18294db 100644
--- a/crates/consensus-logic/src/unfinalized_tracker.rs
+++ b/crates/consensus-logic/src/unfinalized_tracker.rs
@@ -12,12 +12,14 @@ use crate::errors::ChainTipError;
 
 /// Entry in block tracker table we use to relate a block with its immediate
 /// relatives.
+#[derive(Debug)]
 struct BlockEntry {
     parent: L2BlockId,
     children: HashSet<L2BlockId>,
 }
 
 /// Tracks the unfinalized block tree on top of the finalized tip.
+#[derive(Debug)]
 pub struct UnfinalizedBlockTracker {
     /// Block that we treat as a base that all of the other blocks that we're
     /// considering uses.
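For intuition about the recovery path above: `sync_chainstate_to_el` (new in `el_sync.rs`) uses `find_last_match` to binary-search for the last chainstate index whose block the EL already knows, then resumes payload submission from the next index, or from genesis if nothing matched. The search is only correct because the predicate is monotone — true for every index the EL has, false afterwards — which holds since the EL chain is always a prefix of the CL chain. A minimal sketch of that contract, in Python for brevity (the `el_tip` value is made up for illustration):

```python
from typing import Callable, Optional


def find_last_match(left: int, right: int, predicate: Callable[[int], bool]) -> Optional[int]:
    """Greatest index in [left, right] satisfying `predicate`, mirroring the
    Rust helper in el_sync.rs. Assumes the predicate is monotone: True on a
    prefix of the range, False afterwards."""
    if not predicate(left):
        return None  # even the leftmost index fails, so nothing can match
    best_match = None
    while left <= right:
        mid = (left + right) // 2
        if predicate(mid):
            best_match = mid  # mid matches; a later index might too
            left = mid + 1
        else:
            right = mid - 1  # mid fails; only earlier indices can match
    return best_match


# Made-up scenario: the EL knows blocks for chainstate indices 0..=6, while
# the CL has written chainstates up to index 9.
el_tip = 6
last_known = find_last_match(0, 9, lambda idx: idx <= el_tip)
assert last_known == 6
# sync_chainstate_to_el resumes from the next index, or from genesis (0)
# when no index matched.
sync_from_idx = 0 if last_known is None else last_known + 1
assert sync_from_idx == 7
```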
diff --git a/crates/evmexec/src/engine.rs b/crates/evmexec/src/engine.rs
index 2c2bc9964..384035868 100644
--- a/crates/evmexec/src/engine.rs
+++ b/crates/evmexec/src/engine.rs
@@ -325,7 +325,10 @@ impl ExecEngineCtl for RpcExecEngineCtl {
         self.tokio_handle.block_on(async {
             let fork_choice_state = ForkchoiceStatePartial {
                 head_block_hash: Some(block_hash),
-                ..Default::default()
+                // NOTE: reth accepts zero for the safe and finalized block hashes
+                // and leaves the previously set values unchanged
+                safe_block_hash: Some(B256::ZERO),
+                finalized_block_hash: Some(B256::ZERO),
             };
             self.inner
                 .update_block_state(fork_choice_state)
diff --git a/crates/sequencer/src/duty/types.rs b/crates/sequencer/src/duty/types.rs
index 47f781dc7..e544487b2 100644
--- a/crates/sequencer/src/duty/types.rs
+++ b/crates/sequencer/src/duty/types.rs
@@ -26,6 +26,9 @@ pub enum Expiry {
     CheckpointIdxFinalized(u64),
 }
 
+/// Unique identifier for a duty.
+pub type DutyId = Buf32;
+
 /// Duties the sequencer might carry out.
 #[derive(Clone, Debug, BorshSerialize, Serialize, Deserialize)]
 #[allow(clippy::large_enum_variant)]
@@ -46,7 +49,7 @@ impl Duty {
     }
 
     /// Returns a unique identifier for the duty.
-    pub fn id(&self) -> Buf32 {
+    pub fn id(&self) -> DutyId {
         match self {
             // We want Batch commitment duty to be unique by the checkpoint idx
             Self::CommitBatch(duty) => compute_borsh_hash(&duty.0.batch_info().epoch()),
diff --git a/docker/configs/config.fn.toml b/docker/configs/config.fn.toml
index 7ab87b609..8b252d060 100644
--- a/docker/configs/config.fn.toml
+++ b/docker/configs/config.fn.toml
@@ -21,6 +21,10 @@ client_poll_dur_ms = 200
 write_poll_dur_ms = 200
 fee_policy = "smart"
 reveal_amount = 100
+bundle_interval_ms = 1000
+
+[btcio.broadcaster]
+poll_interval_ms = 1000
 
 [sync]
 l1_follow_distance = 6
diff --git a/docker/configs/config.seq.toml b/docker/configs/config.seq.toml
index 9ffebfe3f..4fe642852 100644
--- a/docker/configs/config.seq.toml
+++ b/docker/configs/config.seq.toml
@@ -20,6 +20,10 @@ client_poll_dur_ms = 200
 write_poll_dur_ms = 200
 fee_policy = "smart"
 reveal_amount = 100
+bundle_interval_ms = 1000
+
+[btcio.broadcaster]
+poll_interval_ms = 1000
 
 [sync]
 l1_follow_distance = 6
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 6d86aa2cc..6c5195700 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -158,6 +158,8 @@ services:
       - ./.data/bridge-client-1:/app/.data
     ports:
       - 4781:4781
+    profiles:
+      - bridge
 
   bridge-client-2:
     <<: *base-bridge-client
@@ -167,6 +169,8 @@ services:
       - ./.data/bridge-client-2:/app/.data
     ports:
       - 4782:4781
+    profiles:
+      - bridge
 
 networks:
   strata_network:
diff --git a/docker/generate-blocks-time.sh b/docker/generate-blocks-time.sh
new file mode 100755
index 000000000..3d1918f4f
--- /dev/null
+++ b/docker/generate-blocks-time.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+generateblock_time() {
+    block_interval_sec=${1:-60}
+
+    echo "Generating blocks every ${block_interval_sec}s"
+    while true; do
+        sleep "$block_interval_sec"
+        docker compose exec bitcoind /app/bcli.sh -rpcwallet=default -generate 1
+    done
+}
+
+generateblock_time "$@"
\ No newline at end of file
diff --git a/docker/genkeys.sh b/docker/genkeys.sh
deleted file mode 100755
index 6ed59d182..000000000
--- a/docker/genkeys.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/bin/sh
-
-generate_random_hex() {
-    if [ -z "$1" ]; then
-        return 1
-    fi
-
-    if [ -e "$1" ]; then
-        echo "File '$1' already exists. Skipping."
-        return 0
-    fi
-
-    # Generate 32 random bytes, convert to hex, and write to the file
-    od -An -tx1 -N32 /dev/urandom | tr -d ' \n' > "$1"
-}
-
-generate_random_hex "configs/jwt.hex"
-generate_random_hex "configs/jwt.fn.hex"
-generate_random_hex "configs/sequencer.key.hex"
diff --git a/docker/init-keys.sh b/docker/init-keys.sh
index 9e69ab896..23228d682 100755
--- a/docker/init-keys.sh
+++ b/docker/init-keys.sh
@@ -11,6 +11,7 @@ fi
 
 CONFIG_FILE=configs
 JWT_FILE=$CONFIG_FILE/jwt.hex
+JWT_FN_FILE=$CONFIG_FILE/jwt.fn.hex
 
 generate_random_hex() {
     if [ -z "$1" ]; then
@@ -27,6 +28,7 @@ generate_random_hex() {
 }
 
 generate_random_hex $JWT_FILE
+generate_random_hex $JWT_FN_FILE
 
 SEQ_SEED_FILE=$CONFIG_FILE/sequencer.bin
 OP1_SEED_FILE=$CONFIG_FILE/operator1.bin
diff --git a/functional-tests/factory/factory.py b/functional-tests/factory/factory.py
index 312cb0abc..f43b54395 100644
--- a/functional-tests/factory/factory.py
+++ b/functional-tests/factory/factory.py
@@ -1,4 +1,5 @@
 import os
+import shutil
 from typing import Optional, TypedDict
 
 import flexitest
@@ -284,8 +285,25 @@
        def _create_web3():
            w3.middleware_onion.add(web3.middleware.SignAndSendRawMiddlewareBuilder.build(account))
            return w3
 
+        def snapshot_dir_path(idx: int):
+            return os.path.join(ctx.envdd_path, f"reth.{id}.{idx}")
+
+        def _snapshot_datadir(idx: int):
+            snapshot_dir = snapshot_dir_path(idx)
+            os.makedirs(snapshot_dir, exist_ok=True)
+            shutil.copytree(datadir, snapshot_dir, dirs_exist_ok=True)
+
+        def _restore_snapshot(idx: int):
+            assert not svc.is_started(), "Should call restore only when service is stopped"
+            snapshot_dir = snapshot_dir_path(idx)
+            assert os.path.exists(snapshot_dir)
+            shutil.rmtree(datadir)
+            os.rename(snapshot_dir, datadir)
+
        _inject_service_create_rpc(svc, ethrpc_url, name)
        svc.create_web3 = _create_web3
+        svc.snapshot_datadir = _snapshot_datadir
+        svc.restore_snapshot = _restore_snapshot
 
        return svc
diff --git a/functional-tests/mixins/__init__.py b/functional-tests/mixins/__init__.py
index c436c98f9..a56fd0462 100644
--- a/functional-tests/mixins/__init__.py
+++ b/functional-tests/mixins/__init__.py
@@ -11,6 +11,7 @@
     def premain(self, ctx: flexitest.RunContext):
         self.btc = ctx.get_service("bitcoin")
         self.seq = ctx.get_service("sequencer")
+        self.seq_signer = ctx.get_service("sequencer_signer")
         self.reth = ctx.get_service("reth")
 
         self.seqrpc = self.seq.create_rpc()
diff --git a/functional-tests/tests/bridge/bridge_deposit_seq_unreliable.py b/functional-tests/tests/bridge/bridge_deposit_seq_unreliable.py
index 24d3c921b..86009e7a3 100644
--- a/functional-tests/tests/bridge/bridge_deposit_seq_unreliable.py
+++ b/functional-tests/tests/bridge/bridge_deposit_seq_unreliable.py
@@ -1,8 +1,15 @@
 import flexitest
 
+from envs import testenv
 from envs.rollup_params_cfg import RollupConfig
 from mixins import bridge_mixin
-from utils import check_sequencer_down, get_bridge_pubkey, wait_until, wait_until_with_value
+from utils import (
+    check_initial_eth_balance,
+    check_sequencer_down,
+    get_bridge_pubkey,
+    wait_until,
+    wait_until_with_value,
+)
 from utils.constants import SATS_TO_WEI
 
@@ -15,12 +22,12 @@ class BridgeDepositSequencerUnreliableTest(bridge_mixin.BridgeMixin):
     """
 
     def __init__(self, ctx: flexitest.InitContext):
-        ctx.set_env("basic")
+        ctx.set_env(testenv.BasicEnvConfig(101))
 
     def main(self, ctx: flexitest.RunContext):
         address = ctx.env.gen_ext_btc_address()
         withdraw_address = ctx.env.gen_ext_btc_address()
-        el_address = self.eth_account.address
+        el_address = ctx.env.gen_el_address()
         bridge_pk = get_bridge_pubkey(self.seqrpc)
         self.debug(f"Address: {address}")
         self.debug(f"Change Address: {withdraw_address}")
@@ -29,6 +36,9 @@
 
         cfg: RollupConfig = ctx.env.rollup_cfg()
 
+        # Make sure starting ETH balance is 0
+        check_initial_eth_balance(self.rethrpc, el_address, self.debug)
+
         # deposit
         self.deposit(ctx, el_address, bridge_pk)
         # stop sequencer
@@ -53,7 +63,7 @@
         balance_after_deposits = wait_until_with_value(
             lambda: int(self.rethrpc.eth_getBalance(el_address), 16),
             predicate=lambda v: v == 2 * cfg.deposit_amount * SATS_TO_WEI,
-            timeout=600,
+            timeout=30,
         )
         self.debug(f"Strata Balance after deposits: {balance_after_deposits}")
diff --git a/functional-tests/tests/crash/crash_advance_consensus_state.py b/functional-tests/tests/crash/crash_advance_consensus_state.py
new file mode 100644
index 000000000..25507c864
--- /dev/null
+++ b/functional-tests/tests/crash/crash_advance_consensus_state.py
@@ -0,0 +1,21 @@
+import flexitest
+
+from mixins import seq_crash_mixin
+from utils import wait_until
+
+
+@flexitest.register
+class CrashAdvanceConsensusStateTest(seq_crash_mixin.SeqCrashMixin):
+    def __init__(self, ctx: flexitest.InitContext):
+        ctx.set_env("basic")
+
+    def main(self, ctx: flexitest.RunContext):
+        cur_chain_tip = self.handle_bail(lambda: "advance_consensus_state")
+
+        wait_until(
+            lambda: self.seqrpc.strata_clientStatus()["chain_tip_slot"] > cur_chain_tip + 1,
+            error_with="chain tip slot not progressing",
+            timeout=20,
+        )
+
+        return True
diff --git a/functional-tests/tests/crash_duty_sign_block.py b/functional-tests/tests/crash/crash_duty_sign_block.py
similarity index 100%
rename from functional-tests/tests/crash_duty_sign_block.py
rename to functional-tests/tests/crash/crash_duty_sign_block.py
diff --git a/functional-tests/tests/crash_sync_event.py b/functional-tests/tests/crash/crash_sync_event.py
similarity index 100%
rename from functional-tests/tests/crash_sync_event.py
rename to functional-tests/tests/crash/crash_sync_event.py
diff --git a/functional-tests/tests/crash/crash_sync_event_new_tip.py b/functional-tests/tests/crash/crash_sync_event_new_tip.py
new file mode 100644
index 000000000..50f1889b0
--- /dev/null
+++ b/functional-tests/tests/crash/crash_sync_event_new_tip.py
@@ -0,0 +1,21 @@
+import flexitest
+
+from mixins import seq_crash_mixin
+from utils import wait_until
+
+
+@flexitest.register
+class CrashSyncEventNewTipTest(seq_crash_mixin.SeqCrashMixin):
+    def __init__(self, ctx: flexitest.InitContext):
+        ctx.set_env("basic")
+
+    def main(self, ctx: flexitest.RunContext):
+        cur_chain_tip = self.handle_bail(lambda: "sync_event_new_tip")
+
+        wait_until(
+            lambda: self.seqrpc.strata_clientStatus()["chain_tip_slot"] > cur_chain_tip + 1,
+            error_with="chain tip slot not progressing",
+            timeout=20,
+        )
+
+        return True
diff --git a/functional-tests/tests/crash_advance_consensus_state.py b/functional-tests/tests/crash_advance_consensus_state.py
deleted file mode 100644
index cc388181b..000000000
--- a/functional-tests/tests/crash_advance_consensus_state.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import flexitest
-
-from mixins import seq_crash_mixin
-from utils import wait_until
-
-
-@flexitest.register
-class CrashAdvanceConsensusStateTestd(seq_crash_mixin.SeqCrashMixin):
-    def __init__(self, ctx: flexitest.InitContext):
-        ctx.set_env("basic")
-
-    def main(self, ctx: flexitest.RunContext):
-        """
-        We encounter the following error after crash.
-
-        strata_consensus_logic::duty::block_assembly: preparing block target_slot=7
-        strata_consensus_logic::duty::block_assembly: was turn to propose block,
-        but found block in database already slot=7 target_slot=7
-
-        The check is present in the function `sign_and_store_block` on block_assembly.
-        Further work is required for fixing the problem.
-        To re-enable this test remove the return below.
-        Track the issue on:
-        https://alpenlabs.atlassian.net/browse/STR-916
-        """
-        return
-
-        cur_chain_tip = self.handle_bail(lambda: "advance_consensus_state")
-
-        wait_until(
-            lambda: self.seqrpc.strata_clientStatus()["chain_tip_slot"] > cur_chain_tip,
-            error_with="chain tip slot not progressing",
-            timeout=20,
-        )
-
-        return True
diff --git a/functional-tests/tests/el_sync_from_chainstate.py b/functional-tests/tests/el_sync_from_chainstate.py
new file mode 100644
index 000000000..0be6f41aa
--- /dev/null
+++ b/functional-tests/tests/el_sync_from_chainstate.py
@@ -0,0 +1,125 @@
+import flexitest
+from web3 import Web3
+
+from envs import testenv
+from utils import (
+    wait_until,
+)
+
+
+def send_tx(web3: Web3):
+    dest = web3.to_checksum_address("deedf001900dca3ebeefdeadf001900dca3ebeef")
+    txid = web3.eth.send_transaction(
+        {
+            "to": dest,
+            "value": hex(1),
+            "gas": hex(100000),
+            "from": web3.address,
+        }
+    )
+    print("txid", txid.to_0x_hex())
+
+    web3.eth.wait_for_transaction_receipt(txid, timeout=5)
+
+
+@flexitest.register
+class ELSyncFromChainstateTest(testenv.StrataTester):
+    """This tests sync when el is missing blocks"""
+
+    def __init__(self, ctx: flexitest.InitContext):
+        ctx.set_env("basic")
+
+    def main(self, ctx: flexitest.RunContext):
+        try:
+            self.run(ctx)
+        finally:
+            seq = ctx.get_service("sequencer")
+            reth = ctx.get_service("reth")
+
+            if not seq.is_started():
+                seq.start()
+            if not reth.is_started():
+                reth.start()
+
+    def run(self, ctx: flexitest.RunContext):
+        seq = ctx.get_service("sequencer")
+        reth = ctx.get_service("reth")
+        web3: Web3 = reth.create_web3()
+
+        rethrpc = reth.create_rpc()
+
+        # workaround for issue restarting reth with no transactions
+        for _ in range(3):
+            send_tx(web3)
+
+        # ensure there are some blocks generated
+        wait_until(
+            lambda: int(rethrpc.eth_blockNumber(), base=16) > 0,
+            error_with="not building blocks",
+            timeout=5,
+        )
+
+        print("stop sequencer")
+        seq.stop()
+
+        orig_blocknumber = int(rethrpc.eth_blockNumber(), base=16)
+        print(f"stop reth @{orig_blocknumber}")
+        reth.stop()
+
+        # take snapshot of reth db
+        SNAPSHOT_IDX = 1
+        reth.snapshot_datadir(SNAPSHOT_IDX)
+
+        print("start reth")
+        reth.start()
+
+        # wait for reth to start
+        wait_until(
+            lambda: int(rethrpc.eth_blockNumber(), base=16) > 0,
+            error_with="reth did not start in time",
+            timeout=5,
+        )
+
+        print("start sequencer")
+        seq.start()
+
+        # generate more blocks
+        wait_until(
+            lambda: int(rethrpc.eth_blockNumber(), base=16) > orig_blocknumber + 1,
+            error_with="not building blocks",
+            timeout=5,
+        )
+
+        print("stop sequencer")
+        seq.stop()
+        final_blocknumber = int(rethrpc.eth_blockNumber(), base=16)
+
+        print(f"stop reth @{final_blocknumber}")
+        reth.stop()
+
+        # replace reth db with older snapshot
+        reth.restore_snapshot(SNAPSHOT_IDX)
+
+        # sequencer now contains more blocks than in reth, should trigger EL sync later
+        print("start reth")
+        reth.start()
+
+        # wait for reth to start
+        wait_until(
+            lambda: int(rethrpc.eth_blockNumber(), base=16) > 0,
+            error_with="reth did not start in time",
+            timeout=5,
+        )
+
+        # ensure reth db was reset to shorter chain
+        assert int(rethrpc.eth_blockNumber(), base=16) == orig_blocknumber
+
+        print("start sequencer")
+        seq.start()
+
+        print("wait for sync")
+        wait_until(
+            lambda: int(rethrpc.eth_blockNumber(), base=16) > final_blocknumber,
+            error_with="not syncing blocks",
+            timeout=5,
+        )
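The scenario above depends on the `snapshot_datadir` / `restore_snapshot` helpers added to the reth factory earlier in this diff: snapshot the EL datadir, let the chain advance, then roll the datadir back so the CL chainstate is ahead of the EL and `sync_chainstate_to_el` has work to do on the next startup. A self-contained sketch of what those helpers boil down to (the demo paths and file contents are made up; the real helpers close over the factory's `datadir` and use `ctx.envdd_path` for the snapshot location):

```python
import os
import shutil
from pathlib import Path


def snapshot_datadir(datadir: str, snapshot_dir: str) -> None:
    """Copy the datadir aside (mirrors _snapshot_datadir in factory.py)."""
    os.makedirs(snapshot_dir, exist_ok=True)
    shutil.copytree(datadir, snapshot_dir, dirs_exist_ok=True)


def restore_snapshot(datadir: str, snapshot_dir: str) -> None:
    """Replace the datadir with a snapshot (mirrors _restore_snapshot in
    factory.py, which additionally asserts the service is stopped)."""
    assert os.path.exists(snapshot_dir)
    shutil.rmtree(datadir)
    os.rename(snapshot_dir, datadir)


# Made-up usage mirroring the test flow: snapshot at the pre-crash height,
# advance the chain, then roll back so the EL ends up behind the CL.
os.makedirs("demo_datadir", exist_ok=True)
Path("demo_datadir/db").write_text("state@H")
snapshot_datadir("demo_datadir", "demo_snapshot")  # like reth.snapshot_datadir(1)
Path("demo_datadir/db").write_text("state@H+n")    # chain advances past H
restore_snapshot("demo_datadir", "demo_snapshot")  # like reth.restore_snapshot(1)
assert Path("demo_datadir/db").read_text() == "state@H"
```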