Skip to content

Commit

Permalink
Add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
gianbelinche committed Oct 21, 2024
1 parent ac5f3f1 commit 0ae04c3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 22 deletions.
61 changes: 39 additions & 22 deletions core/node/eigenda_proxy/src/eigenda_proxy_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use zksync_config::configs::da_client::eigen_da::DisperserConfig;
use crate::{
blob_info::BlobInfo,
disperser::{self, disperser_client::DisperserClient, BlobStatusRequest, DisperseBlobRequest},
errors::EigenDAError,
};

pub struct EigenDAProxyClient {
Expand All @@ -24,12 +25,25 @@ impl EigenDAProxyClient {
pub const STATUS_QUERY_TIMEOUT: u64 = 1800; // 30 minutes todo: add to config
pub const STATUS_QUERY_RETRY_INTERVAL: u64 = 5; // 5 seconds todo: add to config
pub const WAIT_FOR_FINALAZATION: bool = false; // todo: add to config
pub async fn new(config: DisperserConfig) -> anyhow::Result<Self> {
rustls::crypto::ring::default_provider().install_default();
let inner = Channel::builder(config.disperser_rpc.parse()?)
.tls_config(ClientTlsConfig::new().with_native_roots())?;

let disperser = Arc::new(Mutex::new(DisperserClient::connect(inner).await?));
pub async fn new(config: DisperserConfig) -> Result<Self, EigenDAError> {
match rustls::crypto::ring::default_provider().install_default() {
Ok(_) => {}
Err(_) => {} // This is not an actual error, we expect this function to return an Err(Arc<CryptoProvider>)
};
let inner = Channel::builder(
config
.disperser_rpc
.parse()
.map_err(|_| EigenDAError::UriError)?,
)
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|_| EigenDAError::TlsError)?;

let disperser = Arc::new(Mutex::new(
DisperserClient::connect(inner)
.await
.map_err(|_| EigenDAError::ConnectionError)?,
));

Ok(Self { disperser, config })
}
Expand All @@ -47,7 +61,7 @@ impl EigenDAProxyClient {
}
}

pub async fn put_blob(&self, blob_data: Vec<u8>) -> Result<Vec<u8>, ()> {
pub async fn put_blob(&self, blob_data: Vec<u8>) -> Result<Vec<u8>, EigenDAError> {
println!("Putting blob");
let reply = self
.disperser
Expand All @@ -63,14 +77,15 @@ impl EigenDAProxyClient {
account_id: self.config.account_id.clone().unwrap_or_default(),
})
.await
.unwrap()
.map_err(|_| EigenDAError::PutError)?
.into_inner();

if self.result_to_status(reply.result) == disperser::BlobStatus::Failed {
return Err(());
return Err(EigenDAError::PutError);
}

let request_id_str = String::from_utf8(reply.request_id.clone()).unwrap();
let request_id_str =
String::from_utf8(reply.request_id.clone()).map_err(|_| EigenDAError::PutError)?;

let mut interval = interval(Duration::from_secs(Self::STATUS_QUERY_RETRY_INTERVAL));
let start_time = Instant::now();
Expand All @@ -83,7 +98,7 @@ impl EigenDAProxyClient {
request_id: reply.request_id.clone(),
})
.await
.unwrap()
.map_err(|_| EigenDAError::PutError)?
.into_inner();

let blob_status = blob_status_reply.status();
Expand All @@ -106,42 +121,44 @@ impl EigenDAProxyClient {
} else {
match blob_status_reply.info {
Some(info) => {
let blob_info = BlobInfo::try_from(info).map_err(|_| ())?;
let blob_info =
BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?;
return Ok(rlp::encode(&blob_info).to_vec());
}
None => {
return Err(());
return Err(EigenDAError::PutError);
}
}
}
}
disperser::BlobStatus::Failed => {
return Err(());
return Err(EigenDAError::PutError);
}
disperser::BlobStatus::InsufficientSignatures => {
return Err(());
return Err(EigenDAError::PutError);
}
disperser::BlobStatus::Dispersing => {
interval.tick().await;
}
disperser::BlobStatus::Finalized => match blob_status_reply.info {
Some(info) => {
let blob_info = BlobInfo::try_from(info).map_err(|_| ())?;
let blob_info =
BlobInfo::try_from(info).map_err(|_| EigenDAError::PutError)?;
return Ok(rlp::encode(&blob_info).to_vec());
}
None => {
return Err(());
return Err(EigenDAError::PutError);
}
},
}
}

return Err(());
return Err(EigenDAError::PutError);
}

pub async fn get_blob(&self, commit: Vec<u8>) -> Result<Vec<u8>, ()> {
pub async fn get_blob(&self, commit: Vec<u8>) -> Result<Vec<u8>, EigenDAError> {
println!("Getting blob");
let blob_info: BlobInfo = decode(&commit).map_err(|_| ())?;
let blob_info: BlobInfo = decode(&commit).map_err(|_| EigenDAError::GetError)?;
let blob_index = blob_info.blob_verification_proof.blob_index;
let batch_header_hash = blob_info
.blob_verification_proof
Expand All @@ -156,11 +173,11 @@ impl EigenDAProxyClient {
blob_index,
})
.await
.unwrap()
.map_err(|_| EigenDAError::GetError)?
.into_inner();

if get_response.data.len() == 0 {
return Err(());
return Err(EigenDAError::GetError);
}

return Ok(get_response.data);
Expand Down
9 changes: 9 additions & 0 deletions core/node/eigenda_proxy/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ pub enum MemStoreError {
IncorrectCommitment,
BlobNotFound,
}

#[derive(Debug)]
pub enum EigenDAError {
TlsError,
UriError,
ConnectionError,
PutError,
GetError,
}

0 comments on commit 0ae04c3

Please sign in to comment.