diff --git a/preprocessor/src/lib.rs b/preprocessor/src/lib.rs index eaf68ea..1b9381c 100644 --- a/preprocessor/src/lib.rs +++ b/preprocessor/src/lib.rs @@ -126,7 +126,7 @@ pub async fn get_block_header( } pub async fn light_client_update_to_args( - update: &mut LightClientUpdateCapella< + update: &LightClientUpdateCapella< { S::SYNC_COMMITTEE_SIZE }, { S::SYNC_COMMITTEE_ROOT_INDEX }, { S::SYNC_COMMITTEE_DEPTH }, diff --git a/preprocessor/src/rotation.rs b/preprocessor/src/rotation.rs index d471a74..a298714 100644 --- a/preprocessor/src/rotation.rs +++ b/preprocessor/src/rotation.rs @@ -35,8 +35,8 @@ where slot, period ); - let mut update = get_light_client_update_at_period(client, period).await?; - rotation_args_from_update(&mut update).await + let update = get_light_client_update_at_period(client, period).await?; + rotation_args_from_update(&update).await } /// Converts a [`LightClientUpdateCapella`] to a [`CommitteeUpdateArgs`] witness. diff --git a/prover/src/args.rs b/prover/src/args.rs index 738ed60..60bc1c6 100644 --- a/prover/src/args.rs +++ b/prover/src/args.rs @@ -45,6 +45,10 @@ pub enum BaseCmd { /// Path to directory with circuit artifacts #[clap(long, short, default_value = "./build")] build_dir: PathBuf, + + /// How many proofs can be run at the same tome + #[clap(long, short, default_value = "1")] + concurrency: usize, }, /// Circuit related commands. Circuit { diff --git a/prover/src/main.rs b/prover/src/main.rs index 808aacc..b6061f3 100644 --- a/prover/src/main.rs +++ b/prover/src/main.rs @@ -24,6 +24,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port, spec, build_dir, + concurrency, } => { match spec { args::Spec::Testnet => { @@ -31,6 +32,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port.parse().unwrap(), options.args.config_dir, build_dir, + concurrency, ) .await } @@ -39,6 +41,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port.parse().unwrap(), options.args.config_dir, build_dir, + concurrency, ) .await } diff --git a/prover/src/prover.rs b/prover/src/prover.rs index 03b016e..41f301f 100644 --- a/prover/src/prover.rs +++ b/prover/src/prover.rs @@ -16,6 +16,8 @@ use lightclient_circuits::util::AppCircuit; use snark_verifier_sdk::halo2::aggregation::AggregationCircuit; use std::collections::BTreeMap; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::Semaphore; #[derive(Clone, Debug, Getters)] pub struct CircuitContext { @@ -35,10 +37,11 @@ pub struct ProverState { pub step_verifier: CircuitContext, pub committee_update: CircuitContext, pub committee_update_verifier: CircuitContext, + pub concurrency: Arc, } impl ProverState { - pub fn new(config_dir: &Path, build_dir: &Path) -> Self { + pub fn new(config_dir: &Path, build_dir: &Path, concurrency: usize) -> Self { let mut params_map = BTreeMap::new(); fn load_ctx( @@ -109,6 +112,7 @@ impl ProverState { vec![committee_update_snark], ), params: params_map, + concurrency: Arc::new(Semaphore::new(concurrency)), } } } diff --git a/prover/src/rpc.rs b/prover/src/rpc.rs index 2ee8ae1..0fb7c58 100644 --- a/prover/src/rpc.rs +++ b/prover/src/rpc.rs @@ -21,7 +21,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; pub type JsonRpcServerState = Arc>; - use crate::rpc_api::{ CommitteeUpdateEvmProofResult, GenProofCommitteeUpdateParams, GenProofStepParams, SyncStepCompressedEvmProofResult, RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, @@ -52,6 +51,7 @@ where ) .finish_unwrapped() } + pub(crate) async fn gen_evm_proof_committee_update_handler( Data(state): Data, Params(params): Params, @@ -65,12 +65,19 @@ where [(); S::SYNC_COMMITTEE_DEPTH]:, [(); S::FINALIZED_HEADER_INDEX]:, { + if let Err(e) = state.concurrency.clone().acquire_owned().await { + return Err(JsonRpcError::internal(format!( + "Failed to acquire concurrency lock: {}", + e + ))); + }; + let GenProofCommitteeUpdateParams { light_client_update, } = params; - let mut update = ssz_rs::deserialize(&light_client_update)?; - let witness = rotation_args_from_update(&mut update).await?; + let update = ssz_rs::deserialize(&light_client_update)?; + let witness = rotation_args_from_update(&update).await?; let params = state.params.get(state.committee_update.degree()).unwrap(); let snark = gen_uncompressed_snark::>( @@ -122,6 +129,12 @@ where [(); S::BYTES_PER_LOGS_BLOOM]:, [(); S::MAX_EXTRA_DATA_BYTES]:, { + if let Err(e) = state.concurrency.clone().acquire_owned().await { + return Err(JsonRpcError::internal(format!( + "Failed to acquire concurrency lock: {}", + e + ))); + }; let GenProofStepParams { light_client_finality_update, domain, @@ -186,6 +199,7 @@ pub async fn run_rpc( port: usize, config_dir: impl AsRef, build_dir: impl AsRef, + concurrency: usize, ) -> Result<(), eyre::Error> where [(); S::SYNC_COMMITTEE_SIZE]:, @@ -198,10 +212,9 @@ where { let tcp_listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)).await?; let timer = start_timer!(|| "Load proving keys"); - let state = ProverState::new::(config_dir.as_ref(), build_dir.as_ref()); + let state = ProverState::new::(config_dir.as_ref(), build_dir.as_ref(), concurrency); end_timer!(timer); let rpc_server = Arc::new(jsonrpc_server::(state)); - let router = Router::new() .route("/rpc", post(handler)) .with_state(rpc_server); @@ -218,6 +231,7 @@ async fn handler( axum::Json(rpc_call): axum::Json, ) -> impl IntoResponse { let response_headers = [("content-type", "application/json-rpc;charset=utf-8")]; + log::debug!("RPC request with method: {}", rpc_call.method_ref()); let response = rpc_server.handle(rpc_call).await;