Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sequencer crash tests #653

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/functional.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,32 @@ jobs:

# Run again just to be sure as some tests are flaky
- name: Run functional tests (2)
id: funcTestsRun2
if: steps.funcTestsRun1.outcome == 'failure'
continue-on-error: true
run: |
NEWPATH="$(realpath target/debug/)"
export PATH="${NEWPATH}:${PATH}"
which strata-client
cd functional-tests && poetry run python entry.py

- name: Upload logs as build artifact on failure
if: steps.funcTestsRun1.outcome == 'failure' || steps.funcTestsRun2.outcome == 'failure'
id: upload_logs
uses: actions/upload-artifact@v4
with:
name: fntest_dd
path: functional-tests/_dd/
retention-days: 30
if-no-files-found: error

- name: Fail job if functional tests fail
if: steps.funcTestsRun2.outcome == 'failure'
run: |
echo "Functional tests failed"
exit 1


functional-tests-success:
name: Check that all checks pass
runs-on: ubuntu-latest
Expand Down
118 changes: 118 additions & 0 deletions bin/strata-client/src/el_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use strata_db::DbError;
use strata_eectl::{engine::ExecEngineCtl, errors::EngineError, messages::ExecPayloadData};
use strata_state::id::L2BlockId;
use strata_storage::NodeStorage;
use thiserror::Error;
use tracing::{debug, info};

