From 2baa576739eae1ab53b93a7123606ae8890295a2 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 18 Oct 2024 17:35:29 -0300 Subject: [PATCH 01/11] Add initial implementation disperser client --- Cargo.lock | 10 ++ .../config/src/configs/da_client/eigen_da.rs | 4 +- .../src/proto/config/da_client.proto | 4 +- core/node/eigenda_proxy/Cargo.toml | 2 + core/node/eigenda_proxy/src/blob_info.rs | 126 +++++++++++++++ core/node/eigenda_proxy/src/disperser.rs | 93 ++++------- .../eigenda_proxy/src/eigenda_proxy_client.rs | 153 ++++++++++++++++++ core/node/eigenda_proxy/src/lib.rs | 2 + 8 files changed, 324 insertions(+), 70 deletions(-) create mode 100644 core/node/eigenda_proxy/src/eigenda_proxy_client.rs diff --git a/Cargo.lock b/Cargo.lock index 0701bc420c3..9b2471a7d66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4022,6 +4022,14 @@ dependencies = [ "log", ] +[[package]] +name = "kzgpad-rs" +version = "0.1.0" +source = "git+https://github.com/Layr-Labs/kzgpad-rs.git?tag=v0.1.0#b5f8c8d3d6482407dc118cb1f51597a017a1cc89" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "lalrpop" version = "0.20.2" @@ -10169,6 +10177,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "kzgpad-rs", "prost 0.13.3", "rand 0.8.5", "rlp", @@ -10178,6 +10187,7 @@ dependencies = [ "tonic-build", "tonic-web", "tracing", + "zksync_config", ] [[package]] diff --git a/core/lib/config/src/configs/da_client/eigen_da.rs b/core/lib/config/src/configs/da_client/eigen_da.rs index 5c77d32846b..ed678c1c818 100644 --- a/core/lib/config/src/configs/da_client/eigen_da.rs +++ b/core/lib/config/src/configs/da_client/eigen_da.rs @@ -21,8 +21,8 @@ pub struct MemStoreConfig { #[derive(Clone, Debug, PartialEq, Deserialize, Default)] pub struct DisperserConfig { pub api_node_url: String, // todo: This should be removed once eigenda proxy is no longer used - pub custom_quorum_numbers: Option>, // todo: This should be removed once eigenda proxy is no longer used - pub account_id: Option, // todo: This should be removed once eigenda proxy is no longer used + pub custom_quorum_numbers: Option>, + pub account_id: Option, pub disperser_rpc: String, pub eth_confirmation_depth: i32, pub eigenda_eth_rpc: String, diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index 18c1dc94f90..7f5498fed97 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -25,8 +25,8 @@ message MemStoreConfig { message DisperserConfig { optional string api_node_url = 1; // TODO: This should be removed once eigenda proxy is no longer used - repeated uint32 custom_quorum_numbers = 2; // TODO: This should be removed once eigenda proxy is no longer used - optional string account_id = 3; // TODO: This should be removed once eigenda proxy is no longer used + repeated uint32 custom_quorum_numbers = 2; + optional string account_id = 3; optional string disperser_rpc = 4; optional int32 eth_confirmation_depth = 5; optional string eigenda_eth_rpc = 6; diff --git a/core/node/eigenda_proxy/Cargo.toml b/core/node/eigenda_proxy/Cargo.toml index fcd5e4347c2..f1b7321758b 100644 --- a/core/node/eigenda_proxy/Cargo.toml +++ b/core/node/eigenda_proxy/Cargo.toml @@ -17,11 +17,13 @@ tracing.workspace = true rlp.workspace = true rand.workspace = true sha3.workspace = true +zksync_config.workspace = true # we can't use the workspace version of prost because # the tonic dependency requires a hugher version. prost = "0.13.1" tonic = { version = "0.12.1", features = ["tls", "channel", "tls-roots"]} tonic-web = "0.12.1" +kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } [build-dependencies] tonic-build = { version = "0.12.1", features = ["prost"] } diff --git a/core/node/eigenda_proxy/src/blob_info.rs b/core/node/eigenda_proxy/src/blob_info.rs index ef1ac6ded8d..e999df91a9e 100644 --- a/core/node/eigenda_proxy/src/blob_info.rs +++ b/core/node/eigenda_proxy/src/blob_info.rs @@ -1,5 +1,30 @@ +use std::fmt; + use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; +use crate::{ + common::G1Commitment as DisperserG1Commitment, + disperser::{ + BatchHeader as DisperserBatchHeader, BatchMetadata as DisperserBatchMetadata, + BlobHeader as DisperserBlobHeader, BlobInfo as DisperserBlobInfo, + BlobQuorumParam as DisperserBlobQuorumParam, + BlobVerificationProof as DisperserBlobVerificationProof, + }, +}; + +#[derive(Debug)] +pub enum ConversionError { + NotPresentError, +} + +impl fmt::Display for ConversionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ConversionError::NotPresentError => write!(f, "Failed to convert BlobInfo"), + } + } +} + #[derive(Debug)] pub struct G1Commitment { pub x: Vec, @@ -35,6 +60,16 @@ impl Encodable for G1Commitment { } } +impl TryFrom for G1Commitment { + type Error = ConversionError; + fn try_from(value: DisperserG1Commitment) -> Result { + Ok(Self { + x: value.x, + y: value.y, + }) + } +} + #[derive(Debug)] pub struct BlobQuorumParam { pub quorum_number: u32, @@ -76,6 +111,18 @@ impl Encodable for BlobQuorumParam { } } +impl TryFrom for BlobQuorumParam { + type Error = ConversionError; + fn try_from(value: DisperserBlobQuorumParam) -> Result { + Ok(Self { + quorum_number: value.quorum_number, + adversary_threshold_percentage: value.adversary_threshold_percentage, + confirmation_threshold_percentage: value.confirmation_threshold_percentage, + chunk_length: value.chunk_length, + }) + } +} + #[derive(Debug)] pub struct BlobHeader { pub commitment: G1Commitment, @@ -121,6 +168,26 @@ impl Encodable for BlobHeader { } } +impl TryFrom for BlobHeader { + type Error = ConversionError; + fn try_from(value: DisperserBlobHeader) -> Result { + if value.commitment.is_none() { + return Err(ConversionError::NotPresentError); + } + let blob_quorum_params: Result, Self::Error> = value + .blob_quorum_params + .iter() + .map(|param| BlobQuorumParam::try_from(param.clone())) + .collect(); + let blob_quorum_params = blob_quorum_params?; + Ok(Self { + commitment: G1Commitment::try_from(value.commitment.unwrap())?, + data_length: value.data_length, + blob_quorum_params, + }) + } +} + #[derive(Debug)] pub struct BatchHeader { pub batch_root: Vec, @@ -165,6 +232,18 @@ impl Encodable for BatchHeader { } } +impl TryFrom for BatchHeader { + type Error = ConversionError; + fn try_from(value: DisperserBatchHeader) -> Result { + Ok(Self { + batch_root: value.batch_root, + quorum_numbers: value.quorum_numbers, + quorum_signed_percentages: value.quorum_signed_percentages, + reference_block_number: value.reference_block_number, + }) + } +} + #[derive(Debug)] pub struct BatchMetadata { pub batch_header: BatchHeader, @@ -210,6 +289,22 @@ impl Encodable for BatchMetadata { } } +impl TryFrom for BatchMetadata { + type Error = ConversionError; + fn try_from(value: DisperserBatchMetadata) -> Result { + if value.batch_header.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + batch_header: BatchHeader::try_from(value.batch_header.unwrap())?, + signatory_record_hash: value.signatory_record_hash, + fee: value.fee, + confirmation_block_number: value.confirmation_block_number, + batch_header_hash: value.batch_header_hash, + }) + } +} + #[derive(Debug)] pub struct BlobVerificationProof { pub batch_id: u32, @@ -257,6 +352,22 @@ impl Encodable for BlobVerificationProof { } } +impl TryFrom for BlobVerificationProof { + type Error = ConversionError; + fn try_from(value: DisperserBlobVerificationProof) -> Result { + if value.batch_metadata.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + batch_id: value.batch_id, + blob_index: value.blob_index, + batch_medatada: BatchMetadata::try_from(value.batch_metadata.unwrap())?, + inclusion_proof: value.inclusion_proof, + quorum_indexes: value.quorum_indexes, + }) + } +} + #[derive(Debug)] pub struct BlobInfo { pub blob_header: BlobHeader, @@ -294,3 +405,18 @@ impl Encodable for BlobInfo { s.append(&self.blob_verification_proof); } } + +impl TryFrom for BlobInfo { + type Error = ConversionError; + fn try_from(value: DisperserBlobInfo) -> Result { + if value.blob_header.is_none() || value.blob_verification_proof.is_none() { + return Err(ConversionError::NotPresentError); + } + Ok(Self { + blob_header: BlobHeader::try_from(value.blob_header.unwrap())?, + blob_verification_proof: BlobVerificationProof::try_from( + value.blob_verification_proof.unwrap(), + )?, + }) + } +} diff --git a/core/node/eigenda_proxy/src/disperser.rs b/core/node/eigenda_proxy/src/disperser.rs index 2e2800828ff..08a838ae787 100644 --- a/core/node/eigenda_proxy/src/disperser.rs +++ b/core/node/eigenda_proxy/src/disperser.rs @@ -293,10 +293,9 @@ pub mod disperser_client { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value, + clippy::let_unit_value )] - use tonic::codegen::*; - use tonic::codegen::http::Uri; + use tonic::codegen::{http::Uri, *}; /// Disperser defines the public APIs for dispersing blobs. #[derive(Debug, Clone)] pub struct DisperserClient { @@ -341,9 +340,8 @@ pub mod disperser_client { >::ResponseBody, >, >, - , - >>::Error: Into + std::marker::Send + std::marker::Sync, + >>::Error: + Into + std::marker::Send + std::marker::Sync, { DisperserClient::new(InterceptedService::new(inner, interceptor)) } @@ -385,22 +383,12 @@ pub mod disperser_client { pub async fn disperse_blob( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/DisperseBlob", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/DisperseBlob"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "DisperseBlob")); @@ -416,52 +404,35 @@ pub mod disperser_client { /// 4. The Disperser verifies the signature and returns a DisperseBlobReply message. pub async fn disperse_blob_authenticated( &mut self, - request: impl tonic::IntoStreamingRequest< - Message = super::AuthenticatedRequest, - >, + request: impl tonic::IntoStreamingRequest, ) -> std::result::Result< tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/disperser.Disperser/DisperseBlobAuthenticated", ); let mut req = request.into_streaming_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("disperser.Disperser", "DisperseBlobAuthenticated"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "disperser.Disperser", + "DisperseBlobAuthenticated", + )); self.inner.streaming(req, path, codec).await } /// This API is meant to be polled for the blob status. pub async fn get_blob_status( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/GetBlobStatus", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/GetBlobStatus"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "GetBlobStatus")); @@ -476,22 +447,12 @@ pub mod disperser_client { pub async fn retrieve_blob( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/disperser.Disperser/RetrieveBlob", - ); + let path = http::uri::PathAndQuery::from_static("/disperser.Disperser/RetrieveBlob"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("disperser.Disperser", "RetrieveBlob")); diff --git a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs new file mode 100644 index 00000000000..73845b4d56d --- /dev/null +++ b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs @@ -0,0 +1,153 @@ +use std::{ + fmt::Debug, + sync::Arc, + time::{Duration, Instant}, +}; + +use tokio::{sync::Mutex, time::interval}; +use tonic::transport::{Channel, ClientTlsConfig}; +use zksync_config::configs::da_client::eigen_da::DisperserConfig; + +use crate::{ + blob_info::BlobInfo, + disperser::{self, disperser_client::DisperserClient, BlobStatusRequest, DisperseBlobRequest}, +}; + +pub struct EigenDAProxyClient { + disperser: Arc>>, + config: DisperserConfig, +} + +impl EigenDAProxyClient { + pub const BLOB_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024 * 1024; // 2MB todo: add to config + pub const STATUS_QUERY_TIMEOUT: u64 = 1800; // 30 minutes todo: add to config + pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config + pub const WAIT_FOR_FINALAZATION: bool = false; // todo: add to config + pub async fn new(config: DisperserConfig) -> anyhow::Result { + let inner = Channel::builder(config.disperser_rpc.parse()?) + .tls_config(ClientTlsConfig::new().with_native_roots())?; + Ok(Self { + disperser: Arc::new(Mutex::new(DisperserClient::connect(inner).await?)), + config, + }) + } + + fn result_to_status(&self, result: i32) -> disperser::BlobStatus { + match result { + 0 => disperser::BlobStatus::Unknown, + 1 => disperser::BlobStatus::Processing, + 2 => disperser::BlobStatus::Confirmed, + 3 => disperser::BlobStatus::Failed, + 4 => disperser::BlobStatus::Finalized, + 5 => disperser::BlobStatus::InsufficientSignatures, + 6 => disperser::BlobStatus::Dispersing, + _ => disperser::BlobStatus::Unknown, + } + } + + pub async fn put_blob(&self, blob_data: Vec) -> Result { + let reply = self + .disperser + .lock() + .await + .disperse_blob(DisperseBlobRequest { + data: kzgpad_rs::convert_by_padding_empty_byte(&blob_data), + custom_quorum_numbers: self + .config + .custom_quorum_numbers + .clone() + .unwrap_or_default(), + account_id: self.config.account_id.clone().unwrap_or_default(), + }) + .await + .unwrap() + .into_inner(); + + if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { + return Err(()); + } + + let mut interval = interval(Duration::from_secs(Self::STATUS_QUERY_RETRY_INTERVAL)); + let start_time = Instant::now(); + while Instant::now() - start_time < Duration::from_secs(Self::STATUS_QUERY_TIMEOUT) { + let blob_status_reply = self + .disperser + .lock() + .await + .get_blob_status(BlobStatusRequest { + request_id: reply.request_id.clone(), + }) + .await + .unwrap() + .into_inner(); + + let blob_status = blob_status_reply.status(); + + match blob_status { + disperser::BlobStatus::Unknown => { + interval.tick().await; + } + disperser::BlobStatus::Processing => { + interval.tick().await; + } + disperser::BlobStatus::Confirmed => { + if Self::WAIT_FOR_FINALAZATION { + interval.tick().await; + } else { + match blob_status_reply.info { + Some(info) => { + return BlobInfo::try_from(info).map_err(|_| ()); + } + None => { + return Err(()); + } + } + } + } + disperser::BlobStatus::Failed => { + return Err(()); + } + disperser::BlobStatus::InsufficientSignatures => { + return Err(()); + } + disperser::BlobStatus::Dispersing => { + interval.tick().await; + } + disperser::BlobStatus::Finalized => match blob_status_reply.info { + Some(info) => { + return BlobInfo::try_from(info).map_err(|_| ()); + } + None => { + return Err(()); + } + }, + } + } + + return Err(()); + } + + pub async fn get_blob( + &self, + batch_header_hash: Vec, + blob_index: u32, + ) -> Result, ()> { + let get_response = self + .disperser + .lock() + .await + .retrieve_blob(disperser::RetrieveBlobRequest { + batch_header_hash: batch_header_hash, + blob_index: blob_index, + }) + .await + .unwrap() + .into_inner(); + + if get_response.data.len() == 0 { + return Err(()); + } + + return Ok(get_response.data); + } +} diff --git a/core/node/eigenda_proxy/src/lib.rs b/core/node/eigenda_proxy/src/lib.rs index 3d457e7d33d..4a0bfb533ed 100644 --- a/core/node/eigenda_proxy/src/lib.rs +++ b/core/node/eigenda_proxy/src/lib.rs @@ -2,6 +2,7 @@ mod common; mod disperser; use std::net::SocketAddr; + use anyhow::Context as _; use axum::{ routing::{get, put}, @@ -9,6 +10,7 @@ use axum::{ }; use tokio::sync::watch; mod blob_info; +mod eigenda_proxy_client; mod errors; mod memstore; From ac5f3f16ad7c06a0f769d5e2613f280274367082 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 12:33:31 -0300 Subject: [PATCH 02/11] Add holesky tests --- Cargo.lock | 1 + core/node/eigenda_proxy/Cargo.toml | 3 +- .../eigenda_proxy/src/eigenda_proxy_client.rs | 98 ++++++++++++++++--- 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b2471a7d66..0a160696dc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10181,6 +10181,7 @@ dependencies = [ "prost 0.13.3", "rand 0.8.5", "rlp", + "rustls 0.23.13", "sha3 0.10.8", "tokio", "tonic", diff --git a/core/node/eigenda_proxy/Cargo.toml b/core/node/eigenda_proxy/Cargo.toml index f1b7321758b..d611a0805dc 100644 --- a/core/node/eigenda_proxy/Cargo.toml +++ b/core/node/eigenda_proxy/Cargo.toml @@ -21,9 +21,10 @@ zksync_config.workspace = true # we can't use the workspace version of prost because # the tonic dependency requires a hugher version. prost = "0.13.1" -tonic = { version = "0.12.1", features = ["tls", "channel", "tls-roots"]} +tonic = { version = "0.12.1", features = ["tls", "channel", "tls-native-roots"]} tonic-web = "0.12.1" kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } +rustls.workspace = true [build-dependencies] tonic-build = { version = "0.12.1", features = ["prost"] } diff --git a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs index 73845b4d56d..b0cb26888f9 100644 --- a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use rlp::decode; use tokio::{sync::Mutex, time::interval}; use tonic::transport::{Channel, ClientTlsConfig}; use zksync_config::configs::da_client::eigen_da::DisperserConfig; @@ -24,12 +25,13 @@ impl EigenDAProxyClient { pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config pub const WAIT_FOR_FINALAZATION: bool = false; // todo: add to config pub async fn new(config: DisperserConfig) -> anyhow::Result { + rustls::crypto::ring::default_provider().install_default(); let inner = Channel::builder(config.disperser_rpc.parse()?) .tls_config(ClientTlsConfig::new().with_native_roots())?; - Ok(Self { - disperser: Arc::new(Mutex::new(DisperserClient::connect(inner).await?)), - config, - }) + + let disperser = Arc::new(Mutex::new(DisperserClient::connect(inner).await?)); + + Ok(Self { disperser, config }) } fn result_to_status(&self, result: i32) -> disperser::BlobStatus { @@ -45,13 +47,14 @@ impl EigenDAProxyClient { } } - pub async fn put_blob(&self, blob_data: Vec) -> Result { + pub async fn put_blob(&self, blob_data: Vec) -> Result, ()> { + println!("Putting blob"); let reply = self .disperser .lock() .await .disperse_blob(DisperseBlobRequest { - data: kzgpad_rs::convert_by_padding_empty_byte(&blob_data), + data: blob_data, custom_quorum_numbers: self .config .custom_quorum_numbers @@ -67,6 +70,8 @@ impl EigenDAProxyClient { return Err(()); } + let request_id_str = String::from_utf8(reply.request_id.clone()).unwrap(); + let mut interval = interval(Duration::from_secs(Self::STATUS_QUERY_RETRY_INTERVAL)); let start_time = Instant::now(); while Instant::now() - start_time < Duration::from_secs(Self::STATUS_QUERY_TIMEOUT) { @@ -83,6 +88,11 @@ impl EigenDAProxyClient { let blob_status = blob_status_reply.status(); + println!( + "Dispersing blob {:?}, status: {:?}", + request_id_str, blob_status + ); + match blob_status { disperser::BlobStatus::Unknown => { interval.tick().await; @@ -96,7 +106,8 @@ impl EigenDAProxyClient { } else { match blob_status_reply.info { Some(info) => { - return BlobInfo::try_from(info).map_err(|_| ()); + let blob_info = BlobInfo::try_from(info).map_err(|_| ())?; + return Ok(rlp::encode(&blob_info).to_vec()); } None => { return Err(()); @@ -115,7 +126,8 @@ impl EigenDAProxyClient { } disperser::BlobStatus::Finalized => match blob_status_reply.info { Some(info) => { - return BlobInfo::try_from(info).map_err(|_| ()); + let blob_info = BlobInfo::try_from(info).map_err(|_| ())?; + return Ok(rlp::encode(&blob_info).to_vec()); } None => { return Err(()); @@ -127,18 +139,21 @@ impl EigenDAProxyClient { return Err(()); } - pub async fn get_blob( - &self, - batch_header_hash: Vec, - blob_index: u32, - ) -> Result, ()> { + pub async fn get_blob(&self, commit: Vec) -> Result, ()> { + println!("Getting blob"); + let blob_info: BlobInfo = decode(&commit).map_err(|_| ())?; + let blob_index = blob_info.blob_verification_proof.blob_index; + let batch_header_hash = blob_info + .blob_verification_proof + .batch_medatada + .batch_header_hash; let get_response = self .disperser .lock() .await .retrieve_blob(disperser::RetrieveBlobRequest { batch_header_hash: batch_header_hash, - blob_index: blob_index, + blob_index, }) .await .unwrap() @@ -151,3 +166,58 @@ impl EigenDAProxyClient { return Ok(get_response.data); } } + +mod test { + use std::time::Duration; + + use super::*; + + #[tokio::test] + async fn test_eigenda_proxy() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + }; + let store = match EigenDAProxyClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 100]; + let cert = store.put_blob(blob.clone()).await.unwrap(); + let blob2 = store.get_blob(cert).await.unwrap(); + assert_eq!(blob, blob2); + } + + #[tokio::test] + async fn test_eigenda_multiple() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + }; + let store = match EigenDAProxyClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 100]; + let blob2 = vec![1u8; 100]; + let cert = store.put_blob(blob.clone()); + let cert2 = store.put_blob(blob2.clone()); + let (val1, val2) = tokio::join!(cert, cert2); + let blob_result = store.get_blob(val1.unwrap()).await.unwrap(); + let blob_result2 = store.get_blob(val2.unwrap()).await.unwrap(); + assert_eq!(blob, blob_result); + assert_eq!(blob2, blob_result2); + } +} From 0ae04c350168d0d8b8de7dc8b8b3485253f0c3c8 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:34:47 -0300 Subject: [PATCH 03/11] Add error handling --- .../eigenda_proxy/src/eigenda_proxy_client.rs | 61 ++++++++++++------- core/node/eigenda_proxy/src/errors.rs | 9 +++ 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs index b0cb26888f9..bd92b861e13 100644 --- a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_proxy_client.rs @@ -12,6 +12,7 @@ use zksync_config::configs::da_client::eigen_da::DisperserConfig; use crate::{ blob_info::BlobInfo, disperser::{self, disperser_client::DisperserClient, BlobStatusRequest, DisperseBlobRequest}, + errors::EigenDAError, }; pub struct EigenDAProxyClient { @@ -24,12 +25,25 @@ impl EigenDAProxyClient { pub const STATUS_QUERY_TIMEOUT: u64 = 1800; // 30 minutes todo: add to config pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config pub const WAIT_FOR_FINALAZATION: bool = false; // todo: add to config - pub async fn new(config: DisperserConfig) -> anyhow::Result { - rustls::crypto::ring::default_provider().install_default(); - let inner = Channel::builder(config.disperser_rpc.parse()?) - .tls_config(ClientTlsConfig::new().with_native_roots())?; - - let disperser = Arc::new(Mutex::new(DisperserClient::connect(inner).await?)); + pub async fn new(config: DisperserConfig) -> Result { + match rustls::crypto::ring::default_provider().install_default() { + Ok(_) => {} + Err(_) => {} // This is not an actual error, we expect this function to return an Err(Arc) + }; + let inner = Channel::builder( + config + .disperser_rpc + .parse() + .map_err(|_| EigenDAError::UriError)?, + ) + .tls_config(ClientTlsConfig::new().with_native_roots()) + .map_err(|_| EigenDAError::TlsError)?; + + let disperser = Arc::new(Mutex::new( + DisperserClient::connect(inner) + .await + .map_err(|_| EigenDAError::ConnectionError)?, + )); Ok(Self { disperser, config }) } @@ -47,7 +61,7 @@ impl EigenDAProxyClient { } } - pub async fn put_blob(&self, blob_data: Vec) -> Result, ()> { + pub async fn put_blob(&self, blob_data: Vec) -> Result, EigenDAError> { println!("Putting blob"); let reply = self .disperser @@ -63,14 +77,15 @@ impl EigenDAProxyClient { account_id: self.config.account_id.clone().unwrap_or_default(), }) .await - .unwrap() + .map_err(|_| EigenDAError::PutError)? .into_inner(); if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { - return Err(()); + return Err(EigenDAError::PutError); } - let request_id_str = String::from_utf8(reply.request_id.clone()).unwrap(); + let request_id_str = + String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; let mut interval = interval(Duration::from_secs(Self::STATUS_QUERY_RETRY_INTERVAL)); let start_time = Instant::now(); @@ -83,7 +98,7 @@ impl EigenDAProxyClient { request_id: reply.request_id.clone(), }) .await - .unwrap() + .map_err(|_| EigenDAError::PutError)? .into_inner(); let blob_status = blob_status_reply.status(); @@ -106,42 +121,44 @@ impl EigenDAProxyClient { } else { match blob_status_reply.info { Some(info) => { - let blob_info = BlobInfo::try_from(info).map_err(|_| ())?; + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(()); + return Err(EigenDAError::PutError); } } } } disperser::BlobStatus::Failed => { - return Err(()); + return Err(EigenDAError::PutError); } disperser::BlobStatus::InsufficientSignatures => { - return Err(()); + return Err(EigenDAError::PutError); } disperser::BlobStatus::Dispersing => { interval.tick().await; } disperser::BlobStatus::Finalized => match blob_status_reply.info { Some(info) => { - let blob_info = BlobInfo::try_from(info).map_err(|_| ())?; + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(()); + return Err(EigenDAError::PutError); } }, } } - return Err(()); + return Err(EigenDAError::PutError); } - pub async fn get_blob(&self, commit: Vec) -> Result, ()> { + pub async fn get_blob(&self, commit: Vec) -> Result, EigenDAError> { println!("Getting blob"); - let blob_info: BlobInfo = decode(&commit).map_err(|_| ())?; + let blob_info: BlobInfo = decode(&commit).map_err(|_| EigenDAError::GetError)?; let blob_index = blob_info.blob_verification_proof.blob_index; let batch_header_hash = blob_info .blob_verification_proof @@ -156,11 +173,11 @@ impl EigenDAProxyClient { blob_index, }) .await - .unwrap() + .map_err(|_| EigenDAError::GetError)? .into_inner(); if get_response.data.len() == 0 { - return Err(()); + return Err(EigenDAError::GetError); } return Ok(get_response.data); diff --git a/core/node/eigenda_proxy/src/errors.rs b/core/node/eigenda_proxy/src/errors.rs index 1ce94a89033..050c3f42d23 100644 --- a/core/node/eigenda_proxy/src/errors.rs +++ b/core/node/eigenda_proxy/src/errors.rs @@ -6,3 +6,12 @@ pub enum MemStoreError { IncorrectCommitment, BlobNotFound, } + +#[derive(Debug)] +pub enum EigenDAError { + TlsError, + UriError, + ConnectionError, + PutError, + GetError, +} From 1a9cf5790ad6ca8ca2209201a92f1435eac86353 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:36:41 -0300 Subject: [PATCH 04/11] Remove proxy from name --- .../src/{eigenda_proxy_client.rs => eigenda_client.rs} | 8 ++++---- core/node/eigenda_proxy/src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) rename core/node/eigenda_proxy/src/{eigenda_proxy_client.rs => eigenda_client.rs} (97%) diff --git a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs similarity index 97% rename from core/node/eigenda_proxy/src/eigenda_proxy_client.rs rename to core/node/eigenda_proxy/src/eigenda_client.rs index bd92b861e13..3e144f8b83c 100644 --- a/core/node/eigenda_proxy/src/eigenda_proxy_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -15,12 +15,12 @@ use crate::{ errors::EigenDAError, }; -pub struct EigenDAProxyClient { +pub struct EigenDAClient { disperser: Arc>>, config: DisperserConfig, } -impl EigenDAProxyClient { +impl EigenDAClient { pub const BLOB_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024 * 1024; // 2MB todo: add to config pub const STATUS_QUERY_TIMEOUT: u64 = 1800; // 30 minutes todo: add to config pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config @@ -200,7 +200,7 @@ mod test { eigenda_eth_rpc: "".to_string(), eigenda_svc_manager_addr: "".to_string(), }; - let store = match EigenDAProxyClient::new(config).await { + let store = match EigenDAClient::new(config).await { Ok(store) => store, Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), }; @@ -222,7 +222,7 @@ mod test { eigenda_eth_rpc: "".to_string(), eigenda_svc_manager_addr: "".to_string(), }; - let store = match EigenDAProxyClient::new(config).await { + let store = match EigenDAClient::new(config).await { Ok(store) => store, Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), }; diff --git a/core/node/eigenda_proxy/src/lib.rs b/core/node/eigenda_proxy/src/lib.rs index 4a0bfb533ed..2a3b6427479 100644 --- a/core/node/eigenda_proxy/src/lib.rs +++ b/core/node/eigenda_proxy/src/lib.rs @@ -10,7 +10,7 @@ use axum::{ }; use tokio::sync::watch; mod blob_info; -mod eigenda_proxy_client; +mod eigenda_client; mod errors; mod memstore; From 017b544b61f9b12a13cea993c530624502de8d8f Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:00:00 -0300 Subject: [PATCH 05/11] Add new configs --- .../config/src/configs/da_client/eigen_da.rs | 4 ++ core/lib/protobuf_config/src/da_client.rs | 16 +++++++ .../src/proto/config/da_client.proto | 4 ++ core/node/eigenda_proxy/src/eigenda_client.rs | 48 +++++++++++++++---- 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/core/lib/config/src/configs/da_client/eigen_da.rs b/core/lib/config/src/configs/da_client/eigen_da.rs index ed678c1c818..3543a6b5c53 100644 --- a/core/lib/config/src/configs/da_client/eigen_da.rs +++ b/core/lib/config/src/configs/da_client/eigen_da.rs @@ -27,4 +27,8 @@ pub struct DisperserConfig { pub eth_confirmation_depth: i32, pub eigenda_eth_rpc: String, pub eigenda_svc_manager_addr: String, + pub blob_size_limit: u64, + pub status_query_timeout: u64, + pub status_query_interval: u64, + pub wait_for_finalization: bool, } diff --git a/core/lib/protobuf_config/src/da_client.rs b/core/lib/protobuf_config/src/da_client.rs index f6798bd27d3..933a16ecd61 100644 --- a/core/lib/protobuf_config/src/da_client.rs +++ b/core/lib/protobuf_config/src/da_client.rs @@ -77,6 +77,18 @@ impl ProtoRepr for proto::DataAvailabilityClient { eigenda_svc_manager_addr: required(&conf.eigenda_svc_manager_addr) .context("eigenda_svc_manager_addr")? .clone(), + blob_size_limit: required(&conf.blob_size_limit) + .context("blob_size_limit")? + .clone(), + status_query_timeout: required(&conf.status_query_timeout) + .context("status_query_timeout")? + .clone(), + status_query_interval: required(&conf.status_query_interval) + .context("status_query_interval")? + .clone(), + wait_for_finalization: required(&conf.wait_for_finalization) + .context("wait_for_finalization")? + .clone(), }) } }; @@ -143,6 +155,10 @@ impl ProtoRepr for proto::DataAvailabilityClient { eigenda_svc_manager_addr: Some( config.eigenda_svc_manager_addr.clone(), ), + blob_size_limit: Some(config.blob_size_limit), + status_query_timeout: Some(config.status_query_timeout), + status_query_interval: Some(config.status_query_interval), + wait_for_finalization: Some(config.wait_for_finalization), }, )), }, diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index 7f5498fed97..9240af15213 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -31,6 +31,10 @@ message DisperserConfig { optional int32 eth_confirmation_depth = 5; optional string eigenda_eth_rpc = 6; optional string eigenda_svc_manager_addr = 7; + optional uint64 blob_size_limit = 8; + optional uint64 status_query_timeout = 9; + optional uint64 status_query_interval = 10; + optional bool wait_for_finalization = 11; } message EigenDaConfig { diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index 3e144f8b83c..2d6d01f2887 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -21,10 +21,6 @@ pub struct EigenDAClient { } impl EigenDAClient { - pub const BLOB_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024 * 1024; // 2MB todo: add to config - pub const STATUS_QUERY_TIMEOUT: u64 = 1800; // 30 minutes todo: add to config - pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config - pub const WAIT_FOR_FINALAZATION: bool = false; // todo: add to config pub async fn new(config: DisperserConfig) -> Result { match rustls::crypto::ring::default_provider().install_default() { Ok(_) => {} @@ -63,6 +59,9 @@ impl EigenDAClient { pub async fn put_blob(&self, blob_data: Vec) -> Result, EigenDAError> { println!("Putting blob"); + if blob_data.len() > self.config.blob_size_limit as usize { + return Err(EigenDAError::PutError); + } let reply = self .disperser .lock() @@ -87,9 +86,9 @@ impl EigenDAClient { let request_id_str = String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; - let mut interval = interval(Duration::from_secs(Self::STATUS_QUERY_RETRY_INTERVAL)); + let mut interval = interval(Duration::from_secs(self.config.status_query_interval)); let start_time = Instant::now(); - while Instant::now() - start_time < Duration::from_secs(Self::STATUS_QUERY_TIMEOUT) { + while Instant::now() - start_time < Duration::from_secs(self.config.status_query_timeout) { let blob_status_reply = self .disperser .lock() @@ -116,7 +115,7 @@ impl EigenDAClient { interval.tick().await; } disperser::BlobStatus::Confirmed => { - if Self::WAIT_FOR_FINALAZATION { + if self.config.wait_for_finalization { interval.tick().await; } else { match blob_status_reply.info { @@ -190,7 +189,7 @@ mod test { use super::*; #[tokio::test] - async fn test_eigenda_proxy() { + async fn test_eigenda_client() { let config = DisperserConfig { api_node_url: "".to_string(), custom_quorum_numbers: Some(vec![]), @@ -199,6 +198,10 @@ mod test { eth_confirmation_depth: -1, eigenda_eth_rpc: "".to_string(), eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, }; let store = match EigenDAClient::new(config).await { Ok(store) => store, @@ -221,6 +224,10 @@ mod test { eth_confirmation_depth: -1, eigenda_eth_rpc: "".to_string(), eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, }; let store = match EigenDAClient::new(config).await { Ok(store) => store, @@ -237,4 +244,29 @@ mod test { assert_eq!(blob, blob_result); assert_eq!(blob2, blob_result2); } + + #[tokio::test] + async fn test_eigenda_blob_size_limit() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some("".to_string()), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + }; + let store = match EigenDAClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 3]; + let cert = store.put_blob(blob.clone()).await; + assert!(cert.is_err()); + } } From 3cb1c31fcd891f9143ae74642b5f392c5161b9f4 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:03:49 -0300 Subject: [PATCH 06/11] Update eigenda-integration.md --- eigenda-integration.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/eigenda-integration.md b/eigenda-integration.md index 36c88c328dd..eb3704c753c 100644 --- a/eigenda-integration.md +++ b/eigenda-integration.md @@ -43,6 +43,10 @@ da_client: eth_confirmation_depth: -1 eigenda_eth_rpc: eigenda_svc_manager_addr: '0xD4A7E1Bd8015057293f0D0A557088c286942e84b' + blob_size_limit: 2097152 + status_query_timeout: 1800 + status_query_interval: 5 + wait_for_finalization: false ``` 2. Add `eigenda-proxy` to the `docker-compose.yml` file: From 0b1727de4a7d3bd7e0fd64ea3b5714e8d391b809 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:52:36 -0300 Subject: [PATCH 07/11] Address pr comments --- core/node/eigenda_proxy/src/eigenda_client.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index 2d6d01f2887..b8a4c552020 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -1,5 +1,4 @@ use std::{ - fmt::Debug, sync::Arc, time::{Duration, Instant}, }; @@ -58,7 +57,7 @@ impl EigenDAClient { } pub async fn put_blob(&self, blob_data: Vec) -> Result, EigenDAError> { - println!("Putting blob"); + tracing::info!("Putting blob"); if blob_data.len() > self.config.blob_size_limit as usize { return Err(EigenDAError::PutError); } @@ -102,9 +101,10 @@ impl EigenDAClient { let blob_status = blob_status_reply.status(); - println!( + tracing::info!( "Dispersing blob {:?}, status: {:?}", - request_id_str, blob_status + request_id_str, + blob_status ); match blob_status { @@ -156,7 +156,7 @@ impl EigenDAClient { } pub async fn get_blob(&self, commit: Vec) -> Result, EigenDAError> { - println!("Getting blob"); + tracing::info!("Getting blob"); let blob_info: BlobInfo = decode(&commit).map_err(|_| EigenDAError::GetError)?; let blob_index = blob_info.blob_verification_proof.blob_index; let batch_header_hash = blob_info @@ -168,7 +168,7 @@ impl EigenDAClient { .lock() .await .retrieve_blob(disperser::RetrieveBlobRequest { - batch_header_hash: batch_header_hash, + batch_header_hash, blob_index, }) .await @@ -183,9 +183,8 @@ impl EigenDAClient { } } +#[cfg(test)] mod test { - use std::time::Duration; - use super::*; #[tokio::test] From f8a93b11872629a464bdeba793ba9ec09ecde538 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 21 Oct 2024 17:36:12 -0300 Subject: [PATCH 08/11] Implement first authenticated call --- Cargo.lock | 3 + core/node/eigenda_proxy/Cargo.toml | 3 + core/node/eigenda_proxy/src/eigenda_client.rs | 177 +++++++++++++++++- 3 files changed, 182 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0a160696dc8..4caa7b7ea3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10177,11 +10177,14 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "futures 0.3.30", + "hex", "kzgpad-rs", "prost 0.13.3", "rand 0.8.5", "rlp", "rustls 0.23.13", + "secp256k1", "sha3 0.10.8", "tokio", "tonic", diff --git a/core/node/eigenda_proxy/Cargo.toml b/core/node/eigenda_proxy/Cargo.toml index d611a0805dc..11462da6d8c 100644 --- a/core/node/eigenda_proxy/Cargo.toml +++ b/core/node/eigenda_proxy/Cargo.toml @@ -25,6 +25,9 @@ tonic = { version = "0.12.1", features = ["tls", "channel", "tls-native-roots"]} tonic-web = "0.12.1" kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } rustls.workspace = true +futures.workspace = true +secp256k1.workspace = true +hex.workspace = true [build-dependencies] tonic-build = { version = "0.12.1", features = ["prost"] } diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index b8a4c552020..9a8848d6f3b 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -1,16 +1,21 @@ use std::{ + str::FromStr, sync::Arc, time::{Duration, Instant}, }; use rlp::decode; +use secp256k1::{PublicKey, Secp256k1, SecretKey}; use tokio::{sync::Mutex, time::interval}; use tonic::transport::{Channel, ClientTlsConfig}; use zksync_config::configs::da_client::eigen_da::DisperserConfig; use crate::{ blob_info::BlobInfo, - disperser::{self, disperser_client::DisperserClient, BlobStatusRequest, DisperseBlobRequest}, + disperser::{ + self, disperser_client::DisperserClient, AuthenticatedRequest, BlobStatusRequest, + DisperseBlobRequest, + }, errors::EigenDAError, }; @@ -155,6 +160,148 @@ impl EigenDAClient { return Err(EigenDAError::PutError); } + pub async fn put_blob_authenticated( + &self, + blob_data: Vec, + ) -> Result, EigenDAError> { + tracing::info!("Putting blob"); + println!("Putting blob"); + if blob_data.len() > self.config.blob_size_limit as usize { + return Err(EigenDAError::PutError); + } + + let custom_quorum_numbers = self + .config + .custom_quorum_numbers + .clone() + .unwrap_or_default(); + let account_id = self.config.account_id.clone().unwrap_or_default(); + let secp = Secp256k1::new(); + let secret_key = SecretKey::from_str(account_id.as_str()).map_err(|e| { + println!("Error: {:?}", e); + EigenDAError::PutError + })?; + let public_key = PublicKey::from_secret_key(&secp, &secret_key); + let account_id = "0x".to_string() + &hex::encode(public_key.serialize_uncompressed()); + let request_stream = futures::stream::once(async { + AuthenticatedRequest { + payload: Some(disperser::authenticated_request::Payload::DisperseRequest( + DisperseBlobRequest { + data: blob_data, + custom_quorum_numbers, + account_id, + }, + )), + } + }); + + println!("Request stream: "); + + let result = self + .disperser + .lock() + .await + .disperse_blob_authenticated(request_stream) + .await + .map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })? + .get_mut() + .message() + .await + .map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })?; + + println!("Result: {:?}", result); + + let reply = match result.unwrap().payload.unwrap() { + disperser::authenticated_reply::Payload::DisperseReply(reply) => reply, + _ => { + println!("Error"); + return Err(EigenDAError::PutError); + } + }; + + if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { + return Err(EigenDAError::PutError); + } + + let request_id_str = + String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; + + let mut interval = interval(Duration::from_secs(self.config.status_query_interval)); + let start_time = Instant::now(); + while Instant::now() - start_time < Duration::from_secs(self.config.status_query_timeout) { + let blob_status_reply = self + .disperser + .lock() + .await + .get_blob_status(BlobStatusRequest { + request_id: reply.request_id.clone(), + }) + .await + .map_err(|_| EigenDAError::PutError)? + .into_inner(); + + let blob_status = blob_status_reply.status(); + + tracing::info!( + "Dispersing blob {:?}, status: {:?}", + request_id_str, + blob_status + ); + + match blob_status { + disperser::BlobStatus::Unknown => { + interval.tick().await; + } + disperser::BlobStatus::Processing => { + interval.tick().await; + } + disperser::BlobStatus::Confirmed => { + if self.config.wait_for_finalization { + interval.tick().await; + } else { + match blob_status_reply.info { + Some(info) => { + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + return Ok(rlp::encode(&blob_info).to_vec()); + } + None => { + return Err(EigenDAError::PutError); + } + } + } + } + disperser::BlobStatus::Failed => { + return Err(EigenDAError::PutError); + } + disperser::BlobStatus::InsufficientSignatures => { + return Err(EigenDAError::PutError); + } + disperser::BlobStatus::Dispersing => { + interval.tick().await; + } + disperser::BlobStatus::Finalized => match blob_status_reply.info { + Some(info) => { + let blob_info = + BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + return Ok(rlp::encode(&blob_info).to_vec()); + } + None => { + return Err(EigenDAError::PutError); + } + }, + } + } + + return Err(EigenDAError::PutError); + } + pub async fn get_blob(&self, commit: Vec) -> Result, EigenDAError> { tracing::info!("Getting blob"); let blob_info: BlobInfo = decode(&commit).map_err(|_| EigenDAError::GetError)?; @@ -268,4 +415,32 @@ mod test { let cert = store.put_blob(blob.clone()).await; assert!(cert.is_err()); } + + #[tokio::test] + async fn test_eigenda_client_authenticated() { + let config = DisperserConfig { + api_node_url: "".to_string(), + custom_quorum_numbers: Some(vec![]), + account_id: Some( + "850683b40d4a740aa6e745f889a6fdc8327be76e122f5aba645a5b02d0248db8".to_string(), + ), + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: "".to_string(), + eigenda_svc_manager_addr: "".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + }; + let store = match EigenDAClient::new(config).await { + Ok(store) => store, + Err(e) => panic!("Failed to create EigenDAProxyClient {:?}", e), + }; + + let blob = vec![0u8; 100]; + let cert = store.put_blob_authenticated(blob.clone()).await.unwrap(); + let blob2 = store.get_blob(cert).await.unwrap(); + assert_eq!(blob, blob2); + } } From 73ca611b1d6718bb83b4cc0a3a0187cb0ade1243 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Tue, 22 Oct 2024 16:18:54 -0300 Subject: [PATCH 09/11] Add second request to authentication --- Cargo.lock | 3 + core/node/eigenda_proxy/Cargo.toml | 3 + core/node/eigenda_proxy/src/eigenda_client.rs | 153 +++++++++++++----- 3 files changed, 117 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4caa7b7ea3a..a8ec17ddbec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10177,6 +10177,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "byteorder", "futures 0.3.30", "hex", "kzgpad-rs", @@ -10186,7 +10187,9 @@ dependencies = [ "rustls 0.23.13", "secp256k1", "sha3 0.10.8", + "tiny-keccak 2.0.2", "tokio", + "tokio-stream", "tonic", "tonic-build", "tonic-web", diff --git a/core/node/eigenda_proxy/Cargo.toml b/core/node/eigenda_proxy/Cargo.toml index 11462da6d8c..6926d546fe8 100644 --- a/core/node/eigenda_proxy/Cargo.toml +++ b/core/node/eigenda_proxy/Cargo.toml @@ -28,6 +28,9 @@ rustls.workspace = true futures.workspace = true secp256k1.workspace = true hex.workspace = true +tokio-stream = "0.1.16" +byteorder = "1.5.0" +tiny-keccak.workspace = true [build-dependencies] tonic-build = { version = "0.12.1", features = ["prost"] } diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index 9a8848d6f3b..775cd8780cc 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -4,8 +4,10 @@ use std::{ time::{Duration, Instant}, }; +use byteorder::{BigEndian, ByteOrder}; use rlp::decode; -use secp256k1::{PublicKey, Secp256k1, SecretKey}; +use secp256k1::{Message, PublicKey, Secp256k1, SecretKey}; +use tiny_keccak::{Hasher, Keccak}; use tokio::{sync::Mutex, time::interval}; use tonic::transport::{Channel, ClientTlsConfig}; use zksync_config::configs::da_client::eigen_da::DisperserConfig; @@ -13,8 +15,8 @@ use zksync_config::configs::da_client::eigen_da::DisperserConfig; use crate::{ blob_info::BlobInfo, disperser::{ - self, disperser_client::DisperserClient, AuthenticatedRequest, BlobStatusRequest, - DisperseBlobRequest, + self, disperser_client::DisperserClient, AuthenticatedRequest, AuthenticationData, + BlobStatusRequest, DisperseBlobRequest, }, errors::EigenDAError, }; @@ -160,6 +162,103 @@ impl EigenDAClient { return Err(EigenDAError::PutError); } + fn keccak256(&self, input: &[u8]) -> [u8; 32] { + let mut hasher = Keccak::v256(); + let mut output = [0u8; 32]; + hasher.update(input); + hasher.finalize(&mut output); + output + } + + fn sign(&self, challenge: u32, private_key: &SecretKey) -> Vec { + let mut buf = [0u8; 4]; + BigEndian::write_u32(&mut buf, challenge); + let hash = self.keccak256(&buf); + let message = Message::from_slice(&hash).unwrap(); + let secp = Secp256k1::signing_only(); + let recoverable_sig = secp.sign_ecdsa_recoverable(&message, private_key); + + // Step 5: Convert recoverable signature to a 65-byte array (64 bytes for signature + 1 byte for recovery ID) + let (recovery_id, sig_bytes) = recoverable_sig.serialize_compact(); + + // Step 6: Append the recovery ID as the last byte to form a 65-byte signature + let mut full_signature = [0u8; 65]; + full_signature[..64].copy_from_slice(&sig_bytes); + full_signature[64] = recovery_id.to_i32() as u8; // Append the recovery ID as the last byte + + full_signature.to_vec() + } + + async fn authentication( + &self, + blob_data: Vec, + custom_quorum_numbers: Vec, + account_id: String, + private_key: &SecretKey, + ) -> Result { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); + let request = AuthenticatedRequest { + payload: Some(disperser::authenticated_request::Payload::DisperseRequest( + DisperseBlobRequest { + data: blob_data, + custom_quorum_numbers, + account_id, + }, + )), + }; + sender.send(request).unwrap(); + let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver); + let mut stream = self + .disperser + .lock() + .await + .disperse_blob_authenticated(receiver_stream) + .await + .map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })?; + + let result = stream.get_mut().message().await.map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })?; + + let reply = match result.unwrap().payload.unwrap() { + disperser::authenticated_reply::Payload::DisperseReply(_) => { + return Err(EigenDAError::PutError); + } + disperser::authenticated_reply::Payload::BlobAuthHeader(reply) => { + let challenge = reply.challenge_parameter; + println!("Challenge: {:?}", challenge); + let new_request = AuthenticatedRequest { + payload: Some( + disperser::authenticated_request::Payload::AuthenticationData( + AuthenticationData { + authentication_data: self.sign(challenge, private_key), // Todo: real signature + }, + ), + ), + }; + sender.send(new_request).unwrap(); + let result = stream.get_mut().message().await.map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })?; + + let reply = match result.unwrap().payload.unwrap() { + disperser::authenticated_reply::Payload::DisperseReply(reply) => reply, + _ => { + return Err(EigenDAError::PutError); + } + }; + reply + } + }; + + Ok(reply) + } + pub async fn put_blob_authenticated( &self, blob_data: Vec, @@ -183,47 +282,12 @@ impl EigenDAClient { })?; let public_key = PublicKey::from_secret_key(&secp, &secret_key); let account_id = "0x".to_string() + &hex::encode(public_key.serialize_uncompressed()); - let request_stream = futures::stream::once(async { - AuthenticatedRequest { - payload: Some(disperser::authenticated_request::Payload::DisperseRequest( - DisperseBlobRequest { - data: blob_data, - custom_quorum_numbers, - account_id, - }, - )), - } - }); - - println!("Request stream: "); - - let result = self - .disperser - .lock() - .await - .disperse_blob_authenticated(request_stream) - .await - .map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })? - .get_mut() - .message() - .await - .map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })?; - println!("Result: {:?}", result); + let reply = self + .authentication(blob_data, custom_quorum_numbers, account_id, &secret_key) + .await?; - let reply = match result.unwrap().payload.unwrap() { - disperser::authenticated_reply::Payload::DisperseReply(reply) => reply, - _ => { - println!("Error"); - return Err(EigenDAError::PutError); - } - }; + println!("Reply: {:?}", reply); if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { return Err(EigenDAError::PutError); @@ -254,6 +318,11 @@ impl EigenDAClient { blob_status ); + println!( + "Dispersing blob {:?}, status: {:?}", + request_id_str, blob_status + ); + match blob_status { disperser::BlobStatus::Unknown => { interval.tick().await; From fd00942814b38dac60784b9e9276baec9d53f45c Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Tue, 22 Oct 2024 17:12:30 -0300 Subject: [PATCH 10/11] Simplify authenticated dispersal --- core/node/eigenda_proxy/src/eigenda_client.rs | 98 +++++++------------ core/node/eigenda_proxy/src/lib.rs | 1 + core/node/eigenda_proxy/src/signer.rs | 30 ++++++ 3 files changed, 68 insertions(+), 61 deletions(-) create mode 100644 core/node/eigenda_proxy/src/signer.rs diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index 775cd8780cc..1c998caf789 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -4,10 +4,8 @@ use std::{ time::{Duration, Instant}, }; -use byteorder::{BigEndian, ByteOrder}; use rlp::decode; -use secp256k1::{Message, PublicKey, Secp256k1, SecretKey}; -use tiny_keccak::{Hasher, Keccak}; +use secp256k1::{PublicKey, Secp256k1, SecretKey}; use tokio::{sync::Mutex, time::interval}; use tonic::transport::{Channel, ClientTlsConfig}; use zksync_config::configs::da_client::eigen_da::DisperserConfig; @@ -15,10 +13,12 @@ use zksync_config::configs::da_client::eigen_da::DisperserConfig; use crate::{ blob_info::BlobInfo, disperser::{ - self, disperser_client::DisperserClient, AuthenticatedRequest, AuthenticationData, - BlobStatusRequest, DisperseBlobRequest, + self, authenticated_reply::Payload, disperser_client::DisperserClient, AuthenticatedReply, + AuthenticatedRequest, AuthenticationData, BlobAuthHeader, BlobStatusRequest, + DisperseBlobRequest, }, errors::EigenDAError, + signer::sign, }; pub struct EigenDAClient { @@ -162,33 +162,6 @@ impl EigenDAClient { return Err(EigenDAError::PutError); } - fn keccak256(&self, input: &[u8]) -> [u8; 32] { - let mut hasher = Keccak::v256(); - let mut output = [0u8; 32]; - hasher.update(input); - hasher.finalize(&mut output); - output - } - - fn sign(&self, challenge: u32, private_key: &SecretKey) -> Vec { - let mut buf = [0u8; 4]; - BigEndian::write_u32(&mut buf, challenge); - let hash = self.keccak256(&buf); - let message = Message::from_slice(&hash).unwrap(); - let secp = Secp256k1::signing_only(); - let recoverable_sig = secp.sign_ecdsa_recoverable(&message, private_key); - - // Step 5: Convert recoverable signature to a 65-byte array (64 bytes for signature + 1 byte for recovery ID) - let (recovery_id, sig_bytes) = recoverable_sig.serialize_compact(); - - // Step 6: Append the recovery ID as the last byte to form a 65-byte signature - let mut full_signature = [0u8; 65]; - full_signature[..64].copy_from_slice(&sig_bytes); - full_signature[64] = recovery_id.to_i32() as u8; // Append the recovery ID as the last byte - - full_signature.to_vec() - } - async fn authentication( &self, blob_data: Vec, @@ -206,7 +179,7 @@ impl EigenDAClient { }, )), }; - sender.send(request).unwrap(); + sender.send(request).map_err(|_| EigenDAError::PutError)?; let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver); let mut stream = self .disperser @@ -224,36 +197,39 @@ impl EigenDAClient { EigenDAError::PutError })?; - let reply = match result.unwrap().payload.unwrap() { - disperser::authenticated_reply::Payload::DisperseReply(_) => { - return Err(EigenDAError::PutError); - } - disperser::authenticated_reply::Payload::BlobAuthHeader(reply) => { - let challenge = reply.challenge_parameter; - println!("Challenge: {:?}", challenge); - let new_request = AuthenticatedRequest { - payload: Some( - disperser::authenticated_request::Payload::AuthenticationData( - AuthenticationData { - authentication_data: self.sign(challenge, private_key), // Todo: real signature - }, - ), + let reply = if let Some(AuthenticatedReply { + payload: Some(Payload::BlobAuthHeader(header)), + }) = result + { + let challenge = header.challenge_parameter; + let new_request = AuthenticatedRequest { + payload: Some( + disperser::authenticated_request::Payload::AuthenticationData( + AuthenticationData { + authentication_data: sign(challenge, private_key), + }, ), - }; - sender.send(new_request).unwrap(); - let result = stream.get_mut().message().await.map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })?; - - let reply = match result.unwrap().payload.unwrap() { - disperser::authenticated_reply::Payload::DisperseReply(reply) => reply, - _ => { - return Err(EigenDAError::PutError); - } - }; + ), + }; + sender + .send(new_request) + .map_err(|_| EigenDAError::PutError)?; + let result = stream.get_mut().message().await.map_err(|e| { + println!("Error {:?}", e); + EigenDAError::PutError + })?; + + let reply = if let Some(AuthenticatedReply { + payload: Some(Payload::DisperseReply(reply)), + }) = result + { reply - } + } else { + return Err(EigenDAError::PutError); + }; + reply + } else { + return Err(EigenDAError::PutError); }; Ok(reply) diff --git a/core/node/eigenda_proxy/src/lib.rs b/core/node/eigenda_proxy/src/lib.rs index 2a3b6427479..ed8e7d17e4c 100644 --- a/core/node/eigenda_proxy/src/lib.rs +++ b/core/node/eigenda_proxy/src/lib.rs @@ -13,6 +13,7 @@ mod blob_info; mod eigenda_client; mod errors; mod memstore; +mod signer; pub async fn run_server(mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { // TODO: Replace port for config diff --git a/core/node/eigenda_proxy/src/signer.rs b/core/node/eigenda_proxy/src/signer.rs new file mode 100644 index 00000000000..be9ecddb724 --- /dev/null +++ b/core/node/eigenda_proxy/src/signer.rs @@ -0,0 +1,30 @@ +use byteorder::{BigEndian, ByteOrder}; +use secp256k1::{Message, Secp256k1, SecretKey}; +use tiny_keccak::{Hasher, Keccak}; + +fn keccak256(input: &[u8]) -> [u8; 32] { + let mut hasher = Keccak::v256(); + let mut output = [0u8; 32]; + hasher.update(input); + hasher.finalize(&mut output); + output +} + +pub fn sign(challenge: u32, private_key: &SecretKey) -> Vec { + let mut buf = [0u8; 4]; + BigEndian::write_u32(&mut buf, challenge); + let hash = keccak256(&buf); + let message = Message::from_slice(&hash).unwrap(); + let secp = Secp256k1::signing_only(); + let recoverable_sig = secp.sign_ecdsa_recoverable(&message, private_key); + + // Step 5: Convert recoverable signature to a 65-byte array (64 bytes for signature + 1 byte for recovery ID) + let (recovery_id, sig_bytes) = recoverable_sig.serialize_compact(); + + // Step 6: Append the recovery ID as the last byte to form a 65-byte signature + let mut full_signature = [0u8; 65]; + full_signature[..64].copy_from_slice(&sig_bytes); + full_signature[64] = recovery_id.to_i32() as u8; // Append the recovery ID as the last byte + + full_signature.to_vec() +} From 37b36b814b0ca047e4badab3082d6a155a826dfa Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Tue, 22 Oct 2024 17:24:47 -0300 Subject: [PATCH 11/11] Add message to error --- core/node/eigenda_proxy/src/eigenda_client.rs | 129 ++++++++++-------- core/node/eigenda_proxy/src/errors.rs | 2 +- 2 files changed, 73 insertions(+), 58 deletions(-) diff --git a/core/node/eigenda_proxy/src/eigenda_client.rs b/core/node/eigenda_proxy/src/eigenda_client.rs index 1c998caf789..1633a2960bf 100644 --- a/core/node/eigenda_proxy/src/eigenda_client.rs +++ b/core/node/eigenda_proxy/src/eigenda_client.rs @@ -66,7 +66,7 @@ impl EigenDAClient { pub async fn put_blob(&self, blob_data: Vec) -> Result, EigenDAError> { tracing::info!("Putting blob"); if blob_data.len() > self.config.blob_size_limit as usize { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError("Blob too large".to_string())); } let reply = self .disperser @@ -82,15 +82,17 @@ impl EigenDAClient { account_id: self.config.account_id.clone().unwrap_or_default(), }) .await - .map_err(|_| EigenDAError::PutError)? + .map_err(|e| EigenDAError::PutError(e.to_string()))? .into_inner(); if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } - let request_id_str = - String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; + let request_id_str = String::from_utf8(reply.request_id.clone()) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let mut interval = interval(Duration::from_secs(self.config.status_query_interval)); let start_time = Instant::now(); @@ -103,7 +105,7 @@ impl EigenDAClient { request_id: reply.request_id.clone(), }) .await - .map_err(|_| EigenDAError::PutError)? + .map_err(|e| EigenDAError::PutError(e.to_string()))? .into_inner(); let blob_status = blob_status_reply.status(); @@ -127,39 +129,49 @@ impl EigenDAClient { } else { match blob_status_reply.info { Some(info) => { - let blob_info = - BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + let blob_info = BlobInfo::try_from(info) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to get blob info".to_string(), + )); } } } } disperser::BlobStatus::Failed => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } disperser::BlobStatus::InsufficientSignatures => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Insufficient signatures".to_string(), + )); } disperser::BlobStatus::Dispersing => { interval.tick().await; } disperser::BlobStatus::Finalized => match blob_status_reply.info { Some(info) => { - let blob_info = - BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + let blob_info = BlobInfo::try_from(info) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to get blob info".to_string(), + )); } }, } } - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } async fn authentication( @@ -179,7 +191,9 @@ impl EigenDAClient { }, )), }; - sender.send(request).map_err(|_| EigenDAError::PutError)?; + sender + .send(request) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver); let mut stream = self .disperser @@ -187,15 +201,13 @@ impl EigenDAClient { .await .disperse_blob_authenticated(receiver_stream) .await - .map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })?; + .map_err(|e| EigenDAError::PutError(e.to_string()))?; - let result = stream.get_mut().message().await.map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })?; + let result = stream + .get_mut() + .message() + .await + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let reply = if let Some(AuthenticatedReply { payload: Some(Payload::BlobAuthHeader(header)), @@ -213,11 +225,12 @@ impl EigenDAClient { }; sender .send(new_request) - .map_err(|_| EigenDAError::PutError)?; - let result = stream.get_mut().message().await.map_err(|e| { - println!("Error {:?}", e); - EigenDAError::PutError - })?; + .map_err(|e| EigenDAError::PutError(e.to_string()))?; + let result = stream + .get_mut() + .message() + .await + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let reply = if let Some(AuthenticatedReply { payload: Some(Payload::DisperseReply(reply)), @@ -225,11 +238,11 @@ impl EigenDAClient { { reply } else { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError("Failed to authenticate".to_string())); }; reply } else { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError("Failed to authenticate".to_string())); }; Ok(reply) @@ -240,9 +253,8 @@ impl EigenDAClient { blob_data: Vec, ) -> Result, EigenDAError> { tracing::info!("Putting blob"); - println!("Putting blob"); if blob_data.len() > self.config.blob_size_limit as usize { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError("Blob too large".to_string())); } let custom_quorum_numbers = self @@ -252,10 +264,8 @@ impl EigenDAClient { .unwrap_or_default(); let account_id = self.config.account_id.clone().unwrap_or_default(); let secp = Secp256k1::new(); - let secret_key = SecretKey::from_str(account_id.as_str()).map_err(|e| { - println!("Error: {:?}", e); - EigenDAError::PutError - })?; + let secret_key = SecretKey::from_str(account_id.as_str()) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let public_key = PublicKey::from_secret_key(&secp, &secret_key); let account_id = "0x".to_string() + &hex::encode(public_key.serialize_uncompressed()); @@ -263,14 +273,14 @@ impl EigenDAClient { .authentication(blob_data, custom_quorum_numbers, account_id, &secret_key) .await?; - println!("Reply: {:?}", reply); - if self.result_to_status(reply.result) == disperser::BlobStatus::Failed { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } - let request_id_str = - String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?; + let request_id_str = String::from_utf8(reply.request_id.clone()) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; let mut interval = interval(Duration::from_secs(self.config.status_query_interval)); let start_time = Instant::now(); @@ -283,7 +293,7 @@ impl EigenDAClient { request_id: reply.request_id.clone(), }) .await - .map_err(|_| EigenDAError::PutError)? + .map_err(|e| EigenDAError::PutError(e.to_string()))? .into_inner(); let blob_status = blob_status_reply.status(); @@ -294,11 +304,6 @@ impl EigenDAClient { blob_status ); - println!( - "Dispersing blob {:?}, status: {:?}", - request_id_str, blob_status - ); - match blob_status { disperser::BlobStatus::Unknown => { interval.tick().await; @@ -312,39 +317,49 @@ impl EigenDAClient { } else { match blob_status_reply.info { Some(info) => { - let blob_info = - BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + let blob_info = BlobInfo::try_from(info) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to get blob info".to_string(), + )); } } } } disperser::BlobStatus::Failed => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } disperser::BlobStatus::InsufficientSignatures => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Insufficient signatures".to_string(), + )); } disperser::BlobStatus::Dispersing => { interval.tick().await; } disperser::BlobStatus::Finalized => match blob_status_reply.info { Some(info) => { - let blob_info = - BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?; + let blob_info = BlobInfo::try_from(info) + .map_err(|e| EigenDAError::PutError(e.to_string()))?; return Ok(rlp::encode(&blob_info).to_vec()); } None => { - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to get blob info".to_string(), + )); } }, } } - return Err(EigenDAError::PutError); + return Err(EigenDAError::PutError( + "Failed to disperse blob".to_string(), + )); } pub async fn get_blob(&self, commit: Vec) -> Result, EigenDAError> { diff --git a/core/node/eigenda_proxy/src/errors.rs b/core/node/eigenda_proxy/src/errors.rs index 050c3f42d23..fdaf2e21bf9 100644 --- a/core/node/eigenda_proxy/src/errors.rs +++ b/core/node/eigenda_proxy/src/errors.rs @@ -12,6 +12,6 @@ pub enum EigenDAError { TlsError, UriError, ConnectionError, - PutError, + PutError(String), GetError, }