Skip to content

Commit

Permalink
Extract some code from synchronize_received_certificates_from_validator.
Browse files Browse the repository at this point in the history
  • Loading branch information
afck committed Oct 30, 2024
1 parent d0df87e commit e213369
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 58 deletions.
119 changes: 61 additions & 58 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use linera_base::{
};
use linera_chain::{
data_types::{
Block, BlockProposal, Certificate, CertificateValue, ExecutedBlock, HashedCertificateValue,
IncomingBundle, LiteCertificate, LiteVote, MessageAction, PostedMessage,
Block, BlockProposal, Certificate, CertificateValue, ChainAndHeight, ExecutedBlock,
HashedCertificateValue, IncomingBundle, LiteCertificate, LiteVote, MessageAction,
PostedMessage,
},
manager::ChainManagerInfo,
ChainError, ChainExecutionContext, ChainStateView,
Expand Down Expand Up @@ -1160,77 +1161,41 @@ where
.await
.map_err(|_| NodeError::InvalidChainInfoResponse)?;

// Retrieve newly received certificates from this validator.
// Retrieve the list of newly received certificates from this validator.
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
let info = remote_node.handle_chain_info_query(query).await?;
let mut other_sender_chains = Vec::new();
let remote_log = info.requested_received_log;
let remote_node_chains_view = remote_log.iter().fold(
BTreeMap::<ChainId, BlockHeight>::new(),
|mut chain_to_info, entry| {
chain_to_info
.entry(entry.chain_id)
.and_modify(|h| *h = entry.height.max(*h))
.or_insert(entry.height);
chain_to_info
},
);
let remote_max_heights = Self::max_height_per_chain(&remote_log);

// Obtain the chain heights we already have from the local node.
let futures = remote_node_chains_view
.into_iter()
.map(|(sender_chain_id, remote_height)| async move {
let query = ChainInfoQuery::new(sender_chain_id);
let local_response = self.local_query(query).await?;
Ok::<_, NodeError>((
sender_chain_id,
remote_height,
local_response.info.next_block_height,
))
})
.collect::<Vec<_>>();
let heights = stream::iter(futures)
.buffer_unordered(chain_worker_limit)
.try_collect::<Vec<_>>()
// Obtain the next block height we need in the local node, for each chain.
let local_next_heights = self
.local_next_block_heights(remote_max_heights.keys(), chain_worker_limit)
.await?;

// We keep track of the height we've successfully downloaded and checked, per chain.
let mut downloaded_heights = BTreeMap::new();
// And we make a list of chains we already fully have locally. We need to make sure to
// put all their sent messages into the inbox.
let mut other_sender_chains = Vec::new();

let certificate_hashes = future::try_join_all(heights.into_iter().filter_map(
|(sender_chain_id, remote_height, local_next)| {
let certificate_hashes = future::try_join_all(remote_max_heights.into_iter().filter_map(
|(sender_chain_id, remote_height)| {
let local_next = *local_next_heights.get(&sender_chain_id)?;
if let Ok(height) = local_next.try_sub_one() {
downloaded_heights.insert(sender_chain_id, height);
}

// Find the first and last block height in the batch that we need.
if local_next > remote_height {
let Some(diff) = remote_height.0.checked_sub(local_next.0) else {
// Our highest, locally-known block is higher than any block height
// from the current batch. Skip this batch, but remember to wait for
// the messages to be delivered to the inboxes.
other_sender_chains.push(sender_chain_id);
return None;
}
};

// Find the hashes of the blocks we need.
Some(async move {
let batch_size = remote_height.saturating_sub(local_next).0.saturating_add(1);
let block_batch_range = BlockHeightRange::multi(local_next, batch_size);
let query = ChainInfoQuery::new(sender_chain_id)
.with_sent_certificate_hashes_in_range(block_batch_range.clone());
let remote_response = remote_node.handle_chain_info_query(query).await?;
let hashes = remote_response.requested_sent_certificate_hashes;

if hashes.len() as u64 != batch_size {
error!(
block_range = ?block_batch_range,
received_num = hashes.len(),
"Validator sent invalid number of certificate hashes."
);
return Err(NodeError::InvalidChainInfoResponse);
}
Ok(hashes)
})
let range = BlockHeightRange::multi(local_next, diff.saturating_add(1));
Some(remote_node.fetch_sent_certificate_hashes(sender_chain_id, range))
},
))
.await?
Expand All @@ -1250,10 +1215,7 @@ where
let sender_chain_id = certificate.value().chain_id();
let height = certificate.value().height();
let epoch = certificate.value().epoch();
match self
.check_certificate(max_epoch, &committees, &certificate)
.await?
{
match self.check_certificate(max_epoch, &committees, &certificate)? {
CheckCertificateResult::FutureEpoch => {
warn!(
"Postponing received certificate from {sender_chain_id:.8} at height \
Expand Down Expand Up @@ -1307,7 +1269,11 @@ where
})
}

async fn check_certificate(
#[tracing::instrument(
level = "trace", skip_all,
fields(certificate_hash = ?incoming_certificate.hash()),
)]
fn check_certificate(
&self,
highest_known_epoch: Epoch,
committees: &BTreeMap<Epoch, Committee>,
Expand Down Expand Up @@ -3229,6 +3195,43 @@ where
.await;
Ok(())
}

/// Given a list of chain IDs, returns a map that assigns to each of them the next block
/// height, i.e. the lowest block height that we have not processed in the local node yet.
///
/// It makes at most `chain_worker_limit` requests to the local node in parallel.
async fn local_next_block_heights(
&self,
chain_ids: impl IntoIterator<Item = &ChainId>,
chain_worker_limit: usize,
) -> Result<BTreeMap<ChainId, BlockHeight>, NodeError> {
let futures = chain_ids
.into_iter()
.map(|chain_id| async move {
let local_response = self.local_query(ChainInfoQuery::new(*chain_id)).await?;
Ok::<_, NodeError>((*chain_id, local_response.info.next_block_height))
})
.collect::<Vec<_>>();
stream::iter(futures)
.buffer_unordered(chain_worker_limit)
.try_collect()
.await
}

/// Given a set of chain ID-block height pairs, returns a map that assigns to each chain ID
/// the highest height seen.
fn max_height_per_chain(remote_log: &[ChainAndHeight]) -> BTreeMap<ChainId, BlockHeight> {
remote_log.iter().fold(
BTreeMap::<ChainId, BlockHeight>::new(),
|mut chain_to_info, entry| {
chain_to_info
.entry(entry.chain_id)
.and_modify(|h| *h = entry.height.max(*h))
.or_insert(entry.height);
chain_to_info
},
)
}
}

/// The outcome of trying to commit a list of incoming messages and operations to the chain.
Expand Down
28 changes: 28 additions & 0 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{collections::HashMap, fmt};

use futures::future;
use linera_base::{
crypto::CryptoHash,
data_types::{Blob, BlockHeight},
ensure,
identifiers::{BlobId, ChainId},
Expand Down Expand Up @@ -210,4 +211,31 @@ impl<N: ValidatorNode> RemoteNode<N> {

Ok(found_blobs)
}

/// Returns the list of certificate hashes on the given chain in the given range of heights.
/// Returns an error if the number of hashes does not match the size of the range.
#[tracing::instrument(level = "trace")]
pub(crate) async fn fetch_sent_certificate_hashes(
&self,
chain_id: ChainId,
range: BlockHeightRange,
) -> Result<Vec<CryptoHash>, NodeError> {
let query =
ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range.clone());
let response = self.handle_chain_info_query(query).await?;
let hashes = response.requested_sent_certificate_hashes;

if range
.limit
.is_some_and(|limit| hashes.len() as u64 != limit)
{
warn!(
?range,
received_num = hashes.len(),
"Validator sent invalid number of certificate hashes."
);
return Err(NodeError::InvalidChainInfoResponse);
}
Ok(hashes)
}
}

0 comments on commit e213369

Please sign in to comment.