/// Errors returned while syncing the execution layer (EL) from locally stored chainstate.
#[derive(Debug, Error)]
pub enum Error {
/// No chainstate entry is stored at the given index.
#[error("missing chainstate for slot {0}")]
MissingChainstate(u64),
/// No L2 block data is stored for the given block id.
#[error("missing l2block {0}")]
MissingL2Block(L2BlockId),
/// A database error, converted automatically from [`DbError`] by `?`.
#[error("db: {0}")]
Db(#[from] DbError),
/// An execution engine error, converted automatically from [`EngineError`] by `?`.
#[error("engine: {0}")]
Engine(#[from] EngineError),
}

/// Sync missing blocks in EL using payloads stored in L2 block database.
///
/// The function first searches the chainstate indices between the earliest and the
/// latest written index for the last one whose chain tip block the engine reports as
/// existing. Every index after that one, up to and including the latest, is then
/// replayed: the stored block data of the chain tip is converted into an
/// [`ExecPayloadData`], submitted to the engine, and set as the engine's head block.
///
/// # Errors
///
/// * [`Error::MissingChainstate`] if no chainstate is stored at a visited index.
/// * [`Error::MissingL2Block`] if no block data is stored for a chain tip block id.
/// * [`Error::Db`] or [`Error::Engine`] for failures propagated with `?` from the
///   storage managers or the engine.
///
/// TODO: retry on network errors
pub fn sync_chainstate_to_el(
storage: &NodeStorage,
engine: &impl ExecEngineCtl,
) -> Result<(), Error> {
let chainstate_manager = storage.chainstate();
let l2_block_manager = storage.l2();
// Bounds of the chainstate indices that are searched below.
let earliest_idx = chainstate_manager.get_earliest_write_idx_blocking()?;
let latest_idx = chainstate_manager.get_last_write_idx_blocking()?;

Check warning on line 30 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L23-L30

Added lines #L23 - L30 were not covered by tests

info!(%earliest_idx, %latest_idx, "search for last known idx");

Check warning on line 32 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L32

Added line #L32 was not covered by tests

// Last idx of chainstate whose corresponding block is present in EL.
// The predicate answers "does the engine know the chain tip block of this index?".
let sync_from_idx = find_last_match((earliest_idx, latest_idx), |idx| {
let Some(chain_state) = chainstate_manager.get_toplevel_chainstate_blocking(idx)? else {
return Err(Error::MissingChainstate(idx));

Check warning on line 37 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L35-L37

Added lines #L35 - L37 were not covered by tests
};

let block_id = chain_state.chain_tip_blkid();

Ok(engine.check_block_exists(*block_id)?)
sapinb marked this conversation as resolved.
Show resolved Hide resolved
})?
.map(|idx| idx + 1) // a match was found: sync from the next index
// NOTE(review): the fallback is index 0 rather than `earliest_idx`; if `earliest_idx`
// can be greater than 0 the loop below would fail with `MissingChainstate` — confirm.
.unwrap_or(0); // no match: sync from genesis

info!(%sync_from_idx, "last known index in EL");

Check warning on line 47 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L40-L47

Added lines #L40 - L47 were not covered by tests

// Replay every index the engine is missing, in ascending order.
for idx in sync_from_idx..=latest_idx {
debug!(?idx, "Syncing chainstate");
sapinb marked this conversation as resolved.
Show resolved Hide resolved
let Some(chain_state) = chainstate_manager.get_toplevel_chainstate_blocking(idx)? else {
return Err(Error::MissingChainstate(idx));

Check warning on line 52 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L49-L52

Added lines #L49 - L52 were not covered by tests
};

let block_id = chain_state.chain_tip_blkid();

Check warning on line 55 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L55

Added line #L55 was not covered by tests

let Some(l2block) = l2_block_manager.get_block_data_blocking(block_id)? else {
return Err(Error::MissingL2Block(*block_id));

Check warning on line 58 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L57-L58

Added lines #L57 - L58 were not covered by tests
};

let payload = ExecPayloadData::from_l2_block_bundle(&l2block);

// Submit the payload first, then make the same block the engine's head.
engine.submit_payload(payload)?;
engine.update_head_block(*block_id)?;

Check warning on line 64 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L61-L64

Added lines #L61 - L64 were not covered by tests
}

Ok(())
}

Check warning on line 68 in bin/strata-client/src/el_sync.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/el_sync.rs#L67-L68

Added lines #L67 - L68 were not covered by tests

/// Finds the largest index in the inclusive `range` for which `predicate` returns `true`.
///
/// The search is a binary search, so it assumes `predicate` is monotone over the range:
/// `true` for a (possibly empty) prefix of indices and `false` for every index after it.
///
/// Returns `Ok(None)` when the range is empty (`range.0 > range.1`) or when `predicate`
/// is `false` for the first index. Each index is probed at most once, and only indices
/// inside the range are ever probed.
///
/// # Errors
///
/// Returns the first error produced by `predicate`.
fn find_last_match(
    range: (u64, u64),
    predicate: impl Fn(u64) -> Result<bool, Error>,
) -> Result<Option<u64>, Error> {
    let (first, last) = range;

    // An empty range has no match; do not probe an index outside of it.
    if first > last {
        return Ok(None);
    }

    // If the first index does not match, no later index can match either.
    if !predicate(first)? {
        return Ok(None);
    }

    // `first` is a confirmed match, so only `(first, last]` is left to search. Not
    // re-probing `first` keeps the result stable even if the predicate's answer changes
    // between calls, and guarantees `mid >= 1` so that `mid - 1` below cannot underflow.
    let mut best_match = first;
    let Some(mut lo) = first.checked_add(1) else {
        // `first == u64::MAX`: there is nothing after it.
        return Ok(Some(best_match));
    };
    let mut hi = last;

    while lo <= hi {
        let mid = lo + (hi - lo) / 2;

        if predicate(mid)? {
            best_match = mid;
            // Continue in the upper half; stop if `mid` is already `u64::MAX`.
            match mid.checked_add(1) {
                Some(next) => lo = next,
                None => break,
            }
        } else {
            // Continue in the lower half. `mid >= lo > first`, so this cannot underflow.
            hi = mid - 1;
        }
    }

    Ok(Some(best_match))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The predicate holds for a strict prefix of the range: the end of that prefix is found.
    #[test]
    fn test_find_last_match() {
        assert!(matches!(
            find_last_match((0, 5), |idx| Ok(idx < 3)),
            Ok(Some(2))
        ));
        // The range does not have to start at zero.
        assert!(matches!(
            find_last_match((2, 9), |idx| Ok(idx < 7)),
            Ok(Some(6))
        ));
    }

    /// The predicate never holds: there is no match.
    #[test]
    fn test_find_last_match_no_match() {
        assert!(matches!(find_last_match((0, 5), |_| Ok(false)), Ok(None)));
    }

    /// Boundary cases: every index matches, only the first matches, single-element range.
    #[test]
    fn test_find_last_match_boundaries() {
        assert!(matches!(find_last_match((0, 5), |_| Ok(true)), Ok(Some(5))));
        assert!(matches!(
            find_last_match((0, 5), |idx| Ok(idx == 0)),
            Ok(Some(0))
        ));
        assert!(matches!(find_last_match((3, 3), |_| Ok(true)), Ok(Some(3))));
    }

    /// An error returned by the predicate is propagated to the caller.
    #[test]
    fn test_find_last_match_propagates_error() {
        let error_message = "intentional error for test";
        assert!(matches!(
            find_last_match((0, 5), |_| Err(Error::Engine(EngineError::Other(
                error_message.into()
            )))),
            Err(err) if err.to_string().contains(error_message)
        ));
    }
}
6 changes: 4 additions & 2 deletions bin/strata-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{sync::Arc, time::Duration};

use bitcoin::{hashes::Hash, BlockHash};
use el_sync::sync_chainstate_to_el;
use jsonrpsee::Methods;
use parking_lot::lock_api::RwLock;
use rpc_client::sync_client;
Expand Down Expand Up @@ -45,6 +46,7 @@
use crate::{args::Args, helpers::*};

mod args;
mod el_sync;
mod errors;
mod extractor;
mod helpers;
Expand Down Expand Up @@ -284,8 +286,8 @@
}
Ok(false) => {
// Current chain tip block is not known by the EL.
// TODO: Try to sync EL using existing block payloads from DB.
anyhow::bail!("missing expected evm block, block_id = {}", chain_tip);
warn!(%chain_tip, "missing expected EVM block");
sync_chainstate_to_el(storage, engine)?;

Check warning on line 290 in bin/strata-client/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-client/src/main.rs#L289-L290

Added lines #L289 - L290 were not covered by tests
}
Err(error) => {
// Likely network issue
Expand Down
43 changes: 28 additions & 15 deletions bin/strata-sequencer-client/src/duty_executor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{collections::HashSet, sync::Arc};

use strata_primitives::buf::Buf32;
use strata_rpc_api::StrataSequencerApiClient;
use strata_rpc_types::HexBytes64;
use strata_sequencer::{
block_template::{BlockCompletionData, BlockGenerationConfig},
duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, IdentityData},
duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, DutyId, IdentityData},
utils::now_millis,
};
use thiserror::Error;
use tokio::{runtime::Handle, select, sync::mpsc};
use tracing::{debug, error};
use tracing::{debug, error, info, warn};

