From 381d4baf91d5aab0989c36f0cb58582c448f9ef6 Mon Sep 17 00:00:00 2001 From: ec2 Date: Tue, 9 Jan 2024 08:40:22 +0000 Subject: [PATCH 1/2] Limit Concurrent Proof Gen --- preprocessor/src/lib.rs | 2 +- preprocessor/src/rotation.rs | 4 +- prover/src/args.rs | 4 ++ prover/src/main.rs | 3 ++ prover/src/rpc.rs | 99 +++++++++++++++++++++++------------- 5 files changed, 75 insertions(+), 37 deletions(-) diff --git a/preprocessor/src/lib.rs b/preprocessor/src/lib.rs index eaf68ea..1b9381c 100644 --- a/preprocessor/src/lib.rs +++ b/preprocessor/src/lib.rs @@ -126,7 +126,7 @@ pub async fn get_block_header( } pub async fn light_client_update_to_args( - update: &mut LightClientUpdateCapella< + update: &LightClientUpdateCapella< { S::SYNC_COMMITTEE_SIZE }, { S::SYNC_COMMITTEE_ROOT_INDEX }, { S::SYNC_COMMITTEE_DEPTH }, diff --git a/preprocessor/src/rotation.rs b/preprocessor/src/rotation.rs index d471a74..a298714 100644 --- a/preprocessor/src/rotation.rs +++ b/preprocessor/src/rotation.rs @@ -35,8 +35,8 @@ where slot, period ); - let mut update = get_light_client_update_at_period(client, period).await?; - rotation_args_from_update(&mut update).await + let update = get_light_client_update_at_period(client, period).await?; + rotation_args_from_update(&update).await } /// Converts a [`LightClientUpdateCapella`] to a [`CommitteeUpdateArgs`] witness. diff --git a/prover/src/args.rs b/prover/src/args.rs index 738ed60..60bc1c6 100644 --- a/prover/src/args.rs +++ b/prover/src/args.rs @@ -45,6 +45,10 @@ pub enum BaseCmd { /// Path to directory with circuit artifacts #[clap(long, short, default_value = "./build")] build_dir: PathBuf, + + /// How many proofs can be run at the same tome + #[clap(long, short, default_value = "1")] + concurrency: usize, }, /// Circuit related commands. Circuit { diff --git a/prover/src/main.rs b/prover/src/main.rs index 808aacc..b6061f3 100644 --- a/prover/src/main.rs +++ b/prover/src/main.rs @@ -24,6 +24,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port, spec, build_dir, + concurrency, } => { match spec { args::Spec::Testnet => { @@ -31,6 +32,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port.parse().unwrap(), options.args.config_dir, build_dir, + concurrency, ) .await } @@ -39,6 +41,7 @@ async fn app(options: Cli) -> eyre::Result<()> { port.parse().unwrap(), options.args.config_dir, build_dir, + concurrency, ) .await } diff --git a/prover/src/rpc.rs b/prover/src/rpc.rs index 2ee8ae1..11266c6 100644 --- a/prover/src/rpc.rs +++ b/prover/src/rpc.rs @@ -6,7 +6,7 @@ use ark_std::{end_timer, start_timer}; use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; use ethers::prelude::*; use itertools::Itertools; -use jsonrpc_v2::{Data, RequestObject as JsonRpcRequestObject}; +use jsonrpc_v2::{Data, RequestObject as JsonRpcRequestObject, ResponseObjects}; use jsonrpc_v2::{Error as JsonRpcError, Params}; use jsonrpc_v2::{MapRouter as JsonRpcMapRouter, Server as JsonRpcServer}; use lightclient_circuits::halo2_proofs::halo2curves::bn256::{Bn256, Fr, G1Affine}; @@ -19,8 +19,9 @@ use snark_verifier_sdk::{halo2::aggregation::AggregationCircuit, Snark}; use spectre_prover::prover::ProverState; use std::path::{Path, PathBuf}; use std::sync::Arc; +use tokio::sync::Semaphore; -pub type JsonRpcServerState = Arc>; +pub type JsonRpcServerState = Arc; use crate::rpc_api::{ CommitteeUpdateEvmProofResult, GenProofCommitteeUpdateParams, GenProofStepParams, @@ -28,30 +29,52 @@ use crate::rpc_api::{ RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, }; -pub(crate) fn jsonrpc_server( - state: ProverState, -) -> JsonRpcServer -where - [(); S::SYNC_COMMITTEE_SIZE]:, - [(); S::FINALIZED_HEADER_DEPTH]:, - [(); S::BYTES_PER_LOGS_BLOOM]:, - [(); S::MAX_EXTRA_DATA_BYTES]:, - [(); S::SYNC_COMMITTEE_ROOT_INDEX]:, - [(); S::SYNC_COMMITTEE_DEPTH]:, - [(); S::FINALIZED_HEADER_INDEX]:, -{ - JsonRpcServer::new() - .with_data(Data::new(state)) - .with_method( - RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, - gen_evm_proof_committee_update_handler::, - ) - .with_method( - RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, - gen_evm_proof_sync_step_compressed_handler::, - ) - .finish_unwrapped() +pub struct ServerState { + json_rpc_server: JsonRpcServer, + concurrency: Semaphore, } + +impl ServerState { + pub(crate) fn new(state: ProverState, concurrency: usize) -> Self + where + [(); S::SYNC_COMMITTEE_SIZE]:, + [(); S::FINALIZED_HEADER_DEPTH]:, + [(); S::BYTES_PER_LOGS_BLOOM]:, + [(); S::MAX_EXTRA_DATA_BYTES]:, + [(); S::SYNC_COMMITTEE_ROOT_INDEX]:, + [(); S::SYNC_COMMITTEE_DEPTH]:, + [(); S::FINALIZED_HEADER_INDEX]:, + { + let json_rpc_server = JsonRpcServer::new() + .with_data(Data::new(state)) + .with_method( + RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, + gen_evm_proof_committee_update_handler::, + ) + .with_method( + RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, + gen_evm_proof_sync_step_compressed_handler::, + ) + .finish_unwrapped(); + Self { + json_rpc_server, + concurrency: Semaphore::new(concurrency), + } + } + + pub async fn handle(&self, rpc_call: JsonRpcRequestObject) -> Result { + log::info!( + "Incoming RPC request with method: {}, {} running proofs", + rpc_call.method_ref(), + self.concurrency.available_permits() + ); + if let Err(e) = self.concurrency.acquire().await { + return Err(e.to_string()); + } + Ok(self.json_rpc_server.handle(rpc_call).await) + } +} + pub(crate) async fn gen_evm_proof_committee_update_handler( Data(state): Data, Params(params): Params, @@ -69,8 +92,8 @@ where light_client_update, } = params; - let mut update = ssz_rs::deserialize(&light_client_update)?; - let witness = rotation_args_from_update(&mut update).await?; + let update = ssz_rs::deserialize(&light_client_update)?; + let witness = rotation_args_from_update(&update).await?; let params = state.params.get(state.committee_update.degree()).unwrap(); let snark = gen_uncompressed_snark::>( @@ -186,6 +209,7 @@ pub async fn run_rpc( port: usize, config_dir: impl AsRef, build_dir: impl AsRef, + concurrency: usize, ) -> Result<(), eyre::Error> where [(); S::SYNC_COMMITTEE_SIZE]:, @@ -200,7 +224,7 @@ where let timer = start_timer!(|| "Load proving keys"); let state = ProverState::new::(config_dir.as_ref(), build_dir.as_ref()); end_timer!(timer); - let rpc_server = Arc::new(jsonrpc_server::(state)); + let rpc_server = Arc::new(ServerState::new(state, concurrency)); let router = Router::new() .route("/rpc", post(handler)) @@ -218,13 +242,20 @@ async fn handler( axum::Json(rpc_call): axum::Json, ) -> impl IntoResponse { let response_headers = [("content-type", "application/json-rpc;charset=utf-8")]; - log::debug!("RPC request with method: {}", rpc_call.method_ref()); - let response = rpc_server.handle(rpc_call).await; - let response_str = serde_json::to_string(&response); - log::debug!("RPC response: {:?}", response_str); - match response_str { - Ok(result) => (StatusCode::OK, response_headers, result), + match rpc_server.handle(rpc_call).await { + Ok(response) => { + let response_str = serde_json::to_string(&response); + log::debug!("RPC response: {:?}", response_str); + match response_str { + Ok(result) => (StatusCode::OK, response_headers, result), + Err(err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + response_headers, + err.to_string(), + ), + } + } Err(err) => ( StatusCode::INTERNAL_SERVER_ERROR, response_headers, From 7e820c2abfe3c5005b5ffd45e03bf138fc800967 Mon Sep 17 00:00:00 2001 From: ec2 Date: Tue, 9 Jan 2024 09:00:27 +0000 Subject: [PATCH 2/2] refactor --- prover/src/prover.rs | 6 ++- prover/src/rpc.rs | 111 ++++++++++++++++++------------------------- 2 files changed, 52 insertions(+), 65 deletions(-) diff --git a/prover/src/prover.rs b/prover/src/prover.rs index 03b016e..41f301f 100644 --- a/prover/src/prover.rs +++ b/prover/src/prover.rs @@ -16,6 +16,8 @@ use lightclient_circuits::util::AppCircuit; use snark_verifier_sdk::halo2::aggregation::AggregationCircuit; use std::collections::BTreeMap; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::Semaphore; #[derive(Clone, Debug, Getters)] pub struct CircuitContext { @@ -35,10 +37,11 @@ pub struct ProverState { pub step_verifier: CircuitContext, pub committee_update: CircuitContext, pub committee_update_verifier: CircuitContext, + pub concurrency: Arc, } impl ProverState { - pub fn new(config_dir: &Path, build_dir: &Path) -> Self { + pub fn new(config_dir: &Path, build_dir: &Path, concurrency: usize) -> Self { let mut params_map = BTreeMap::new(); fn load_ctx( @@ -109,6 +112,7 @@ impl ProverState { vec![committee_update_snark], ), params: params_map, + concurrency: Arc::new(Semaphore::new(concurrency)), } } } diff --git a/prover/src/rpc.rs b/prover/src/rpc.rs index 11266c6..0fb7c58 100644 --- a/prover/src/rpc.rs +++ b/prover/src/rpc.rs @@ -6,7 +6,7 @@ use ark_std::{end_timer, start_timer}; use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; use ethers::prelude::*; use itertools::Itertools; -use jsonrpc_v2::{Data, RequestObject as JsonRpcRequestObject, ResponseObjects}; +use jsonrpc_v2::{Data, RequestObject as JsonRpcRequestObject}; use jsonrpc_v2::{Error as JsonRpcError, Params}; use jsonrpc_v2::{MapRouter as JsonRpcMapRouter, Server as JsonRpcServer}; use lightclient_circuits::halo2_proofs::halo2curves::bn256::{Bn256, Fr, G1Affine}; @@ -19,60 +19,37 @@ use snark_verifier_sdk::{halo2::aggregation::AggregationCircuit, Snark}; use spectre_prover::prover::ProverState; use std::path::{Path, PathBuf}; use std::sync::Arc; -use tokio::sync::Semaphore; - -pub type JsonRpcServerState = Arc; +pub type JsonRpcServerState = Arc>; use crate::rpc_api::{ CommitteeUpdateEvmProofResult, GenProofCommitteeUpdateParams, GenProofStepParams, SyncStepCompressedEvmProofResult, RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, }; -pub struct ServerState { - json_rpc_server: JsonRpcServer, - concurrency: Semaphore, -} - -impl ServerState { - pub(crate) fn new(state: ProverState, concurrency: usize) -> Self - where - [(); S::SYNC_COMMITTEE_SIZE]:, - [(); S::FINALIZED_HEADER_DEPTH]:, - [(); S::BYTES_PER_LOGS_BLOOM]:, - [(); S::MAX_EXTRA_DATA_BYTES]:, - [(); S::SYNC_COMMITTEE_ROOT_INDEX]:, - [(); S::SYNC_COMMITTEE_DEPTH]:, - [(); S::FINALIZED_HEADER_INDEX]:, - { - let json_rpc_server = JsonRpcServer::new() - .with_data(Data::new(state)) - .with_method( - RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, - gen_evm_proof_committee_update_handler::, - ) - .with_method( - RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, - gen_evm_proof_sync_step_compressed_handler::, - ) - .finish_unwrapped(); - Self { - json_rpc_server, - concurrency: Semaphore::new(concurrency), - } - } - - pub async fn handle(&self, rpc_call: JsonRpcRequestObject) -> Result { - log::info!( - "Incoming RPC request with method: {}, {} running proofs", - rpc_call.method_ref(), - self.concurrency.available_permits() - ); - if let Err(e) = self.concurrency.acquire().await { - return Err(e.to_string()); - } - Ok(self.json_rpc_server.handle(rpc_call).await) - } +pub(crate) fn jsonrpc_server( + state: ProverState, +) -> JsonRpcServer +where + [(); S::SYNC_COMMITTEE_SIZE]:, + [(); S::FINALIZED_HEADER_DEPTH]:, + [(); S::BYTES_PER_LOGS_BLOOM]:, + [(); S::MAX_EXTRA_DATA_BYTES]:, + [(); S::SYNC_COMMITTEE_ROOT_INDEX]:, + [(); S::SYNC_COMMITTEE_DEPTH]:, + [(); S::FINALIZED_HEADER_INDEX]:, +{ + JsonRpcServer::new() + .with_data(Data::new(state)) + .with_method( + RPC_EVM_PROOF_COMMITTEE_UPDATE_CIRCUIT_COMPRESSED, + gen_evm_proof_committee_update_handler::, + ) + .with_method( + RPC_EVM_PROOF_STEP_CIRCUIT_COMPRESSED, + gen_evm_proof_sync_step_compressed_handler::, + ) + .finish_unwrapped() } pub(crate) async fn gen_evm_proof_committee_update_handler( @@ -88,6 +65,13 @@ where [(); S::SYNC_COMMITTEE_DEPTH]:, [(); S::FINALIZED_HEADER_INDEX]:, { + if let Err(e) = state.concurrency.clone().acquire_owned().await { + return Err(JsonRpcError::internal(format!( + "Failed to acquire concurrency lock: {}", + e + ))); + }; + let GenProofCommitteeUpdateParams { light_client_update, } = params; @@ -145,6 +129,12 @@ where [(); S::BYTES_PER_LOGS_BLOOM]:, [(); S::MAX_EXTRA_DATA_BYTES]:, { + if let Err(e) = state.concurrency.clone().acquire_owned().await { + return Err(JsonRpcError::internal(format!( + "Failed to acquire concurrency lock: {}", + e + ))); + }; let GenProofStepParams { light_client_finality_update, domain, @@ -222,10 +212,9 @@ where { let tcp_listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)).await?; let timer = start_timer!(|| "Load proving keys"); - let state = ProverState::new::(config_dir.as_ref(), build_dir.as_ref()); + let state = ProverState::new::(config_dir.as_ref(), build_dir.as_ref(), concurrency); end_timer!(timer); - let rpc_server = Arc::new(ServerState::new(state, concurrency)); - + let rpc_server = Arc::new(jsonrpc_server::(state)); let router = Router::new() .route("/rpc", post(handler)) .with_state(rpc_server); @@ -243,19 +232,13 @@ async fn handler( ) -> impl IntoResponse { let response_headers = [("content-type", "application/json-rpc;charset=utf-8")]; - match rpc_server.handle(rpc_call).await { - Ok(response) => { - let response_str = serde_json::to_string(&response); - log::debug!("RPC response: {:?}", response_str); - match response_str { - Ok(result) => (StatusCode::OK, response_headers, result), - Err(err) => ( - StatusCode::INTERNAL_SERVER_ERROR, - response_headers, - err.to_string(), - ), - } - } + log::debug!("RPC request with method: {}", rpc_call.method_ref()); + + let response = rpc_server.handle(rpc_call).await; + let response_str = serde_json::to_string(&response); + log::debug!("RPC response: {:?}", response_str); + match response_str { + Ok(result) => (StatusCode::OK, response_headers, result), Err(err) => ( StatusCode::INTERNAL_SERVER_ERROR, response_headers,