diff --git a/Cargo.lock b/Cargo.lock index 0701bc420c3..9c622c977bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4022,6 +4022,14 @@ dependencies = [ "log", ] +[[package]] +name = "kzgpad-rs" +version = "0.1.0" +source = "git+https://github.com/Layr-Labs/kzgpad-rs.git?tag=v0.1.0#b5f8c8d3d6482407dc118cb1f51597a017a1cc89" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "lalrpop" version = "0.20.2" @@ -10169,15 +10177,19 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "hex", + "kzgpad-rs", "prost 0.13.3", "rand 0.8.5", "rlp", + "rustls 0.23.13", "sha3 0.10.8", "tokio", "tonic", "tonic-build", "tonic-web", "tracing", + "zksync_config", ] [[package]] diff --git a/core/lib/config/src/configs/da_client/eigen_da.rs b/core/lib/config/src/configs/da_client/eigen_da.rs index 5c77d32846b..483b8a21ca8 100644 --- a/core/lib/config/src/configs/da_client/eigen_da.rs +++ b/core/lib/config/src/configs/da_client/eigen_da.rs @@ -13,18 +13,25 @@ pub struct MemStoreConfig { pub custom_quorum_numbers: Option>, // todo: This should be removed once eigenda proxy is no longer used pub account_id: Option, // todo: This should be removed once eigenda proxy is no longer used pub max_blob_size_bytes: u64, + /// Blob expiration time in seconds pub blob_expiration: u64, + /// Latency in milliseconds for get operations pub get_latency: u64, + /// Latency in milliseconds for put operations pub put_latency: u64, } #[derive(Clone, Debug, PartialEq, Deserialize, Default)] pub struct DisperserConfig { pub api_node_url: String, // todo: This should be removed once eigenda proxy is no longer used - pub custom_quorum_numbers: Option>, // todo: This should be removed once eigenda proxy is no longer used - pub account_id: Option, // todo: This should be removed once eigenda proxy is no longer used + pub custom_quorum_numbers: Option>, + pub account_id: Option, pub disperser_rpc: String, pub eth_confirmation_depth: i32, pub eigenda_eth_rpc: String, pub eigenda_svc_manager_addr: String, + pub blob_size_limit: u64, + pub status_query_timeout: u64, + pub status_query_interval: u64, + pub wait_for_finalization: bool, } diff --git a/core/lib/protobuf_config/src/da_client.rs b/core/lib/protobuf_config/src/da_client.rs index f6798bd27d3..933a16ecd61 100644 --- a/core/lib/protobuf_config/src/da_client.rs +++ b/core/lib/protobuf_config/src/da_client.rs @@ -77,6 +77,18 @@ impl ProtoRepr for proto::DataAvailabilityClient { eigenda_svc_manager_addr: required(&conf.eigenda_svc_manager_addr) .context("eigenda_svc_manager_addr")? .clone(), + blob_size_limit: required(&conf.blob_size_limit) + .context("blob_size_limit")? + .clone(), + status_query_timeout: required(&conf.status_query_timeout) + .context("status_query_timeout")? + .clone(), + status_query_interval: required(&conf.status_query_interval) + .context("status_query_interval")? + .clone(), + wait_for_finalization: required(&conf.wait_for_finalization) + .context("wait_for_finalization")? + .clone(), }) } }; @@ -143,6 +155,10 @@ impl ProtoRepr for proto::DataAvailabilityClient { eigenda_svc_manager_addr: Some( config.eigenda_svc_manager_addr.clone(), ), + blob_size_limit: Some(config.blob_size_limit), + status_query_timeout: Some(config.status_query_timeout), + status_query_interval: Some(config.status_query_interval), + wait_for_finalization: Some(config.wait_for_finalization), }, )), }, diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index 18c1dc94f90..9240af15213 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -25,12 +25,16 @@ message MemStoreConfig { message DisperserConfig { optional string api_node_url = 1; // TODO: This should be removed once eigenda proxy is no longer used - repeated uint32 custom_quorum_numbers = 2; // TODO: This should be removed once eigenda proxy is no longer used - optional string account_id = 3; // TODO: This should be removed once eigenda proxy is no longer used + repeated uint32 custom_quorum_numbers = 2; + optional string account_id = 3; optional string disperser_rpc = 4; optional int32 eth_confirmation_depth = 5; optional string eigenda_eth_rpc = 6; optional string eigenda_svc_manager_addr = 7; + optional uint64 blob_size_limit = 8; + optional uint64 status_query_timeout = 9; + optional uint64 status_query_interval = 10; + optional bool wait_for_finalization = 11; } message EigenDaConfig { diff --git a/core/node/eigenda_proxy/Cargo.toml b/core/node/eigenda_proxy/Cargo.toml index fcd5e4347c2..63ad329d93e 100644 --- a/core/node/eigenda_proxy/Cargo.toml +++ b/core/node/eigenda_proxy/Cargo.toml @@ -17,11 +17,15 @@ tracing.workspace = true rlp.workspace = true rand.workspace = true sha3.workspace = true +hex.workspace = true +zksync_config.workspace = true # we can't use the workspace version of prost because # the tonic dependency requires a hugher version. prost = "0.13.1" -tonic = { version = "0.12.1", features = ["tls", "channel", "tls-roots"]} +tonic = { version = "0.12.1", features = ["tls", "channel", "tls-native-roots"]} tonic-web = "0.12.1" +kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } +rustls.workspace = true [build-dependencies] tonic-build = { version = "0.12.1", features = ["prost"] } diff --git a/core/node/eigenda_proxy/src/blob_info.rs b/core/node/eigenda_proxy/src/blob_info.rs index ef1ac6ded8d..e999df91a9e 100644 --- a/core/node/eigenda_proxy/src/blob_info.rs +++ b/core/node/eigenda_proxy/src/blob_info.rs @@ -1,5 +1,30 @@ +use std::fmt; + use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; +use crate::{ + common::G1Commitment as DisperserG1Commitment, + disperser::{ + BatchHeader as DisperserBatchHeader, BatchMetadata as DisperserBatchMetadata, + BlobHeader as DisperserBlobHeader, BlobInfo as DisperserBlobInfo, + BlobQuorumParam as DisperserBlobQuorumParam, + BlobVerificationProof as DisperserBlobVerificationProof, + }, +}; + +#[derive(Debug)] +pub enum ConversionError { + NotPresentError, +} + +impl fmt::Display for ConversionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ConversionError::NotPresentError => write!(f, "Failed to convert BlobInfo"), + } + } +} + #[derive(Debug)] pub struct G1Commitment { pub x: Vec, @@ -35,6 +60,16 @@ impl Encodable for G1Commitment { } } +impl TryFrom for G1Commitment { + type Error = ConversionError; + fn try_from(value: DisperserG1Commitment) -> Result { + Ok(Self { + x: value.x, + y: value.y, + }) + } +} + #[derive(Debug)] pub struct BlobQuorumParam { pub quorum_number: u32, @@ -76,6 +111,18 @@ impl Encodable for BlobQuorumParam { } } +impl TryFrom for BlobQuorumParam { + type Error = ConversionError; + fn try_from(value: DisperserBlobQuorumParam) -> Result { + Ok(Self { + quorum_number: value.quorum_number, + adversary_threshold_percentage: value.adversary_threshold_percentage, + confirmation_threshold_percentage: value.confirmation_threshold_percentage, + chunk_length: value.chunk_length, + }) + } +} + #[derive(Debug)] pub struct BlobHeader { pub commitment: G1Commitment, @@ -121,6 +168,26 @@ impl Encodable for BlobHeader { } } +impl TryFrom for BlobHeader { + type Error = ConversionError; + fn try_from(value: DisperserBlobHeader) -> Result { + if value.commitment.is_none() { + return Err(ConversionError::NotPresentError); + } + let blob_quorum_params: Result, Self::Error> = value + .blob_quorum_params + .iter() + .map(|param| BlobQuorumParam::try_from(param.clone())) + .collect(); + let blob_quorum_params = blob_quorum_params?; + Ok(Self { + commitment: G1Commitment::try_from(value.commitment.unwrap())?, + data_length: value.data_length, + blob_quorum_params, + }) + } +} + #[derive(Debug)] pub struct BatchHeader { pub batch_root: Vec, @@ -165,6 +232,18 @@ impl Encodable for BatchHeader { } } +impl TryFrom for BatchHeader { + type Error = ConversionError; + fn try_from(value: DisperserBatchHeader) -> Result { + Ok(Self { + batch_root: value.batch_root, + quorum_numbers: value.quorum_numbers, + quorum_signed_percentages: value.quorum_signed_percentages, + reference_block_number: value.reference_block_number, + }) + } +} + #[derive(Debug)] pub struct BatchMetadata { pub batch_header: BatchHeader, @@ -210,6 +289,22 @@ impl Encodable for BatchMetadata { } } +impl TryFrom for BatchMetadata { + type Error = ConversionError; + fn try_from(value: DisperserBatchMetadata) -> Result { + if value.batch_header.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + batch_header: BatchHeader::try_from(value.batch_header.unwrap())?, + signatory_record_hash: value.signatory_record_hash, + fee: value.fee, + confirmation_block_number: value.confirmation_block_number, + batch_header_hash: value.batch_header_hash, + }) + } +} + #[derive(Debug)] pub struct BlobVerificationProof { pub batch_id: u32, @@ -257,6 +352,22 @@ impl Encodable for BlobVerificationProof { } } +impl TryFrom for BlobVerificationProof { + type Error = ConversionError; + fn try_from(value: DisperserBlobVerificationProof) -> Result { + if value.batch_metadata.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + batch_id: value.batch_id, + blob_index: value.blob_index, + batch_medatada: BatchMetadata::try_from(value.batch_metadata.unwrap())?, + inclusion_proof: value.inclusion_proof, + quorum_indexes: value.quorum_indexes, + }) + } +} + #[derive(Debug)] pub struct BlobInfo { pub blob_header: BlobHeader, @@ -294,3 +405,18 @@ impl Encodable for BlobInfo { s.append(&self.blob_verification_proof); } } + +impl TryFrom for BlobInfo { + type Error = ConversionError; + fn try_from(value: DisperserBlobInfo) -> Result { + if value.blob_header.is_none() || value.blob_verification_proof.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + blob_header: BlobHeader::try_from(value.blob_header.unwrap())?, + blob_verification_proof: BlobVerificationProof::try_from( + value.blob_verification_proof.unwrap(), + )?, + }) + } +} diff --git a/core/node/eigenda_proxy/src/disperser.rs b/core/node/eigenda_proxy/src/disperser.rs index 2e2800828ff..08a838ae787 100644 --- a/core/node/eigenda_proxy/src/disperser.rs +++ b/core/node/eigenda_proxy/src/disperser.rs @@ -293,10 +293,9 @@ pub mod disperser_client { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value, + clippy::let_unit_value )] - use tonic::codegen::*; - use tonic::codegen::http::Uri; + use tonic::codegen::{http::Uri, *}; /// Disperser defines the public APIs for dispersing blobs. #[derive(Debug, Clone)] pub struct DisperserClient { @@ -341,9 +340,8 @@ pub mod disperser_client { >::ResponseBody, >, >, - , - >>::Error: Into + std::marker::Send + std::marker::Sync, + >>::Error: + Into + std::marker::Send + std::marker::Sync, { DisperserClient::new(InterceptedService::new(inner, interceptor)) } @@ -385,22 +383,12 @@ pub mod disperser_client { pub async fn disperse_blob( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/DisperseBlob", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/DisperseBlob"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "DisperseBlob")); @@ -416,52 +404,35 @@ pub mod disperser_client { /// 4. The Disperser verifies the signature and returns a DisperseBlobReply message. pub async fn disperse_blob_authenticated( &mut self, - request: impl tonic::IntoStreamingRequest< - Message = super::AuthenticatedRequest, - >, + request: impl tonic::IntoStreamingRequest, ) -> std::result::Result< tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/disperser.Disperser/DisperseBlobAuthenticated", ); let mut req = request.into_streaming_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("disperser.Disperser", "DisperseBlobAuthenticated"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "disperser.Disperser", + "DisperseBlobAuthenticated", + )); self.inner.streaming(req, path, codec).await } /// This API is meant to be polled for the blob status. pub async fn get_blob_status( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/GetBlobStatus", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/GetBlobStatus"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "GetBlobStatus")); @@ -476,22 +447,12 @@ pub mod disperser_client { pub async fn retrieve_blob( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/RetrieveBlob", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/RetrieveBlob"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "RetrieveBlob")); diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs new file mode 100644 index 00000000000..22ae36731bc --- /dev/null +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -0,0 +1,272 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use rlp::decode; +use tokio::{sync::Mutex, time::interval}; +use tonic::transport::{Channel, ClientTlsConfig}; +use zksync_config::configs::da_client::eigen_da::DisperserConfig; + +use crate::{ + blob_info::BlobInfo, + disperser::{self, disperser_client::DisperserClient, BlobStatusRequest, DisperseBlobRequest}, + errors::EigenDAError, +}; + +#[derive(Clone)] +pub struct EigenDAClient { + disperser: Arc>>, + config: DisperserConfig, +} + +impl EigenDAClient { + pub async fn new(config: DisperserConfig) -> Result { + match rustls::crypto::ring::default_provider().install_default() { + Ok(_) => {} + Err(_) => {} // This is not an actual error, we expect this function to return an Err(Arc) + }; + let inner = Channel::builder( + config + .disperser_rpc + .parse() + .map_err(|_| EigenDAError::UriError)?, + ) + .tls_config(ClientTlsConfig::new().with_native_roots()) + .map_err(|_| EigenDAError::TlsError)?; + + let disperser = Arc::new(Mutex::new( + DisperserClient::connect(inner) + .await + .map_err(|err| EigenDAError::ConnectionError(err))?, + )); + + Ok(Self { disperser, config }) + } + + fn result_to_status(&self, result: i32) -> disperser::BlobStatus { + match result { + 0 => disperser::BlobStatus::Unknown, + 1 => disperser::BlobStatus::Processing, + 2 => disperser::BlobStatus::Confirmed, + 3 => disperser::BlobStatus::Failed, + 4 => disperser::BlobStatus::Finalized, + 5 => disperser::BlobStatus::InsufficientSignatures, + 6 => disperser::BlobStatus::Dispersing, + _ => disperser::BlobStatus::Unknown, + } + } + + pub async fn put_blob(&self, blob_data: Vec) -> Result, EigenDAError> { + tracing::info!("Putting blob"); + if blob_data.len() > self.config.blob_size_limit as usize { + return Err(EigenDAError::PutError); + } + let reply = self + .disperser + .lock() + .await + .disperse_blob(DisperseBlobRequest { + data: blob_data, + custom_quorum_numbers: self + .config + .custom_quorum_numbers + .clone() + .unwrap_or_default(), + account_id: self.config.account_id.clone().unwrap_or_default(), + }) + .await + .map_err(|_| EigenDAError::PutError)? + .into_inner(); + + if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { + return Err(EigenDAError::PutError); + } + + let request_id_str = + String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; + + let mut interval = interval(Duration::from_secs(self.config.status_query_interval)); + let start_time = Instant::now(); + while Instant::now() - start_time < Duration::from_secs(self.config.status_query_timeout) { + let blob_status_reply = self + .disperser + .lock() + .await + .get_blob_status(BlobStatusRequest { + request_id: reply.request_id.clone(), + }) + .await + .map_err(|_| EigenDAError::PutError)? + .into_inner(); + + let blob_status = blob_status_reply.status(); + + tracing::info!( + "Dispersing blob {:?}, status: {:?}", + request_id_str, + blob_status + ); + + match blob_status { + disperser::BlobStatus::Unknown => { + interval.tick().await; + } + disperser::BlobStatus::Processing => { + interval.tick().await; + } + disperser::BlobStatus::Confirmed => { + if self.config.wait_for_finalization { + interval.tick().await; + } else { + match blob_status_reply.info { + Some(info) => { + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + return Ok(rlp::encode(&blob_info).to_vec()); + } + None => { + return Err(EigenDAError::PutError); + } + } + } + } + disperser::BlobStatus::Failed => { + return Err(EigenDAError::PutError); + } + disperser::BlobStatus::InsufficientSignatures => { + return Err(EigenDAError::PutError); + } + disperser::BlobStatus::Dispersing => { + interval.tick().await; + } + disperser::BlobStatus::Finalized => match blob_status_reply.info { + Some(info) => { + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + return Ok(rlp::encode(&blob_info).to_vec()); + } + None => { + return Err(EigenDAError::PutError); + } + }, + } + } + + return Err(EigenDAError::PutError); + } + + pub async fn get_blob(&self, commit: Vec) -> Result, EigenDAError> { + tracing::info!("Getting blob"); + let blob_info: BlobInfo = decode(&commit).map_err(|_| EigenDAError::GetError)?; + let blob_index = blob_info.blob_verification_proof.blob_index; + let batch_header_hash = blob_info + .blob_verification_proof + .batch_medatada + .batch_header_hash; + let get_response = self + .disperser + .lock() + .await + .retrieve_blob(disperser::RetrieveBlobRequest { + batch_header_hash, + blob_index, + }) + .await + .map_err(|_| EigenDAError::GetError)? + .into_inner(); + + if get_response.data.len() == 0 { + return Err(EigenDAError::GetError); + } + + return Ok(get_response.data); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_eigenda_client() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + }; + let store = match EigenDAClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 100]; + let cert = store.put_blob(blob.clone()).await.unwrap(); + let blob2 = store.get_blob(cert).await.unwrap(); + assert_eq!(blob, blob2); + } + + #[tokio::test] + async fn test_eigenda_multiple() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + }; + let store = match EigenDAClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 100]; + let blob2 = vec![1u8; 100]; + let cert = store.put_blob(blob.clone()); + let cert2 = store.put_blob(blob2.clone()); + let (val1, val2) = tokio::join!(cert, cert2); + let blob_result = store.get_blob(val1.unwrap()).await.unwrap(); + let blob_result2 = store.get_blob(val2.unwrap()).await.unwrap(); + assert_eq!(blob, blob_result); + assert_eq!(blob2, blob_result2); + } + + #[tokio::test] + async fn test_eigenda_blob_size_limit() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + }; + let store = match EigenDAClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 3]; + let cert = store.put_blob(blob.clone()).await; + assert!(cert.is_err()); + } +} diff --git a/core/node/eigenda_proxy/src/errors.rs b/core/node/eigenda_proxy/src/errors.rs index 1ce94a89033..f657e0526fa 100644 --- a/core/node/eigenda_proxy/src/errors.rs +++ b/core/node/eigenda_proxy/src/errors.rs @@ -1,8 +1,65 @@ +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, +}; + #[derive(Debug, PartialEq)] pub enum MemStoreError { BlobToLarge, - IncorrectString, BlobAlreadyExists, IncorrectCommitment, BlobNotFound, } + +#[derive(Debug)] +pub enum EigenDAError { + TlsError, + UriError, + ConnectionError(tonic::transport::Error), + PutError, + GetError, +} + +#[derive(Debug)] +pub(crate) enum RequestProcessorError { + EigenDA(EigenDAError), + MemStore(MemStoreError), +} + +impl IntoResponse for RequestProcessorError { + fn into_response(self) -> Response { + let (status_code, message) = match self { + RequestProcessorError::EigenDA(err) => { + tracing::error!("EigenDA error: {:?}", err); + match err { + EigenDAError::TlsError => (StatusCode::BAD_GATEWAY, "Tls error".to_owned()), + EigenDAError::UriError => (StatusCode::BAD_GATEWAY, "Uri error".to_owned()), + EigenDAError::ConnectionError(err) => ( + StatusCode::BAD_GATEWAY, + format!("Connection error: {:?}", err).to_owned(), + ), + EigenDAError::PutError => (StatusCode::BAD_GATEWAY, "Put error".to_owned()), + EigenDAError::GetError => (StatusCode::BAD_GATEWAY, "Get error".to_owned()), + } + } + RequestProcessorError::MemStore(err) => { + tracing::error!("MemStore error: {:?}", err); + match err { + MemStoreError::BlobToLarge => { + (StatusCode::BAD_REQUEST, "Blob too large".to_owned()) + } + MemStoreError::BlobAlreadyExists => { + (StatusCode::BAD_REQUEST, "Blob already exists".to_owned()) + } + MemStoreError::IncorrectCommitment => { + (StatusCode::BAD_REQUEST, "Incorrect commitment".to_owned()) + } + MemStoreError::BlobNotFound => { + (StatusCode::NOT_FOUND, "Blob not found".to_owned()) + } + } + } + }; + (status_code, message).into_response() + } +} diff --git a/core/node/eigenda_proxy/src/lib.rs b/core/node/eigenda_proxy/src/lib.rs index 3d457e7d33d..866663fd130 100644 --- a/core/node/eigenda_proxy/src/lib.rs +++ b/core/node/eigenda_proxy/src/lib.rs @@ -1,22 +1,50 @@ mod common; mod disperser; -use std::net::SocketAddr; +use std::{net::SocketAddr, str::FromStr}; + use anyhow::Context as _; use axum::{ - routing::{get, put}, + extract::Path, + routing::{get, post}, Router, }; +use eigenda_client::EigenDAClient; +use memstore::MemStore; +use request_processor::{ClientType, RequestProcessorNew}; use tokio::sync::watch; +use zksync_config::configs::da_client::eigen_da::EigenDAConfig; + mod blob_info; +mod eigenda_client; mod errors; mod memstore; +mod request_processor; + +pub async fn run_server( + config: EigenDAConfig, + mut stop_receiver: watch::Receiver, +) -> anyhow::Result<()> { + let (bind_address, client) = match config { + EigenDAConfig::MemStore(cfg) => { + let bind_address = SocketAddr::from_str(&cfg.api_node_url)?; + + let client = MemStore::new(cfg); + (bind_address, ClientType::Memory(client)) + } + EigenDAConfig::Disperser(cfg) => { + let bind_address = SocketAddr::from_str(&cfg.api_node_url)?; + + let client = EigenDAClient::new(cfg) + .await + .map_err(|e| anyhow::anyhow!("Failed to create EigenDA client: {:?}", e))?; + (bind_address, ClientType::Disperser(client)) + } + }; -pub async fn run_server(mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { - // TODO: Replace port for config - let bind_address = SocketAddr::from(([0, 0, 0, 0], 4242)); tracing::debug!("Starting eigenda proxy on {bind_address}"); - let app = create_eigenda_proxy_router(); + + let app = create_eigenda_proxy_router(client); let listener = tokio::net::TcpListener::bind(bind_address) .await @@ -36,15 +64,21 @@ pub async fn run_server(mut stop_receiver: watch::Receiver) -> anyhow::Res Ok(()) } -fn create_eigenda_proxy_router() -> Router { +fn create_eigenda_proxy_router(client: ClientType) -> Router { + let get_blob_id_processor = RequestProcessorNew::new(client); + let pub_blob_id_processor = get_blob_id_processor.clone(); let router = Router::new() .route( - "/get/", - get(|| async { todo!("Handle eigenda proxy get request") }), + "/get/:l1_batch_number", + get(move |blob_id: Path| async move { + get_blob_id_processor.get_blob_id(blob_id).await + }), ) .route( "/put/", - put(|| async { todo!("Handle eigenda proxy put request") }), + post(move |blob_id: Path| async move { + pub_blob_id_processor.put_blob_id(blob_id).await + }), ); router } diff --git a/core/node/eigenda_proxy/src/memstore.rs b/core/node/eigenda_proxy/src/memstore.rs index f544749d878..88d6b56017a 100644 --- a/core/node/eigenda_proxy/src/memstore.rs +++ b/core/node/eigenda_proxy/src/memstore.rs @@ -8,31 +8,26 @@ use rand::{rngs::OsRng, Rng, RngCore}; use rlp::decode; use sha3::{Digest, Keccak256}; use tokio::time::interval; +use zksync_config::configs::da_client::eigen_da::MemStoreConfig; use crate::{ blob_info::{self, BlobInfo}, errors::MemStoreError, }; -struct MemStoreConfig { - max_blob_size_bytes: u64, - blob_expiration: Duration, - put_latency: Duration, - get_latency: Duration, -} - struct MemStoreData { store: HashMap>, key_starts: HashMap, } -struct MemStore { +#[derive(Clone)] +pub struct MemStore { config: MemStoreConfig, data: Arc>, } impl MemStore { - fn new(config: MemStoreConfig) -> Arc { + pub fn new(config: MemStoreConfig) -> Arc { let memstore = Arc::new(Self { config, data: Arc::new(RwLock::new(MemStoreData { @@ -47,8 +42,8 @@ impl MemStore { memstore } - async fn put(self: Arc, value: Vec) -> Result, MemStoreError> { - tokio::time::sleep(self.config.put_latency).await; + pub async fn put_blob(self: Arc, value: Vec) -> Result, MemStoreError> { + tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await; if value.len() as u64 > self.config.max_blob_size_bytes { return Err(MemStoreError::BlobToLarge.into()); } @@ -121,8 +116,8 @@ impl MemStore { Ok(cert_bytes) } - async fn get(self: Arc, commit: Vec) -> Result, MemStoreError> { - tokio::time::sleep(self.config.get_latency).await; + pub async fn get_blob(self: Arc, commit: Vec) -> Result, MemStoreError> { + tokio::time::sleep(Duration::from_millis(self.config.get_latency)).await; let blob_info: BlobInfo = decode(&commit).map_err(|_| MemStoreError::IncorrectCommitment)?; let key = String::from_utf8_lossy( @@ -147,7 +142,7 @@ impl MemStore { let mut data = self.data.write().unwrap(); let mut to_remove = vec![]; for (key, start) in data.key_starts.iter() { - if start.elapsed() > self.config.blob_expiration { + if start.elapsed() > Duration::from_secs(self.config.blob_expiration) { to_remove.push(key.clone()); } } @@ -158,7 +153,7 @@ impl MemStore { } async fn pruning_loop(self: Arc) { - let mut interval = interval(self.config.blob_expiration); + let mut interval = interval(Duration::from_secs(self.config.blob_expiration)); loop { interval.tick().await; @@ -168,6 +163,7 @@ impl MemStore { } } +#[cfg(test)] mod test { use std::time::Duration; @@ -177,15 +173,18 @@ mod test { async fn test_memstore() { let config = MemStoreConfig { max_blob_size_bytes: 1024, - blob_expiration: Duration::from_secs(60), - put_latency: Duration::from_millis(100), - get_latency: Duration::from_millis(100), + blob_expiration: 60, + put_latency: 100, + get_latency: 100, + api_node_url: String::default(), // unused for this test + custom_quorum_numbers: None, // unused for this test + account_id: None, // unused for this test }; let store = MemStore::new(config); let blob = vec![0u8; 100]; - let cert = store.clone().put(blob.clone()).await.unwrap(); - let blob2 = store.get(cert).await.unwrap(); + let cert = store.clone().put_blob(blob.clone()).await.unwrap(); + let blob2 = store.get_blob(cert).await.unwrap(); assert_eq!(blob, blob2); } @@ -193,59 +192,65 @@ mod test { async fn test_memstore_multiple() { let config = MemStoreConfig { max_blob_size_bytes: 1024, - blob_expiration: Duration::from_secs(60), - put_latency: Duration::from_millis(100), - get_latency: Duration::from_millis(100), + blob_expiration: 60, + put_latency: 100, + get_latency: 100, + api_node_url: String::default(), // unused for this test + custom_quorum_numbers: None, // unused for this test + account_id: None, // unused for this test }; let store = MemStore::new(config); let blob = vec![0u8; 100]; let blob2 = vec![1u8; 100]; - let cert = store.clone().put(blob.clone()).await.unwrap(); - let cert2 = store.clone().put(blob2.clone()).await.unwrap(); - let blob_result = store.clone().get(cert).await.unwrap(); - let blob_result2 = store.get(cert2).await.unwrap(); + let cert = store.clone().put_blob(blob.clone()).await.unwrap(); + let cert2 = store.clone().put_blob(blob2.clone()).await.unwrap(); + let blob_result = store.clone().get_blob(cert).await.unwrap(); + let blob_result2 = store.get_blob(cert2).await.unwrap(); assert_eq!(blob, blob_result); assert_eq!(blob2, blob_result2); } #[tokio::test] async fn test_memstore_latency() { - let put_latency = Duration::from_millis(1000); - let get_latency = Duration::from_millis(1000); let config = MemStoreConfig { max_blob_size_bytes: 1024, - blob_expiration: Duration::from_secs(60), - put_latency, - get_latency, + blob_expiration: 60, + put_latency: 1000, + get_latency: 1000, + api_node_url: String::default(), // unused for this test + custom_quorum_numbers: None, // unused for this test + account_id: None, // unused for this test }; - let store = MemStore::new(config); + let store = MemStore::new(config.clone()); let blob = vec![0u8; 100]; let time_before_put = Instant::now(); - let cert = store.clone().put(blob.clone()).await.unwrap(); - assert!(time_before_put.elapsed() >= put_latency); + let cert = store.clone().put_blob(blob.clone()).await.unwrap(); + assert!(time_before_put.elapsed() >= Duration::from_millis(config.put_latency)); let time_before_get = Instant::now(); - let blob2 = store.get(cert).await.unwrap(); - assert!(time_before_get.elapsed() >= get_latency); + let blob2 = store.get_blob(cert).await.unwrap(); + assert!(time_before_get.elapsed() >= Duration::from_millis(config.get_latency)); assert_eq!(blob, blob2); } #[tokio::test] async fn test_memstore_expiration() { - let blob_expiration = Duration::from_millis(100); let config = MemStoreConfig { max_blob_size_bytes: 1024, - blob_expiration, - put_latency: Duration::from_millis(1), - get_latency: Duration::from_millis(1), + blob_expiration: 2, + put_latency: 1, + get_latency: 1, + api_node_url: String::default(), // unused for this test + custom_quorum_numbers: None, // unused for this test + account_id: None, // unused for this test }; - let store = MemStore::new(config); + let store = MemStore::new(config.clone()); let blob = vec![0u8; 100]; - let cert = store.clone().put(blob.clone()).await.unwrap(); - tokio::time::sleep(blob_expiration * 2).await; - let result = store.get(cert).await; + let cert = store.clone().put_blob(blob.clone()).await.unwrap(); + tokio::time::sleep(Duration::from_secs(config.blob_expiration * 2)).await; + let result = store.get_blob(cert).await; assert!(result.is_err()); assert_eq!(result.unwrap_err(), MemStoreError::BlobNotFound); } diff --git a/core/node/eigenda_proxy/src/request_processor.rs b/core/node/eigenda_proxy/src/request_processor.rs new file mode 100644 index 00000000000..f6171346aa0 --- /dev/null +++ b/core/node/eigenda_proxy/src/request_processor.rs @@ -0,0 +1,70 @@ +use std::sync::Arc; + +use axum::{extract::Path, http::Response}; + +use crate::{eigenda_client::EigenDAClient, errors::RequestProcessorError, memstore::MemStore}; + +#[derive(Clone)] +pub(crate) enum ClientType { + Memory(Arc), + Disperser(EigenDAClient), +} + +impl ClientType { + async fn get_blob(&self, blob_id: Vec) -> Result, RequestProcessorError> { + match self { + Self::Memory(memstore) => memstore + .clone() + .get_blob(blob_id) + .await + .map_err(|e| RequestProcessorError::MemStore(e)), + Self::Disperser(disperser) => disperser + .get_blob(blob_id) + .await + .map_err(|e| RequestProcessorError::EigenDA(e)), + } + } + + async fn put_blob(&self, data: Vec) -> Result, RequestProcessorError> { + match self { + Self::Memory(memstore) => memstore + .clone() + .put_blob(data) + .await + .map_err(|e| RequestProcessorError::MemStore(e)), + Self::Disperser(disperser) => disperser + .put_blob(data) + .await + .map_err(|e| RequestProcessorError::EigenDA(e)), + } + } +} + +#[derive(Clone)] +pub(crate) struct RequestProcessorNew { + client: ClientType, +} + +impl RequestProcessorNew { + pub(crate) fn new(client: ClientType) -> Self { + Self { client } + } + + pub(crate) async fn get_blob_id( + &self, + Path(blob_id): Path, + ) -> Result { + let blob_id_bytes = hex::decode(blob_id).unwrap(); + let response = self.client.get_blob(blob_id_bytes).await.unwrap(); + Ok(Response::new(response.into())) + } + + pub(crate) async fn put_blob_id( + &self, + Path(data): Path, + ) -> Result { + let data_bytes = hex::decode(data).unwrap(); + let response = self.client.put_blob(data_bytes).await.unwrap(); + Ok(Response::new(response.into())) + } +} diff --git a/core/node/node_framework/src/implementations/layers/eigenda_proxy.rs b/core/node/node_framework/src/implementations/layers/eigenda_proxy.rs index 2820030a843..e36a3cd3c04 100644 --- a/core/node/node_framework/src/implementations/layers/eigenda_proxy.rs +++ b/core/node/node_framework/src/implementations/layers/eigenda_proxy.rs @@ -47,14 +47,18 @@ impl WiringLayer for EigenDAProxyLayer { } async fn wire(self, _input: Self::Input) -> Result { - let task = EigenDAProxyTask {}; + let task = EigenDAProxyTask { + eigenda_config: self.eigenda_config, + }; Ok(Output { task }) } } #[derive(Debug)] -pub struct EigenDAProxyTask {} +pub struct EigenDAProxyTask { + eigenda_config: EigenDAConfig, +} #[async_trait::async_trait] impl Task for EigenDAProxyTask { @@ -63,6 +67,6 @@ impl Task for EigenDAProxyTask { } async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - zksync_eigenda_proxy::run_server(stop_receiver.0).await + zksync_eigenda_proxy::run_server(self.eigenda_config, stop_receiver.0).await } } diff --git a/eigenda-integration.md b/eigenda-integration.md index 36c88c328dd..f8fef912c25 100644 --- a/eigenda-integration.md +++ b/eigenda-integration.md @@ -24,8 +24,8 @@ If you want to use memstore: ```yaml da_client: eigen_da: - memstore: - api_node_url: http://127.0.0.1:4242 # TODO: This should be removed once eigenda proxy is no longer used + mem_store: + api_node_url: 0.0.0.0:4242 # TODO: This should be removed once eigenda proxy is no longer used max_blob_size_bytes: 2097152 blob_expiration: 100000 get_latency: 100 @@ -38,14 +38,18 @@ If you want to use disperser: da_client: eigen_da: disperser: - api_node_url: http://127.0.0.1:4242 # TODO: This should be removed once eigenda proxy is no longer used + api_node_url: 0.0.0.0:4242 # TODO: This should be removed once eigenda proxy is no longer used disperser_rpc: eth_confirmation_depth: -1 eigenda_eth_rpc: eigenda_svc_manager_addr: '0xD4A7E1Bd8015057293f0D0A557088c286942e84b' + blob_size_limit: 2097152 + status_query_timeout: 1800 + status_query_interval: 5 + wait_for_finalization: false ``` -2. Add `eigenda-proxy` to the `docker-compose.yml` file: +2. OLD: Add `eigenda-proxy` to the `docker-compose.yml` file: ```yaml eigenda-proxy: @@ -55,11 +59,19 @@ eigenda-proxy: command: ./eigenda-proxy --addr 0.0.0.0 --port 4242 --memstore.enabled --eigenda-max-blob-length "2MiB" ``` +2. NEW (temporary): Add eigenda proxy layer to the default components in `core/bin/zksync_server/src/node_builder.rs`: + +````rs +.add_gas_adjuster_layer()? +.add_eigenda_proxy_layer()?; +``` +(line 696) + 3. (optional) for using pubdata with 2MiB (as per specification), modify `etc/env/file_based/general.yaml`: ```yaml max_pubdata_per_batch: 2097152 -``` +```` ## Local Setup