use crate::helpers::{sign_checkpoint, sign_header};

Expand All @@ -36,28 +35,29 @@
// Keep track of seen duties to avoid processing the same duty multiple times.
// Does not need to be persisted, as new duties are generated based on current chain state.
let mut seen_duties = HashSet::new();
let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<Buf32>(8);
let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<DutyId>(8);

Check warning on line 38 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L38

Added line #L38 was not covered by tests

loop {
select! {
duty = duty_rx.recv() => {
if let Some(duty) = duty {
let duty_id = duty.id();
if seen_duties.contains(&duty_id) {
debug!("skipping already seen duty: {:?}", duty);
debug!(%duty_id, "skipping already seen duty");

Check warning on line 46 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L46

Added line #L46 was not covered by tests
continue;
}
seen_duties.insert(duty.id());
seen_duties.insert(duty_id);

Check warning on line 49 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L49

Added line #L49 was not covered by tests
handle.spawn(handle_duty(rpc.clone(), duty, idata.clone(), failed_duties_tx.clone()));
} else {
// tx is closed, we are done
return Ok(());
}
}
failed_duty = failed_duties_rx.recv() => {
if let Some(failed_duty_id) = failed_duty {
if let Some(duty_id) = failed_duty {

Check warning on line 57 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L57

Added line #L57 was not covered by tests
// remove from seen duties, so we can retry if the duty is seen again
seen_duties.remove(&failed_duty_id);
warn!(%duty_id, "removing failed duty");
seen_duties.remove(&duty_id);

Check warning on line 60 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L59-L60

Added lines #L59 - L60 were not covered by tests
}
}
}
Expand All @@ -68,33 +68,37 @@
rpc: Arc<R>,
duty: Duty,
idata: IdentityData,
failed_duties_tx: mpsc::Sender<Buf32>,
failed_duties_tx: mpsc::Sender<DutyId>,

Check warning on line 71 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L71

Added line #L71 was not covered by tests
) where
R: StrataSequencerApiClient + Send + Sync,
{
debug!("handle_duty: {:?}", duty);
let duty_id = duty.id();
debug!(%duty_id, ?duty, "handle_duty");

Check warning on line 76 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L75-L76

Added lines #L75 - L76 were not covered by tests
let duty_result = match duty.clone() {
Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, idata).await,
Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, idata).await,
Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, duty_id, idata).await,
Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, duty_id, idata).await,

Check warning on line 79 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L78-L79

Added lines #L78 - L79 were not covered by tests
};

if let Err(e) = duty_result {
error!(?duty, "duty failed: {}", e);
if let Err(error) = duty_result {
error!(%duty_id, %error, "duty failed");

Check warning on line 83 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L82-L83

Added lines #L82 - L83 were not covered by tests
let _ = failed_duties_tx.send(duty.id()).await;
}
}

