Skip to content

Commit

Permalink
Download blobs from the node that gave us the certificate. (#3034)
Browse files Browse the repository at this point in the history
  • Loading branch information
afck authored Dec 16, 2024
1 parent 5fcbc19 commit d80e059
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,8 @@ where
let block_chain_id = certificate.value().chain_id();
let block_height = certificate.value().height();

self.receive_certificate_internal(certificate, mode).await?;
self.receive_certificate_internal(certificate, mode, None)
.await?;

// Make sure a quorum of validators (according to our new local committee) are up-to-date
// for data availability.
Expand All @@ -1207,6 +1208,7 @@ where
&self,
certificate: Certificate,
mode: ReceiveCertificateMode,
nodes: Option<Vec<RemoteNode<P::Node>>>,
) -> Result<(), ChainClientError> {
let CertificateValue::ConfirmedBlock { executed_block, .. } = certificate.value() else {
return Err(ChainClientError::InternalError(
Expand All @@ -1227,9 +1229,13 @@ where
if let ReceiveCertificateMode::NeedsCheck = mode {
certificate.check(remote_committee)?;
}
// Recover history from the network. We assume that the committee that signed the
// certificate is still active.
let nodes = self.make_nodes(remote_committee)?;
// Recover history from the network.
let nodes = if let Some(nodes) = nodes {
nodes
} else {
// We assume that the committee that signed the certificate is still active.
self.make_nodes(remote_committee)?
};
self.client
.download_certificates(&nodes, block.chain_id, block.height)
.await?;
Expand Down Expand Up @@ -1443,7 +1449,10 @@ where
for certificate in certificates.into_values() {
let hash = certificate.hash();
let mode = ReceiveCertificateMode::AlreadyChecked;
if let Err(e) = client.receive_certificate_internal(certificate, mode).await {
if let Err(e) = client
.receive_certificate_internal(certificate, mode, None)
.await
{
warn!("Received invalid certificate {hash}: {e}");
}
}
Expand Down Expand Up @@ -1807,15 +1816,26 @@ where
for blob_id in blob_ids {
let mut certificate_stream = validators
.iter()
.map(|remote_node| remote_node.download_certificate_for_blob(blob_id))
.map(|remote_node| async move {
let cert = remote_node.download_certificate_for_blob(blob_id).await?;
Ok::<_, NodeError>((remote_node.clone(), cert))
})
.collect::<FuturesUnordered<_>>();
loop {
let Some(result) = certificate_stream.next().await else {
missing_blobs.push(blob_id);
break;
};
if let Ok(cert) = result {
if self.receive_certificate(cert).await.is_ok() {
if let Ok((remote_node, cert)) = result {
if self
.receive_certificate_internal(
cert,
ReceiveCertificateMode::NeedsCheck,
Some(vec![remote_node]),
)
.await
.is_ok()
{
break;
}
}
Expand Down Expand Up @@ -2578,7 +2598,7 @@ where
&self,
certificate: Certificate,
) -> Result<(), ChainClientError> {
self.receive_certificate_internal(certificate, ReceiveCertificateMode::NeedsCheck)
self.receive_certificate_internal(certificate, ReceiveCertificateMode::NeedsCheck, None)
.await
}

Expand Down

0 comments on commit d80e059

Please sign in to comment.