Skip to content

Commit

Permalink
checkpoint runner arg
Browse files Browse the repository at this point in the history
  • Loading branch information
MdTeach committed Dec 27, 2024
1 parent 6b28551 commit 3b6df16
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 50 deletions.
12 changes: 12 additions & 0 deletions bin/prover-client/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ pub struct Args {
/// Defaults to `true`.
#[argh(option, description = "enable prover client dev rpc", default = "true")]
pub enable_dev_rpcs: bool,

/// Controls the checkpoint proof runner service.
///
/// When enabled, this prover will automatically generate and submit proofs for checkpoints.
/// This should only be enabled for provers designated as checkpoint provers.
/// Defaults to `false`.
#[argh(
option,
description = "enable prover client checkpoint runner",
default = "false"
)]
pub enable_checkpoint_runner: bool,
}

impl Args {
Expand Down
25 changes: 25 additions & 0 deletions bin/prover-client/src/checkpoint_runner/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use thiserror::Error;

/// Represents errors that can occur during checkpoint-related operations.
///
/// This error type encompasses various failure scenarios that may occur when
/// interacting with checkpoints, including RPC communication issues,
/// data validation problems, and serialization errors. Each variant provides
/// detailed information about the specific error condition.
#[derive(Error, Debug)]
pub enum CheckpointError {
/// Occurs when the RPC request to fetch checkpoint data fails.
#[error("Failed to fetch checkpoint data: {0}")]
FetchError(String),

/// Occurs when no checkpoint data is returned from the sequencer.
#[error("No checkpoint data returned from sequencer for index {0}")]
CheckpointNotFound(u64),

/// Occurs when failed to submit checkpoint proof to the sequencer.
#[error("Failed to submit checkpoint proof for index {index}: {error}")]
SubmitProofError { index: u64, error: String },
}

/// A type alias for Results involving checkpoint operations.
pub type CheckpointResult<T> = Result<T, CheckpointError>;
31 changes: 14 additions & 17 deletions bin/prover-client/src/checkpoint_runner/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use jsonrpsee::{core::client::ClientT, http_client::HttpClient, rpc_params};
use tracing::error;

use super::error::CheckpointResult;
use crate::checkpoint_runner::error::CheckpointError;

/// Fetches the latest checkpoint index from the sequencer client.
pub async fn fetch_latest_checkpoint_index(sequencer_client: &HttpClient) -> anyhow::Result<u64> {
match sequencer_client
.request("strata_getLatestCheckpointIndex", rpc_params![])
pub async fn fetch_latest_checkpoint_index(sequencer_client: &HttpClient) -> CheckpointResult<u64> {
sequencer_client
.request::<Option<u64>, _>("strata_getLatestCheckpointIndex", rpc_params![])
.await
{
Ok(Some(idx)) => Ok(idx),
Ok(None) => {
error!("Failed to fetch current checkpoint");
Err(anyhow::anyhow!("Failed to fetch current checkpoint"))
}
Err(e) => {
error!("Failed to fetch current checkpoint index: {}", e);
Err(anyhow::anyhow!(
"Failed to fetch current checkpoint index: {}",
e
))
}
}
.map_err(|e| {
error!("Failed to fetch current checkpoint index: {e}");
CheckpointError::FetchError(e.to_string())
})?
.ok_or_else(|| {
error!("No checkpoint index returned from sequencer");
CheckpointError::CheckpointNotFound(0)
})
}

Check warning on line 20 in bin/prover-client/src/checkpoint_runner/fetch.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/fetch.rs#L8-L20

Added lines #L8 - L20 were not covered by tests
1 change: 1 addition & 0 deletions bin/prover-client/src/checkpoint_runner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A module defining operations for the checkpoint proof generation
pub mod error;
pub mod fetch;
pub mod runner;
pub mod submit;
64 changes: 33 additions & 31 deletions bin/prover-client/src/checkpoint_runner/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,43 @@ pub async fn checkpoint_proof_runner(
db: Arc<ProofDb>,
) {
info!("Checkpoint runner started");

let poll_interval = Duration::from_secs(CHECKPOINT_POLL_INTERVAL);
let mut ticker = interval(poll_interval);
let mut ticker = interval(Duration::from_secs(CHECKPOINT_POLL_INTERVAL));
let mut current_checkpoint_idx: Option<u64> = None;

Check warning on line 27 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L20-L27

Added lines #L20 - L27 were not covered by tests

loop {
ticker.tick().await;

Check warning on line 30 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L30

Added line #L30 was not covered by tests

match fetch_latest_checkpoint_index(operator.cl_client()).await {
Ok(new_checkpoint) => {
// Determine if we should update the checkpoint
let should_update =
current_checkpoint_idx.is_none_or(|current| new_checkpoint > current);

if should_update {
// Create new proving task
if let Err(e) = operator
.create_task(new_checkpoint, task_tracker.clone(), &db)
.await
{
error!("Failed to create proving task: {:?}", e);
continue;
}

// Update the checkpoint index
current_checkpoint_idx = Some(new_checkpoint);
} else {
info!(
"Fetched checkpoint {} is not newer than current {:?}",
new_checkpoint, current_checkpoint_idx
);
}
}
Err(e) => {
error!("Failed to fetch the latest checkpoint index: {:?}", e);
}
if let Err(e) =
process_checkpoint(&operator, &task_tracker, &db, &mut current_checkpoint_idx).await

Check warning on line 33 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L32-L33

Added lines #L32 - L33 were not covered by tests
{
error!("Error processing checkpoint: {e:?}");
}

Check warning on line 36 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L35-L36

Added lines #L35 - L36 were not covered by tests
}
}

async fn process_checkpoint(
operator: &CheckpointOperator,
task_tracker: &Arc<Mutex<TaskTracker>>,
db: &Arc<ProofDb>,
current_checkpoint_idx: &mut Option<u64>,
) -> anyhow::Result<()> {
let new_checkpoint = fetch_latest_checkpoint_index(operator.cl_client()).await?;

Check warning on line 46 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L40-L46

Added lines #L40 - L46 were not covered by tests

if !should_update_checkpoint(*current_checkpoint_idx, new_checkpoint) {
info!(
"Fetched checkpoint {new_checkpoint} is not newer than current {current_checkpoint_idx:?}"

Check warning on line 50 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L48-L50

Added lines #L48 - L50 were not covered by tests
);
return Ok(());
}

operator
.create_task(new_checkpoint, task_tracker.clone(), db)
.await?;
*current_checkpoint_idx = Some(new_checkpoint);

Ok(())
}

Check warning on line 61 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L52-L61

Added lines #L52 - L61 were not covered by tests

fn should_update_checkpoint(current: Option<u64>, new: u64) -> bool {
current.is_none_or(|current| new > current)
}

Check warning on line 65 in bin/prover-client/src/checkpoint_runner/runner.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/runner.rs#L63-L65

Added lines #L63 - L65 were not covered by tests
9 changes: 7 additions & 2 deletions bin/prover-client/src/checkpoint_runner/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use strata_rocksdb::prover::db::ProofDb;
use strata_rpc_types::{HexBytes, ProofKey};
use tracing::info;

use super::error::{CheckpointError, CheckpointResult};

/// Submits checkpoint proof to the sequencer.
pub async fn submit_checkpoint_proof(
checkpoint_index: u64,
sequencer_client: &HttpClient,
proof_key: ProofKey,
proof_db: Arc<ProofDb>,
) -> anyhow::Result<()> {
) -> CheckpointResult<()> {
let proof = proof_db.get_proof(proof_key).unwrap().unwrap();
let proof_bytes = HexBytes::from(proof.proof().as_bytes());

Expand All @@ -31,5 +33,8 @@ pub async fn submit_checkpoint_proof(
rpc_params![checkpoint_index, proof_bytes],

Check warning on line 33 in bin/prover-client/src/checkpoint_runner/submit.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/submit.rs#L30-L33

Added lines #L30 - L33 were not covered by tests
)
.await
.map_err(|e| anyhow::anyhow!("Failed to submit checkpoint proof: {:?}", e))
.map_err(|e| CheckpointError::SubmitProofError {
index: checkpoint_index,
error: e.to_string(),
})
}

Check warning on line 40 in bin/prover-client/src/checkpoint_runner/submit.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/checkpoint_runner/submit.rs#L35-L40

Added lines #L35 - L40 were not covered by tests
16 changes: 16 additions & 0 deletions bin/prover-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ async fn main_inner(args: Args) -> anyhow::Result<()> {
spawn(async move { manager.process_pending_tasks().await });
debug!("Spawn process pending tasks");

// run the checkpoint runner
if args.enable_checkpoint_runner {
let checkpoint_operator = operator.checkpoint_operator().clone();
let task_tracker_for_checkpoint = task_tracker.clone();
let db_for_checkpoint = db.clone();

spawn(async move {
checkpoint_proof_runner(
checkpoint_operator,
task_tracker_for_checkpoint,
db_for_checkpoint,
)
.await
});
}

Check warning on line 109 in bin/prover-client/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/prover-client/src/main.rs#L96-L109

Added lines #L96 - L109 were not covered by tests

// Run prover manager in dev mode or runner mode
if args.enable_dev_rpcs {
// Run the RPC server on dev mode only
Expand Down

0 comments on commit 3b6df16

Please sign in to comment.