async fn handle_sign_block_duty<R>(
rpc: Arc<R>,
duty: BlockSigningDuty,
duty_id: DutyId,

Check warning on line 91 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L91

Added line #L91 was not covered by tests
idata: IdentityData,
) -> Result<(), DutyExecError>
where
R: StrataSequencerApiClient + Send + Sync,
{
if now_millis() < duty.target_ts() {
let now = now_millis();
if now < duty.target_ts() {

Check warning on line 98 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L97-L98

Added lines #L97 - L98 were not covered by tests
// wait until target time
// TODO: ensure duration is within some bounds
warn!(%duty_id, %now, target = duty.target_ts(), "got duty too early; sleeping till target time");

Check warning on line 101 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L101

Added line #L101 was not covered by tests
tokio::time::sleep(tokio::time::Duration::from_millis(
duty.target_ts() - now_millis(),
))
Expand All @@ -107,26 +111,35 @@
.await
.map_err(DutyExecError::GenerateTemplate)?;

let id = template.template_id();

info!(%duty_id, block_id = %id, "got block template");

Check warning on line 116 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L114-L116

Added lines #L114 - L116 were not covered by tests

let signature = sign_header(template.header(), &idata.key);
let completion = BlockCompletionData::from_signature(signature);

rpc.complete_block_template(template.template_id(), completion)
.await
.map_err(DutyExecError::CompleteTemplate)?;

info!(%duty_id, block_id = %id, "block signing complete");

Check warning on line 125 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L125

Added line #L125 was not covered by tests

Ok(())
}

async fn handle_commit_batch_duty<R>(
rpc: Arc<R>,
duty: BatchCheckpointDuty,
duty_id: DutyId,

Check warning on line 133 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L133

Added line #L133 was not covered by tests
idata: IdentityData,
) -> Result<(), DutyExecError>
where
R: StrataSequencerApiClient + Send + Sync,
{
let sig = sign_checkpoint(duty.inner(), &idata.key);

debug!(%duty_id, %sig, "checkpoint signature");

Check warning on line 141 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L141

Added line #L141 was not covered by tests

rpc.complete_checkpoint_signature(duty.inner().batch_info().epoch(), HexBytes64(sig.0))
.await
.map_err(DutyExecError::CompleteCheckpoint)?;
Expand Down
4 changes: 2 additions & 2 deletions bin/strata-sequencer-client/src/duty_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use strata_rpc_api::StrataSequencerApiClient;
use strata_sequencer::duty::types::Duty;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
use tracing::{error, info, warn};

pub(crate) async fn duty_fetcher_worker<R>(
rpc: Arc<R>,
Expand All @@ -25,7 +25,7 @@
}
};

debug!("got {} duties", duties.len());
info!(count = %duties.len(), "got new duties");

Check warning on line 28 in bin/strata-sequencer-client/src/duty_fetcher.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_fetcher.rs#L28

Added line #L28 was not covered by tests

for duty in duties {
if duty_tx.send(duty).await.is_err() {
Expand Down
1 change: 1 addition & 0 deletions crates/common/src/bail_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::watch;
pub static BAIL_DUTY_SIGN_BLOCK: &str = "duty_sign_block";
pub static BAIL_ADVANCE_CONSENSUS_STATE: &str = "advance_consensus_state";
pub static BAIL_SYNC_EVENT: &str = "sync_event";
pub static BAIL_SYNC_EVENT_NEW_TIP: &str = "sync_event_new_tip";

struct BailWatch {
sender: watch::Sender<Option<String>>,
Expand Down
9 changes: 7 additions & 2 deletions crates/consensus-logic/src/csm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::{sync::Arc, thread};

#[cfg(feature = "debug-utils")]
use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT};
use strata_common::bail_manager::{check_bail_trigger, BAIL_SYNC_EVENT, BAIL_SYNC_EVENT_NEW_TIP};
use strata_db::{
traits::*,
types::{CheckpointConfStatus, CheckpointEntry, CheckpointProvingStatus},
Expand Down Expand Up @@ -143,7 +143,12 @@ impl WorkerState {
debug!(?ev, "processing sync event");

#[cfg(feature = "debug-utils")]
check_bail_trigger(BAIL_SYNC_EVENT);
{
check_bail_trigger(BAIL_SYNC_EVENT);
if matches!(ev, strata_state::sync_event::SyncEvent::NewTipBlock(_)) {
check_bail_trigger(BAIL_SYNC_EVENT_NEW_TIP);
}
}

// Compute the state transition.
let context = client_transition::StorageEventContext::new(&self.storage);
Expand Down
Loading
Loading