Skip to content

Commit

Permalink
add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sapinb committed Feb 6, 2025
1 parent 3b4bdf3 commit 2962acc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
42 changes: 24 additions & 18 deletions bin/strata-sequencer-client/src/duty_executor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{collections::HashSet, sync::Arc};

use strata_primitives::buf::Buf32;
use strata_rpc_api::StrataSequencerApiClient;
use strata_rpc_types::HexBytes64;
use strata_sequencer::{
block_template::{BlockCompletionData, BlockGenerationConfig},
duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, IdentityData},
duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, DutyId, IdentityData},
utils::now_millis,
};
use thiserror::Error;
use tokio::{runtime::Handle, select, sync::mpsc};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::helpers::{sign_checkpoint, sign_header};

Expand All @@ -36,28 +35,29 @@ where
// Keep track of seen duties to avoid processing the same duty multiple times.
// Does not need to be persisted, as new duties are generated based on current chain state.
let mut seen_duties = HashSet::new();
let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<Buf32>(8);
let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::<DutyId>(8);

Check warning on line 38 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L38

Added line #L38 was not covered by tests

loop {
select! {
duty = duty_rx.recv() => {
if let Some(duty) = duty {
let duty_id = duty.id();
if seen_duties.contains(&duty_id) {
debug!("skipping already seen duty: {:?}", duty);
debug!(%duty_id, "skipping already seen duty");

Check warning on line 46 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L46

Added line #L46 was not covered by tests
continue;
}
seen_duties.insert(duty.id());
seen_duties.insert(duty_id);

Check warning on line 49 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L49

Added line #L49 was not covered by tests
handle.spawn(handle_duty(rpc.clone(), duty, idata.clone(), failed_duties_tx.clone()));
} else {
// tx is closed, we are done
return Ok(());
}
}
failed_duty = failed_duties_rx.recv() => {
if let Some(failed_duty_id) = failed_duty {
if let Some(duty_id) = failed_duty {

Check warning on line 57 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L57

Added line #L57 was not covered by tests
// remove from seen duties, so we can retry if the duty is seen again
seen_duties.remove(&failed_duty_id);
warn!(%duty_id, "removing failed duty");
seen_duties.remove(&duty_id);

Check warning on line 60 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L59-L60

Added lines #L59 - L60 were not covered by tests
}
}
}
Expand All @@ -68,34 +68,37 @@ async fn handle_duty<R>(
rpc: Arc<R>,
duty: Duty,
idata: IdentityData,
failed_duties_tx: mpsc::Sender<Buf32>,
failed_duties_tx: mpsc::Sender<DutyId>,

Check warning on line 71 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L71

Added line #L71 was not covered by tests
) where
R: StrataSequencerApiClient + Send + Sync,
{
debug!("handle_duty: {:?}", duty);
let duty_id = duty.id();
debug!(%duty_id, ?duty, "handle_duty");

Check warning on line 76 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L75-L76

Added lines #L75 - L76 were not covered by tests
let duty_result = match duty.clone() {
Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, idata).await,
Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, idata).await,
Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, duty_id, idata).await,
Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, duty_id, idata).await,

Check warning on line 79 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L78-L79

Added lines #L78 - L79 were not covered by tests
};

if let Err(e) = duty_result {
error!(?duty, "duty failed: {}", e);
if let Err(error) = duty_result {
error!(%duty_id, %error, "duty failed");

Check warning on line 83 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L82-L83

Added lines #L82 - L83 were not covered by tests
let _ = failed_duties_tx.send(duty.id()).await;
}
}

async fn handle_sign_block_duty<R>(
rpc: Arc<R>,
duty: BlockSigningDuty,
duty_id: DutyId,

Check warning on line 91 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L91

Added line #L91 was not covered by tests
idata: IdentityData,
) -> Result<(), DutyExecError>
where
R: StrataSequencerApiClient + Send + Sync,
{
if now_millis() < duty.target_ts() {
let now = now_millis();
if now < duty.target_ts() {

Check warning on line 98 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L97-L98

Added lines #L97 - L98 were not covered by tests
// wait until target time
// TODO: ensure duration is within some bounds
info!(?duty, "got duty too early; sleeping till target time");
warn!(%duty_id, %now, target = duty.target_ts(), "got duty too early; sleeping till target time");

Check warning on line 101 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L101

Added line #L101 was not covered by tests
tokio::time::sleep(tokio::time::Duration::from_millis(
duty.target_ts() - now_millis(),
))
Expand All @@ -110,7 +113,7 @@ where

let id = template.template_id();

info!(?duty, ?id, "got block template");
info!(%duty_id, block_id = %id, "got block template");

Check warning on line 116 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L114-L116

Added lines #L114 - L116 were not covered by tests

let signature = sign_header(template.header(), &idata.key);
let completion = BlockCompletionData::from_signature(signature);
Expand All @@ -119,21 +122,24 @@ where
.await
.map_err(DutyExecError::CompleteTemplate)?;

info!(?duty, ?id, "block signing complete");
info!(%duty_id, block_id = %id, "block signing complete");

Check warning on line 125 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L125

Added line #L125 was not covered by tests

Ok(())
}

async fn handle_commit_batch_duty<R>(
rpc: Arc<R>,
duty: BatchCheckpointDuty,
duty_id: DutyId,

Check warning on line 133 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L133

Added line #L133 was not covered by tests
idata: IdentityData,
) -> Result<(), DutyExecError>
where
R: StrataSequencerApiClient + Send + Sync,
{
let sig = sign_checkpoint(duty.checkpoint(), &idata.key);

debug!(%duty_id, %sig, "checkpoint signature");

Check warning on line 141 in bin/strata-sequencer-client/src/duty_executor.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_executor.rs#L141

Added line #L141 was not covered by tests

rpc.complete_checkpoint_signature(duty.checkpoint().batch_info().idx(), HexBytes64(sig.0))
.await
.map_err(DutyExecError::CompleteCheckpoint)?;
Expand Down
2 changes: 1 addition & 1 deletion bin/strata-sequencer-client/src/duty_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
}
};

info!("got {} duties", duties.len());
info!(count = %duties.len(), "got new duties");

Check warning on line 28 in bin/strata-sequencer-client/src/duty_fetcher.rs

View check run for this annotation

Codecov / codecov/patch

bin/strata-sequencer-client/src/duty_fetcher.rs#L28

Added line #L28 was not covered by tests

for duty in duties {
if duty_tx.send(duty).await.is_err() {
Expand Down
5 changes: 4 additions & 1 deletion crates/sequencer/src/duty/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub enum Expiry {
CheckpointIdxFinalized(u64),
}

/// Unique identifier for a duty.
pub type DutyId = Buf32;

/// Duties the sequencer might carry out.
#[derive(Clone, Debug, BorshSerialize, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
Expand All @@ -46,7 +49,7 @@ impl Duty {
}

/// Returns a unique identifier for the duty.
pub fn id(&self) -> Buf32 {
pub fn id(&self) -> DutyId {

Check warning on line 52 in crates/sequencer/src/duty/types.rs

View check run for this annotation

Codecov / codecov/patch

crates/sequencer/src/duty/types.rs#L52

Added line #L52 was not covered by tests
match self {
// We want Batch commitment duty to be unique by the checkpoint idx
Self::CommitBatch(duty) => compute_borsh_hash(&duty.idx()),
Expand Down

0 comments on commit 2962acc

Please sign in to comment.