From 2962acc10c885ebe7e3cdc0133f4b5befab5ffc0 Mon Sep 17 00:00:00 2001 From: Sapin Bajracharya Date: Thu, 6 Feb 2025 15:53:15 +0545 Subject: [PATCH] add more logs --- .../src/duty_executor.rs | 42 +++++++++++-------- .../src/duty_fetcher.rs | 2 +- crates/sequencer/src/duty/types.rs | 5 ++- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/bin/strata-sequencer-client/src/duty_executor.rs b/bin/strata-sequencer-client/src/duty_executor.rs index 81a91906a..fa6289372 100644 --- a/bin/strata-sequencer-client/src/duty_executor.rs +++ b/bin/strata-sequencer-client/src/duty_executor.rs @@ -1,16 +1,15 @@ use std::{collections::HashSet, sync::Arc}; -use strata_primitives::buf::Buf32; use strata_rpc_api::StrataSequencerApiClient; use strata_rpc_types::HexBytes64; use strata_sequencer::{ block_template::{BlockCompletionData, BlockGenerationConfig}, - duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, IdentityData}, + duty::types::{BatchCheckpointDuty, BlockSigningDuty, Duty, DutyId, IdentityData}, utils::now_millis, }; use thiserror::Error; use tokio::{runtime::Handle, select, sync::mpsc}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::helpers::{sign_checkpoint, sign_header}; @@ -36,7 +35,7 @@ where // Keep track of seen duties to avoid processing the same duty multiple times. // Does not need to be persisted, as new duties are generated based on current chain state. let mut seen_duties = HashSet::new(); - let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::(8); + let (failed_duties_tx, mut failed_duties_rx) = mpsc::channel::(8); loop { select! { @@ -44,10 +43,10 @@ where if let Some(duty) = duty { let duty_id = duty.id(); if seen_duties.contains(&duty_id) { - debug!("skipping already seen duty: {:?}", duty); + debug!(%duty_id, "skipping already seen duty"); continue; } - seen_duties.insert(duty.id()); + seen_duties.insert(duty_id); handle.spawn(handle_duty(rpc.clone(), duty, idata.clone(), failed_duties_tx.clone())); } else { // tx is closed, we are done @@ -55,9 +54,10 @@ where } } failed_duty = failed_duties_rx.recv() => { - if let Some(failed_duty_id) = failed_duty { + if let Some(duty_id) = failed_duty { // remove from seen duties, so we can retry if the duty is seen again - seen_duties.remove(&failed_duty_id); + warn!(%duty_id, "removing failed duty"); + seen_duties.remove(&duty_id); } } } @@ -68,18 +68,19 @@ async fn handle_duty( rpc: Arc, duty: Duty, idata: IdentityData, - failed_duties_tx: mpsc::Sender, + failed_duties_tx: mpsc::Sender, ) where R: StrataSequencerApiClient + Send + Sync, { - debug!("handle_duty: {:?}", duty); + let duty_id = duty.id(); + debug!(%duty_id, ?duty, "handle_duty"); let duty_result = match duty.clone() { - Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, idata).await, - Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, idata).await, + Duty::SignBlock(duty) => handle_sign_block_duty(rpc, duty, duty_id, idata).await, + Duty::CommitBatch(duty) => handle_commit_batch_duty(rpc, duty, duty_id, idata).await, }; - if let Err(e) = duty_result { - error!(?duty, "duty failed: {}", e); + if let Err(error) = duty_result { + error!(%duty_id, %error, "duty failed"); let _ = failed_duties_tx.send(duty.id()).await; } } @@ -87,15 +88,17 @@ async fn handle_duty( async fn handle_sign_block_duty( rpc: Arc, duty: BlockSigningDuty, + duty_id: DutyId, idata: IdentityData, ) -> Result<(), DutyExecError> where R: StrataSequencerApiClient + Send + Sync, { - if now_millis() < duty.target_ts() { + let now = now_millis(); + if now < duty.target_ts() { // wait until target time // TODO: ensure duration is within some bounds - info!(?duty, "got duty too early; sleeping till target time"); + warn!(%duty_id, %now, target = duty.target_ts(), "got duty too early; sleeping till target time"); tokio::time::sleep(tokio::time::Duration::from_millis( duty.target_ts() - now_millis(), )) @@ -110,7 +113,7 @@ where let id = template.template_id(); - info!(?duty, ?id, "got block template"); + info!(%duty_id, block_id = %id, "got block template"); let signature = sign_header(template.header(), &idata.key); let completion = BlockCompletionData::from_signature(signature); @@ -119,7 +122,7 @@ where .await .map_err(DutyExecError::CompleteTemplate)?; - info!(?duty, ?id, "block signing complete"); + info!(%duty_id, block_id = %id, "block signing complete"); Ok(()) } @@ -127,6 +130,7 @@ where async fn handle_commit_batch_duty( rpc: Arc, duty: BatchCheckpointDuty, + duty_id: DutyId, idata: IdentityData, ) -> Result<(), DutyExecError> where @@ -134,6 +138,8 @@ where { let sig = sign_checkpoint(duty.checkpoint(), &idata.key); + debug!(%duty_id, %sig, "checkpoint signature"); + rpc.complete_checkpoint_signature(duty.checkpoint().batch_info().idx(), HexBytes64(sig.0)) .await .map_err(DutyExecError::CompleteCheckpoint)?; diff --git a/bin/strata-sequencer-client/src/duty_fetcher.rs b/bin/strata-sequencer-client/src/duty_fetcher.rs index a24f70558..15f265f9b 100644 --- a/bin/strata-sequencer-client/src/duty_fetcher.rs +++ b/bin/strata-sequencer-client/src/duty_fetcher.rs @@ -25,7 +25,7 @@ where } }; - info!("got {} duties", duties.len()); + info!(count = %duties.len(), "got new duties"); for duty in duties { if duty_tx.send(duty).await.is_err() { diff --git a/crates/sequencer/src/duty/types.rs b/crates/sequencer/src/duty/types.rs index f35bbfade..0b006bc2a 100644 --- a/crates/sequencer/src/duty/types.rs +++ b/crates/sequencer/src/duty/types.rs @@ -26,6 +26,9 @@ pub enum Expiry { CheckpointIdxFinalized(u64), } +/// Unique identifier for a duty. +pub type DutyId = Buf32; + /// Duties the sequencer might carry out. #[derive(Clone, Debug, BorshSerialize, Serialize, Deserialize)] #[allow(clippy::large_enum_variant)] @@ -46,7 +49,7 @@ impl Duty { } /// Returns a unique identifier for the duty. - pub fn id(&self) -> Buf32 { + pub fn id(&self) -> DutyId { match self { // We want Batch commitment duty to be unique by the checkpoint idx Self::CommitBatch(duty) => compute_borsh_hash(&duty.idx()),