Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remember a fast block proposal: it remains locked even if re-proposed. #3140

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 158 additions & 100 deletions linera-chain/src/manager.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ where
check_block_epoch(epoch, block)?;
certificate.check(committee)?;
let mut actions = NetworkActions::default();
let already_validated_block = self
let already_committed_block = self
.state
.chain
.tip_state
Expand All @@ -196,7 +196,7 @@ where
.check_validated_block(&certificate)
.map(|outcome| outcome == manager::Outcome::Skip)
};
if already_validated_block || should_skip_validated_block()? {
if already_committed_block || should_skip_validated_block()? {
// If we just processed the same pending block, return the chain info unchanged.
return Ok((
ChainInfoResponse::new(&self.state.chain, self.state.config.key_pair()),
Expand Down
96 changes: 65 additions & 31 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use linera_chain::{
Block, BlockProposal, ChainAndHeight, ExecutedBlock, IncomingBundle, LiteVote,
MessageAction,
},
manager::LockedBlock,
types::{
CertificateKind, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate,
GenericCertificate, LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock,
Expand Down Expand Up @@ -1764,13 +1765,38 @@ where
break;
}
}
if let Some(cert) = info.manager.requested_locked {
let hash = cert.hash();
if let Err(err) = self.try_process_locked_block_from(remote_node, cert).await {
warn!(
"Skipping certificate {hash} from validator {}: {err}",
remote_node.name
);
if let Some(locked) = info.manager.requested_locked {
match *locked {
LockedBlock::Fast(proposal) => {
let owner = proposal.owner;
while let Err(original_err) = self
.client
.local_node
.handle_block_proposal(proposal.clone())
.await
{
if let LocalNodeError::BlobsNotFound(blob_ids) = &original_err {
self.update_local_node_with_blobs_from(blob_ids.clone(), remote_node)
.await?;
continue; // We found the missing blobs: retry.
}

warn!(
"Skipping proposal from {} and validator {}: {}",
owner, remote_node.name, original_err
);
break;
}
}
LockedBlock::Regular(cert) => {
let hash = cert.hash();
if let Err(err) = self.try_process_locked_block_from(remote_node, cert).await {
warn!(
"Skipping certificate {hash} from validator {}: {err}",
remote_node.name
);
}
}
}
}
Ok(())
Expand All @@ -1779,10 +1805,10 @@ where
async fn try_process_locked_block_from(
&self,
remote_node: &RemoteNode<P::Node>,
certificate: Box<GenericCertificate<ValidatedBlock>>,
certificate: GenericCertificate<ValidatedBlock>,
) -> Result<(), ChainClientError> {
let chain_id = certificate.inner().chain_id();
match self.process_certificate(*certificate.clone(), vec![]).await {
match self.process_certificate(certificate.clone(), vec![]).await {
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
let mut blobs = Vec::new();
for blob_id in blob_ids {
Expand All @@ -1792,7 +1818,7 @@ where
.await?;
blobs.push(Blob::new(blob_content));
}
self.process_certificate(*certificate, blobs).await?;
self.process_certificate(certificate, blobs).await?;
Ok(())
}
Err(err) => Err(err.into()),
Expand Down Expand Up @@ -2398,28 +2424,26 @@ where
let info = self.request_leader_timeout_if_needed().await?;

// If there is a validated block in the current round, finalize it.
if info.manager.has_locked_block_in_current_round() {
if info.manager.has_locked_block_in_current_round() && !info.manager.current_round.is_fast()
{
return self.finalize_locked_block(info).await;
}

// Otherwise we have to re-propose the highest validated block, if there is one.
let executed_block = if let Some(certificate) = &info.manager.requested_locked {
certificate.executed_block().clone()
} else {
let block = if let Some(proposal) = info
.manager
.requested_proposed
.as_ref()
.filter(|proposal| proposal.content.round.is_fast())
{
// The fast block counts as "locked", too. If there is one, re-propose that.
proposal.content.block.clone()
} else if let Some(block) = self.state().pending_block() {
block.clone() // Otherwise we are free to propose our own pending block.
} else {
return Ok(ClientOutcome::Committed(None)); // Nothing to do.
};
let pending: Option<Block> = self.state().pending_block().clone();
let executed_block = if let Some(locked) = &info.manager.requested_locked {
match &**locked {
LockedBlock::Regular(certificate) => certificate.executed_block().clone(),
LockedBlock::Fast(proposal) => {
let block = proposal.content.block.clone();
self.stage_block_execution(block).await?.0
}
}
} else if let Some(block) = pending {
// Otherwise we are free to propose our own pending block.
self.stage_block_execution(block).await?.0
} else {
return Ok(ClientOutcome::Committed(None)); // Nothing to do.
};

let identity = self.identity().await?;
Expand All @@ -2434,8 +2458,15 @@ where
let already_handled_locally = info.manager.already_handled_proposal(round, block);
let key_pair = self.key_pair().await?;
// Create the final block proposal.
let proposal = if let Some(cert) = info.manager.requested_locked {
Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs))
let proposal = if let Some(locked) = info.manager.requested_locked {
Box::new(match *locked {
LockedBlock::Regular(cert) => {
BlockProposal::new_retry(round, cert, &key_pair, blobs)
}
LockedBlock::Fast(proposal) => {
BlockProposal::new_initial(round, proposal.content.block, &key_pair, blobs)
}
})
} else {
let block = executed_block.block.clone();
Box::new(BlockProposal::new_initial(round, block, &key_pair, blobs))
Expand Down Expand Up @@ -2488,12 +2519,15 @@ where
&self,
info: Box<ChainInfo>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
let certificate = info
let locked = info
.manager
.requested_locked
.expect("Should have a locked block");
let LockedBlock::Regular(certificate) = *locked else {
panic!("Should have a locked validated block");
};
let committee = self.local_committee().await?;
match self.finalize_block(&committee, *certificate.clone()).await {
match self.finalize_block(&committee, certificate.clone()).await {
Ok(certificate) => Ok(ClientOutcome::Committed(Some(certificate))),
Err(ChainClientError::CommunicationError(error)) => {
// Communication errors in this case often mean that someone else already
Expand Down
3 changes: 1 addition & 2 deletions linera-core/src/remote_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ impl<N: ValidatorNode> RemoteNode<N> {
let locked = manager.requested_locked.as_ref();
ensure!(
proposed.map_or(true, |proposal| proposal.content.block.chain_id == chain_id)
&& locked.map_or(true, |cert| cert.executed_block().block.chain_id
== chain_id)
&& locked.map_or(true, |locked| locked.chain_id() == chain_id)
&& response.check(&self.name).is_ok(),
NodeError::InvalidChainInfoResponse
);
Expand Down
52 changes: 33 additions & 19 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use linera_base::{
};
use linera_chain::{
data_types::{IncomingBundle, Medium, MessageBundle, Origin, PostedMessage},
manager::LockedBlock,
types::Timeout,
ChainError, ChainExecutionContext,
};
Expand Down Expand Up @@ -1374,10 +1375,13 @@ where
// We make validator 3 (who does not have the block proposal) process the validated block.
let info2_a = client2_a.chain_info_with_manager_values().await?;
let locked = *info2_a.manager.requested_locked.unwrap();
let LockedBlock::Regular(validated) = locked else {
panic!("Unexpected locked fast block.");
};
let blobs = vec![blob1];
let response = builder
.node(3)
.handle_validated_certificate(locked.clone(), blobs)
.handle_validated_certificate(validated.clone(), blobs)
.await?;
assert_eq!(
response.info.manager.pending.unwrap().round,
Expand All @@ -1387,7 +1391,10 @@ where
// Client 2B should be able to synchronize the locked block and the blobs from validator 3.
client2_b.synchronize_from_validators().await.unwrap();
let info2_b = client2_b.chain_info_with_manager_values().await?;
assert_eq!(locked, *info2_b.manager.requested_locked.unwrap());
assert_eq!(
LockedBlock::Regular(validated),
*info2_b.manager.requested_locked.unwrap()
);
let bt_certificate = client2_b
.burn(None, Amount::from_tokens(1))
.await
Expand Down Expand Up @@ -1665,7 +1672,10 @@ where
// Validator 2 may or may not have processed the validated block before the update was
// canceled due to the errors from the faulty validators. Submit it again to make sure
// it's there, so that client 2 can download and re-propose it later.
let validated_block_certificate = *manager.requested_locked.unwrap();
let locked = *manager.requested_locked.unwrap();
let LockedBlock::Regular(validated_block_certificate) = locked else {
panic!("Unexpected locked fast block.");
};
let resubmission_result = builder
.node(2)
.handle_validated_certificate(validated_block_certificate, vec![blob1])
Expand All @@ -1690,13 +1700,12 @@ where
);

if i == 2 {
let locked = *validator_manager.requested_locked.unwrap();
let LockedBlock::Regular(validated) = locked else {
panic!("Unexpected locked fast block.");
};
assert_eq!(
validator_manager
.requested_locked
.unwrap()
.executed_block()
.block
.operations,
validated.executed_block().block.operations,
blob_0_1_operations,
);
} else {
Expand Down Expand Up @@ -1733,7 +1742,10 @@ where
// Validator 3 may or may not have processed the validated block before the update was
// canceled due to the errors from the faulty validators. Submit it again to make sure
// it's there, so that client 2 can download and re-propose it later.
let validated_block_certificate = *manager.requested_locked.unwrap();
let locked = *manager.requested_locked.unwrap();
let LockedBlock::Regular(validated_block_certificate) = locked else {
panic!("Unexpected locked fast block.");
};
let resubmission_result = builder
.node(3)
.handle_validated_certificate(validated_block_certificate, vec![blob3])
Expand All @@ -1755,13 +1767,12 @@ where
.operations,
blob_2_3_operations,
);
let locked = *validator_manager.requested_locked.unwrap();
let LockedBlock::Regular(validated) = locked else {
panic!("Unexpected locked fast block.");
};
assert_eq!(
validator_manager
.requested_locked
.unwrap()
.executed_block()
.block
.operations,
validated.executed_block().block.operations,
blob_2_3_operations,
);

Expand Down Expand Up @@ -1996,7 +2007,7 @@ where
.unwrap()
.manager;
assert_eq!(
manager.requested_locked.unwrap().round,
manager.requested_locked.unwrap().round(),
Round::MultiLeader(1)
);
assert!(client0.pending_block().is_some());
Expand Down Expand Up @@ -2112,7 +2123,10 @@ where
// Validator 0 may or may not have processed the validated block before the update was
// canceled due to the errors from the faulty validators. Submit it again to make sure
// it's there, so that client 1 can download and re-propose it later.
let validated_block_certificate = *manager.requested_locked.unwrap();
let locked = *manager.requested_locked.unwrap();
let LockedBlock::Regular(validated_block_certificate) = locked else {
panic!("Unexpected locked fast block.");
};
builder
.node(0)
.handle_validated_certificate(validated_block_certificate, Vec::new())
Expand Down Expand Up @@ -2149,7 +2163,7 @@ where
.unwrap()
.manager;
assert_eq!(
manager.requested_locked.unwrap().round,
manager.requested_locked.unwrap().round(),
Round::MultiLeader(0)
);
assert_eq!(manager.current_round, Round::MultiLeader(1));
Expand Down
Loading
Loading