diff --git a/bin/prover-client/src/args.rs b/bin/prover-client/src/args.rs index 9ece19523..90c8dbbe1 100644 --- a/bin/prover-client/src/args.rs +++ b/bin/prover-client/src/args.rs @@ -125,6 +125,18 @@ pub struct Args { /// Defaults to `true`. #[argh(option, description = "enable prover client dev rpc", default = "true")] pub enable_dev_rpcs: bool, + + /// Controls the checkpoint proof runner service. + /// + /// When enabled, this prover will automatically generate and submit proofs for checkpoints. + /// This should only be enabled for provers designated as checkpoint provers. + /// Defaults to `false`. + #[argh( + option, + description = "enable prover client checkpoint runner", + default = "false" + )] + pub enable_checkpoint_runner: bool, } impl Args { diff --git a/bin/prover-client/src/checkpoint_runner/error.rs b/bin/prover-client/src/checkpoint_runner/error.rs new file mode 100644 index 000000000..dccf8fd56 --- /dev/null +++ b/bin/prover-client/src/checkpoint_runner/error.rs @@ -0,0 +1,25 @@ +use thiserror::Error; + +/// Represents errors that can occur during checkpoint-related operations. +/// +/// This error type encompasses various failure scenarios that may occur when +/// interacting with checkpoints, including RPC communication issues, +/// data validation problems, and serialization errors. Each variant provides +/// detailed information about the specific error condition. +#[derive(Error, Debug)] +pub enum CheckpointError { + /// Occurs when the RPC request to fetch checkpoint data fails. + #[error("Failed to fetch checkpoint data: {0}")] + FetchError(String), + + /// Occurs when no checkpoint data is returned from the sequencer. + #[error("No checkpoint data returned from sequencer for index {0}")] + CheckpointNotFound(u64), + + /// Occurs when failed to submit checkpoint proof to the sequencer. + #[error("Failed to submit checkpoint proof for index {index}: {error}")] + SubmitProofError { index: u64, error: String }, +} + +/// A type alias for Results involving checkpoint operations. +pub type CheckpointResult = Result; diff --git a/bin/prover-client/src/checkpoint_runner/fetch.rs b/bin/prover-client/src/checkpoint_runner/fetch.rs index 7c00effeb..54841113a 100644 --- a/bin/prover-client/src/checkpoint_runner/fetch.rs +++ b/bin/prover-client/src/checkpoint_runner/fetch.rs @@ -1,23 +1,20 @@ use jsonrpsee::{core::client::ClientT, http_client::HttpClient, rpc_params}; use tracing::error; +use super::error::CheckpointResult; +use crate::checkpoint_runner::error::CheckpointError; + /// Fetches the latest checkpoint index from the sequencer client. -pub async fn fetch_latest_checkpoint_index(sequencer_client: &HttpClient) -> anyhow::Result { - match sequencer_client - .request("strata_getLatestCheckpointIndex", rpc_params![]) +pub async fn fetch_latest_checkpoint_index(sequencer_client: &HttpClient) -> CheckpointResult { + sequencer_client + .request::, _>("strata_getLatestCheckpointIndex", rpc_params![]) .await - { - Ok(Some(idx)) => Ok(idx), - Ok(None) => { - error!("Failed to fetch current checkpoint"); - Err(anyhow::anyhow!("Failed to fetch current checkpoint")) - } - Err(e) => { - error!("Failed to fetch current checkpoint index: {}", e); - Err(anyhow::anyhow!( - "Failed to fetch current checkpoint index: {}", - e - )) - } - } + .map_err(|e| { + error!("Failed to fetch current checkpoint index: {e}"); + CheckpointError::FetchError(e.to_string()) + })? + .ok_or_else(|| { + error!("No checkpoint index returned from sequencer"); + CheckpointError::CheckpointNotFound(0) + }) } diff --git a/bin/prover-client/src/checkpoint_runner/mod.rs b/bin/prover-client/src/checkpoint_runner/mod.rs index 31368ed65..0d8bd941e 100644 --- a/bin/prover-client/src/checkpoint_runner/mod.rs +++ b/bin/prover-client/src/checkpoint_runner/mod.rs @@ -1,5 +1,6 @@ //! A module defining operations for the checkpoint proof generation +pub mod error; pub mod fetch; pub mod runner; pub mod submit; diff --git a/bin/prover-client/src/checkpoint_runner/runner.rs b/bin/prover-client/src/checkpoint_runner/runner.rs index 3587ac791..654c088d8 100644 --- a/bin/prover-client/src/checkpoint_runner/runner.rs +++ b/bin/prover-client/src/checkpoint_runner/runner.rs @@ -23,41 +23,43 @@ pub async fn checkpoint_proof_runner( db: Arc, ) { info!("Checkpoint runner started"); - - let poll_interval = Duration::from_secs(CHECKPOINT_POLL_INTERVAL); - let mut ticker = interval(poll_interval); + let mut ticker = interval(Duration::from_secs(CHECKPOINT_POLL_INTERVAL)); let mut current_checkpoint_idx: Option = None; + loop { ticker.tick().await; - match fetch_latest_checkpoint_index(operator.cl_client()).await { - Ok(new_checkpoint) => { - // Determine if we should update the checkpoint - let should_update = - current_checkpoint_idx.is_none_or(|current| new_checkpoint > current); - - if should_update { - // Create new proving task - if let Err(e) = operator - .create_task(new_checkpoint, task_tracker.clone(), &db) - .await - { - error!("Failed to create proving task: {:?}", e); - continue; - } - - // Update the checkpoint index - current_checkpoint_idx = Some(new_checkpoint); - } else { - info!( - "Fetched checkpoint {} is not newer than current {:?}", - new_checkpoint, current_checkpoint_idx - ); - } - } - Err(e) => { - error!("Failed to fetch the latest checkpoint index: {:?}", e); - } + if let Err(e) = + process_checkpoint(&operator, &task_tracker, &db, &mut current_checkpoint_idx).await + { + error!("Error processing checkpoint: {e:?}"); } } } + +async fn process_checkpoint( + operator: &CheckpointOperator, + task_tracker: &Arc>, + db: &Arc, + current_checkpoint_idx: &mut Option, +) -> anyhow::Result<()> { + let new_checkpoint = fetch_latest_checkpoint_index(operator.cl_client()).await?; + + if !should_update_checkpoint(*current_checkpoint_idx, new_checkpoint) { + info!( + "Fetched checkpoint {new_checkpoint} is not newer than current {current_checkpoint_idx:?}" + ); + return Ok(()); + } + + operator + .create_task(new_checkpoint, task_tracker.clone(), db) + .await?; + *current_checkpoint_idx = Some(new_checkpoint); + + Ok(()) +} + +fn should_update_checkpoint(current: Option, new: u64) -> bool { + current.is_none_or(|current| new > current) +} diff --git a/bin/prover-client/src/checkpoint_runner/submit.rs b/bin/prover-client/src/checkpoint_runner/submit.rs index 9eacb1a7d..fb99f5d58 100644 --- a/bin/prover-client/src/checkpoint_runner/submit.rs +++ b/bin/prover-client/src/checkpoint_runner/submit.rs @@ -10,13 +10,15 @@ use strata_rocksdb::prover::db::ProofDb; use strata_rpc_types::{HexBytes, ProofKey}; use tracing::info; +use super::error::{CheckpointError, CheckpointResult}; + /// Submits checkpoint proof to the sequencer. pub async fn submit_checkpoint_proof( checkpoint_index: u64, sequencer_client: &HttpClient, proof_key: ProofKey, proof_db: Arc, -) -> anyhow::Result<()> { +) -> CheckpointResult<()> { let proof = proof_db.get_proof(proof_key).unwrap().unwrap(); let proof_bytes = HexBytes::from(proof.proof().as_bytes()); @@ -31,5 +33,8 @@ pub async fn submit_checkpoint_proof( rpc_params![checkpoint_index, proof_bytes], ) .await - .map_err(|e| anyhow::anyhow!("Failed to submit checkpoint proof: {:?}", e)) + .map_err(|e| CheckpointError::SubmitProofError { + index: checkpoint_index, + error: e.to_string(), + }) } diff --git a/bin/prover-client/src/main.rs b/bin/prover-client/src/main.rs index 5b5e6f941..d6e6dca6f 100644 --- a/bin/prover-client/src/main.rs +++ b/bin/prover-client/src/main.rs @@ -92,6 +92,22 @@ async fn main_inner(args: Args) -> anyhow::Result<()> { spawn(async move { manager.process_pending_tasks().await }); debug!("Spawn process pending tasks"); + // run the checkpoint runner + if args.enable_checkpoint_runner { + let checkpoint_operator = operator.checkpoint_operator().clone(); + let task_tracker_for_checkpoint = task_tracker.clone(); + let db_for_checkpoint = db.clone(); + + spawn(async move { + checkpoint_proof_runner( + checkpoint_operator, + task_tracker_for_checkpoint, + db_for_checkpoint, + ) + .await + }); + } + // Run prover manager in dev mode or runner mode if args.enable_dev_rpcs { // Run the RPC server on dev mode only