Prover Client Checkpoint Runner #565

Draft: wants to merge 4 commits into base: main
12 changes: 12 additions & 0 deletions bin/prover-client/src/args.rs
@@ -125,6 +125,18 @@ pub struct Args {
    /// Defaults to `true`.
    #[argh(option, description = "enable prover client dev rpc", default = "true")]
    pub enable_dev_rpcs: bool,

    /// Controls the checkpoint proof runner service.
    ///
    /// When enabled, this prover will automatically generate and submit proofs for checkpoints.
    /// This should only be enabled for provers designated as checkpoint provers.
    /// Defaults to `false`.
    #[argh(
        option,
        description = "enable prover client checkpoint runner",
        default = "false"
    )]
    pub enable_checkpoint_runner: bool,
}

impl Args {
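Because `enable_checkpoint_runner` is declared as an argh `option` rather than a `switch`, it presumably takes an explicit value on the command line (something like `--enable-checkpoint-runner true`) and falls back to `false` when absent. The sketch below is a std-only stand-in for that behavior, not argh's actual parser; `parse_bool_option` and the flag spelling are assumptions made for illustration.

```rust
/// Hypothetical helper (not part of argh or this PR): looks for `--<name> <value>`
/// in `args` and parses the value as a bool.
/// Returns `default` when the flag is absent, and an error message when the
/// value is missing or is not `true`/`false`.
fn parse_bool_option(args: &[&str], name: &str, default: bool) -> Result<bool, String> {
    let flag = format!("--{name}");
    match args.iter().position(|a| *a == flag) {
        // Flag not given: use the declared default.
        None => Ok(default),
        Some(pos) => {
            // An `option` needs a value right after the flag.
            let raw = args
                .get(pos + 1)
                .ok_or_else(|| format!("{flag} expects a value"))?;
            raw.parse::<bool>()
                .map_err(|e| format!("invalid value for {flag}: {e}"))
        }
    }
}
```

Under these assumptions, a prover that is not a designated checkpoint prover can simply omit the flag and the runner stays disabled.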
25 changes: 25 additions & 0 deletions bin/prover-client/src/checkpoint_runner/error.rs
@@ -0,0 +1,25 @@
use thiserror::Error;

/// Represents errors that can occur during checkpoint-related operations.
///
/// This error type covers the failure scenarios that can occur when
/// interacting with checkpoints: failed RPC fetches, missing checkpoint
/// data, and failed proof submissions. Each variant carries details
/// about the specific error condition.
#[derive(Error, Debug)]
pub enum CheckpointError {
    /// Occurs when the RPC request to fetch checkpoint data fails.
    #[error("Failed to fetch checkpoint data: {0}")]
    FetchError(String),

    /// Occurs when no checkpoint data is returned from the sequencer.
    #[error("No checkpoint data returned from sequencer for index {0}")]
    CheckpointNotFound(u64),

    /// Occurs when submitting the checkpoint proof to the sequencer fails.
    #[error("Failed to submit checkpoint proof for index {index}: {error}")]
    SubmitProofError { index: u64, error: String },
}

/// A type alias for Results involving checkpoint operations.
pub type CheckpointResult<T> = Result<T, CheckpointError>;
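To make the rendered messages concrete, here is a std-only mirror of the enum with a hand-written `Display` impl. The real type derives `thiserror::Error`, which generates an equivalent impl from the `#[error(...)]` attributes; the manual impl below is only a sketch so the example compiles without external crates.

```rust
use std::fmt;

/// Std-only mirror of `CheckpointError` from the diff, for illustration only.
#[derive(Debug)]
enum CheckpointError {
    FetchError(String),
    CheckpointNotFound(u64),
    SubmitProofError { index: u64, error: String },
}

impl fmt::Display for CheckpointError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each arm reproduces the format string of the matching `#[error]` attribute.
        match self {
            Self::FetchError(e) => write!(f, "Failed to fetch checkpoint data: {e}"),
            Self::CheckpointNotFound(idx) => {
                write!(f, "No checkpoint data returned from sequencer for index {idx}")
            }
            Self::SubmitProofError { index, error } => {
                write!(f, "Failed to submit checkpoint proof for index {index}: {error}")
            }
        }
    }
}

impl std::error::Error for CheckpointError {}
```

Because the variants store the underlying failure as a `String`, the original error's source chain is not preserved; that appears to be a deliberate simplification in this draft.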
21 changes: 21 additions & 0 deletions bin/prover-client/src/checkpoint_runner/fetch.rs
@@ -0,0 +1,21 @@
use jsonrpsee::http_client::HttpClient;
use strata_rpc_api::StrataApiClient;
use tracing::error;

use super::error::CheckpointResult;
use crate::checkpoint_runner::error::CheckpointError;

/// Fetches the latest checkpoint index from the sequencer client.
pub async fn fetch_latest_checkpoint_index(cl_client: &HttpClient) -> CheckpointResult<u64> {
    cl_client
        .get_latest_checkpoint_index()
        .await
        .map_err(|e| {
            error!("Failed to fetch current checkpoint index: {e}");
            CheckpointError::FetchError(e.to_string())
        })?
        .ok_or_else(|| {
            error!("No checkpoint index returned from sequencer");
            CheckpointError::CheckpointNotFound(0) // placeholder: no index is known here
        })
}
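The fetch function flattens a nested `Result<Option<u64>, _>` into a single `Result<u64, _>`: the RPC error is mapped first, then the missing value. A minimal std-only sketch of that pattern follows. `IndexError` and `latest_index` are hypothetical names, and the dedicated `NoIndex` variant is only one possible alternative to reporting index `0` when nothing was returned.

```rust
/// Hypothetical error type for this sketch; the diff uses `CheckpointError` instead.
#[derive(Debug, PartialEq)]
enum IndexError {
    /// The transport or RPC call itself failed.
    Rpc(String),
    /// The call succeeded but the sequencer reported no checkpoint index.
    NoIndex,
}

/// Collapses "call failed" and "call returned nothing" into one error channel.
fn latest_index(response: Result<Option<u64>, String>) -> Result<u64, IndexError> {
    response
        .map_err(IndexError::Rpc)? // outer layer: RPC failure
        .ok_or(IndexError::NoIndex) // inner layer: empty response
}
```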
6 changes: 6 additions & 0 deletions bin/prover-client/src/checkpoint_runner/mod.rs
@@ -0,0 +1,6 @@
//! A module defining operations for checkpoint proof generation.

pub mod error;
pub mod fetch;
pub mod runner;
pub mod submit;
65 changes: 65 additions & 0 deletions bin/prover-client/src/checkpoint_runner/runner.rs
@@ -0,0 +1,65 @@
use std::sync::Arc;

use strata_rocksdb::prover::db::ProofDb;
use tokio::{
    sync::Mutex,
    time::{interval, Duration},
};
use tracing::{error, info};

use crate::{
    checkpoint_runner::fetch::fetch_latest_checkpoint_index,
    operators::{checkpoint::CheckpointOperator, ProvingOp},
    task_tracker::TaskTracker,
};

const CHECKPOINT_POLL_INTERVAL: u64 = 10; // seconds

/// Periodically polls the sequencer for the latest checkpoint index and
/// dispatches a proving task when a newer checkpoint is detected.
pub async fn checkpoint_proof_runner(
    operator: CheckpointOperator,
    task_tracker: Arc<Mutex<TaskTracker>>,
    db: Arc<ProofDb>,
) {
    info!("Checkpoint runner started");
    let mut ticker = interval(Duration::from_secs(CHECKPOINT_POLL_INTERVAL));
    let mut current_checkpoint_idx: Option<u64> = None;

    loop {
        ticker.tick().await;

        if let Err(e) =
            process_checkpoint(&operator, &task_tracker, &db, &mut current_checkpoint_idx).await
        {
            error!("Error processing checkpoint: {e:?}");
        }
    }
}

async fn process_checkpoint(
    operator: &CheckpointOperator,
    task_tracker: &Arc<Mutex<TaskTracker>>,
    db: &Arc<ProofDb>,
    current_checkpoint_idx: &mut Option<u64>,
) -> anyhow::Result<()> {
    let new_checkpoint = fetch_latest_checkpoint_index(operator.cl_client()).await?;

    if !should_update_checkpoint(*current_checkpoint_idx, new_checkpoint) {
        info!(
            "Fetched checkpoint {new_checkpoint} is not newer than current {current_checkpoint_idx:?}"
        );
        return Ok(());
    }

    operator
        .create_task(new_checkpoint, task_tracker.clone(), db)
        .await?;
    *current_checkpoint_idx = Some(new_checkpoint);

    Ok(())
}

fn should_update_checkpoint(current: Option<u64>, new: u64) -> bool {
    current.is_none_or(|current| new > current)
}
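The runner's decision rule can be exercised without tokio or a sequencer. The sketch below copies `should_update_checkpoint` (written with `map_or` so it also compiles on toolchains that predate `Option::is_none_or`) and adds a hypothetical `simulate_polls` helper that replays a fixed sequence of polled indices and records which ones would trigger `create_task`.

```rust
/// Same rule as in the diff: dispatch when nothing has been seen yet,
/// or when the polled index is strictly greater than the current one.
fn should_update_checkpoint(current: Option<u64>, new: u64) -> bool {
    current.map_or(true, |current| new > current)
}

/// Hypothetical stand-in for the polling loop: each element of `polled`
/// models the index returned by one tick. Returns the indices for which
/// a proving task would be created.
fn simulate_polls(polled: &[u64]) -> Vec<u64> {
    let mut current: Option<u64> = None;
    let mut dispatched = Vec::new();
    for &idx in polled {
        if should_update_checkpoint(current, idx) {
            dispatched.push(idx);
            // Mirrors `*current_checkpoint_idx = Some(new_checkpoint)`.
            current = Some(idx);
        }
    }
    dispatched
}
```

Two properties follow from the logic as written in the diff. Only the latest index is fetched, so if the sequencer advances by more than one checkpoint between polls, the intermediate indices get no task. And because `current_checkpoint_idx` is updated only after `create_task` succeeds, a failed dispatch is retried on the next tick.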
34 changes: 34 additions & 0 deletions bin/prover-client/src/checkpoint_runner/submit.rs
@@ -0,0 +1,34 @@
use std::sync::Arc;

use jsonrpsee::http_client::HttpClient;
use strata_db::traits::ProofDatabase;
use strata_rocksdb::prover::db::ProofDb;
use strata_rpc_api::StrataSequencerApiClient;
use strata_rpc_types::{HexBytes, ProofKey};
use tracing::info;

use super::error::{CheckpointError, CheckpointResult};

/// Submits the checkpoint proof to the sequencer.
pub async fn submit_checkpoint_proof(
    checkpoint_index: u64,
    sequencer_client: &HttpClient,
    proof_key: ProofKey,
    proof_db: Arc<ProofDb>,
) -> CheckpointResult<()> {
    let proof = proof_db.get_proof(proof_key).unwrap().unwrap(); // panics on DB error or missing proof
    let proof_bytes = HexBytes::from(proof.proof().as_bytes());

    info!(
        "Sending checkpoint proof: {:?} ckp id: {:?} to the sequencer",
        proof_key, checkpoint_index
    );

    sequencer_client
        .submit_checkpoint_proof(checkpoint_index, proof_bytes)
        .await
        .map_err(|e| CheckpointError::SubmitProofError {
            index: checkpoint_index,
            error: e.to_string(),
        })
}
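The proof lookup in `submit_checkpoint_proof` goes through two layers: the database call can fail, and the proof can be absent. Since the function already returns a `CheckpointResult`, both cases could be surfaced as errors instead of panics. The following is a minimal std-only sketch of that idea; `FakeProofDb`, `SubmitError`, and `load_proof` are hypothetical names, with a `HashMap` standing in for `ProofDb` and a `u64` standing in for `ProofKey`.

```rust
use std::collections::HashMap;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum SubmitError {
    /// The database read itself failed.
    Db(String),
    /// The read succeeded but no proof is stored under the key.
    ProofMissing(u64),
}

/// Hypothetical in-memory proof store. `broken` simulates a failing database.
struct FakeProofDb {
    proofs: HashMap<u64, Vec<u8>>,
    broken: bool,
}

impl FakeProofDb {
    /// Same shape as a `Result<Option<_>>` lookup: `Err` for a database
    /// failure, `Ok(None)` for a missing entry.
    fn get_proof(&self, key: u64) -> Result<Option<Vec<u8>>, String> {
        if self.broken {
            return Err("db unavailable".to_string());
        }
        Ok(self.proofs.get(&key).cloned())
    }
}

/// Loads a proof, turning both failure modes into errors rather than panics.
fn load_proof(db: &FakeProofDb, key: u64) -> Result<Vec<u8>, SubmitError> {
    db.get_proof(key)
        .map_err(SubmitError::Db)?
        .ok_or(SubmitError::ProofMissing(key))
}
```

In the actual code this would likely mean either reusing `SubmitProofError` or adding a variant to `CheckpointError`; which one fits better is a design decision for the PR author.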
151 changes: 0 additions & 151 deletions bin/prover-client/src/ckp_runner.rs

This file was deleted.

18 changes: 18 additions & 0 deletions bin/prover-client/src/main.rs
@@ -4,6 +4,7 @@

use anyhow::Context;
use args::Args;
use checkpoint_runner::runner::checkpoint_proof_runner;
use db::open_rocksdb_database;
use jsonrpsee::http_client::HttpClientBuilder;
use operators::ProofOperator;
@@ -17,6 +18,7 @@
use tracing::debug;

mod args;
mod checkpoint_runner;
mod db;
mod errors;
mod hosts;
@@ -90,6 +92,22 @@
    spawn(async move { manager.process_pending_tasks().await });
    debug!("Spawn process pending tasks");

    // Run the checkpoint runner if enabled
    if args.enable_checkpoint_runner {
        let checkpoint_operator = operator.checkpoint_operator().clone();
        let task_tracker_for_checkpoint = task_tracker.clone();
        let db_for_checkpoint = db.clone();

        spawn(async move {
            checkpoint_proof_runner(
                checkpoint_operator,
                task_tracker_for_checkpoint,
                db_for_checkpoint,
            )
            .await
        });
    }

    // Run prover manager in dev mode or runner mode
    if args.enable_dev_rpcs {
        // Run the RPC server on dev mode only
5 changes: 5 additions & 0 deletions bin/prover-client/src/operators/checkpoint.rs
@@ -68,6 +68,11 @@
            .map_err(|e| ProvingTaskError::RpcError(e.to_string()))?
            .ok_or(ProvingTaskError::WitnessNotFound)
    }

    /// Returns a reference to the internal CL (Consensus Layer) `HttpClient`.
    pub fn cl_client(&self) -> &HttpClient {
        &self.cl_client
    }
}

impl ProvingOp for CheckpointOperator {