Skip to content

Commit

Permalink
Support the BFT protocol in the client. (linera-io#1305)
Browse files Browse the repository at this point in the history
* Make execute_block try harder.

* Introduce ClientOutcome.

* Add ClientOutcome checks to existing leader_timeout test.

* Extend the client tests.

* Rename OtherBlock to Conflict.

* Make ensure_admin_subscription parallel.

* Revert the increased sleeps in crowd-funding README.

* Retry on some proposal errors.

* Move ClientOutcome to data_types.

* Refactor return Oks.

* Add TODOs for error distinction and more accurate round timeout.
  • Loading branch information
afck authored Jan 2, 2024
1 parent cea12cf commit b8cfd4e
Show file tree
Hide file tree
Showing 8 changed files with 1,231 additions and 477 deletions.
4 changes: 3 additions & 1 deletion linera-core/benches/client_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ where
let cert = chain1
.transfer_to_account(None, amt, account, UserData(None))
.await
.unwrap()
.unwrap();

chain2.receive_certificate(cert).await.unwrap();
Expand All @@ -77,10 +78,11 @@ where
let cert = chain1
.claim(owner1, chain2.chain_id(), account, amt, UserData(None))
.await
.unwrap()
.unwrap();

chain2.receive_certificate(cert).await.unwrap();
let cert = chain2.process_inbox().await.unwrap().pop().unwrap();
let cert = chain2.process_inbox().await.unwrap().0.pop().unwrap();

chain1.receive_certificate(cert).await.unwrap();
chain1.process_inbox().await.unwrap();
Expand Down
630 changes: 445 additions & 185 deletions linera-core/src/client.rs

Large diffs are not rendered by default.

59 changes: 57 additions & 2 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::node::NodeError;
use crate::{client::ChainClientError, node::NodeError};
use linera_base::{
crypto::{BcsSignable, CryptoHash, KeyPair, Signature},
data_types::{Amount, BlockHeight, Timestamp},
data_types::{Amount, BlockHeight, Round, Timestamp},
identifiers::{ChainDescription, ChainId},
};
use linera_chain::{
Expand Down Expand Up @@ -264,3 +264,58 @@ impl ChainInfoResponse {
}

impl BcsSignable for ChainInfo {}

/// The outcome of trying to commit a list of operations to the chain.
#[derive(Debug)]
pub enum ClientOutcome<T> {
/// The operations were committed successfully.
Committed(T),
/// We are not the round leader and cannot do anything. Try again at the specified time or
/// or whenever the round or block height changes.
WaitForTimeout(RoundTimeout),
}

#[derive(Debug)]
pub struct RoundTimeout {
pub timestamp: Timestamp,
pub current_round: Round,
pub next_block_height: BlockHeight,
}

impl<T> ClientOutcome<T> {
#[cfg(any(test, feature = "test"))]
pub fn unwrap(self) -> T {
match self {
ClientOutcome::Committed(t) => t,
ClientOutcome::WaitForTimeout(_) => panic!(),
}
}

pub fn expect(self, msg: &'static str) -> T {
match self {
ClientOutcome::Committed(t) => t,
ClientOutcome::WaitForTimeout(_) => panic!("{}", msg),
}
}

pub fn map<F, S>(self, f: F) -> ClientOutcome<S>
where
F: FnOnce(T) -> S,
{
match self {
ClientOutcome::Committed(t) => ClientOutcome::Committed(f(t)),
ClientOutcome::WaitForTimeout(timeout) => ClientOutcome::WaitForTimeout(timeout),
}
}

#[allow(clippy::result_large_err)]
pub fn try_map<F, S>(self, f: F) -> Result<ClientOutcome<S>, ChainClientError>
where
F: FnOnce(T) -> Result<S, ChainClientError>,
{
match self {
ClientOutcome::Committed(t) => Ok(ClientOutcome::Committed(f(t)?)),
ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
}
}
}
105 changes: 93 additions & 12 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod wasm;
use crate::{
client::{
client_test_utils::{FaultType, MakeMemoryStorage, StorageBuilder, TestBuilder},
ChainClient, ChainClientError, CommunicateAction, MessageAction,
ChainClient, ChainClientError, ClientOutcome, CommunicateAction, MessageAction,
},
local_node::LocalNodeError,
node::{
Expand Down Expand Up @@ -103,6 +103,7 @@ where
UserData(Some(*b"I paid 0.001 to pay you these 3!")),
)
.await
.unwrap()
.unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(1));
assert!(sender.pending_block.is_none());
Expand Down Expand Up @@ -177,6 +178,7 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();
assert_eq!(sender.local_balance().await.unwrap(), Amount::ONE);
receiver.receive_certificate(cert).await?;
Expand Down Expand Up @@ -205,10 +207,11 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();

receiver.receive_certificate(cert).await?;
let cert = receiver.process_inbox().await?.pop().unwrap();
let cert = receiver.process_inbox().await?.0.pop().unwrap();
{
let messages = &cert.value().block().unwrap().incoming_messages;
// Both `Claim` messages were included in the block.
Expand Down Expand Up @@ -268,7 +271,7 @@ where
.await?;
let new_key_pair = KeyPair::generate();
let new_owner = Owner::from(new_key_pair.public());
let certificate = sender.rotate_key_pair(new_key_pair).await.unwrap();
let certificate = sender.rotate_key_pair(new_key_pair).await.unwrap().unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(1));
assert!(sender.pending_block.is_none());
assert_eq!(sender.identity().await.unwrap(), new_owner);
Expand Down Expand Up @@ -341,6 +344,7 @@ where
let certificate = sender
.transfer_ownership(new_key_pair.public())
.await
.unwrap()
.unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(1));
assert!(sender.pending_block.is_none());
Expand Down Expand Up @@ -416,6 +420,7 @@ where
let certificate = sender
.share_ownership(new_key_pair.public(), 100)
.await
.unwrap()
.unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(1));
assert!(sender.pending_block.is_none());
Expand Down Expand Up @@ -592,6 +597,7 @@ where
let (message_id, certificate) = sender
.open_chain(ChainOwnership::single(new_key_pair.public()), Amount::ZERO)
.await
.unwrap()
.unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(1));
assert!(sender.pending_block.is_none());
Expand Down Expand Up @@ -671,6 +677,7 @@ where
let (open_chain_message_id, certificate) = parent
.open_chain(ChainOwnership::single(new_key_pair.public()), Amount::ZERO)
.await
.unwrap()
.unwrap();
let new_id2 = ChainId::child(open_chain_message_id);
assert_eq!(new_id, new_id2);
Expand Down Expand Up @@ -708,6 +715,7 @@ where
UserData::default(),
)
.await
.unwrap()
.unwrap();
client.receive_certificate(certificate2).await.unwrap();
assert_eq!(
Expand Down Expand Up @@ -783,6 +791,7 @@ where
let (open_chain_message_id, certificate) = sender
.open_chain(ChainOwnership::single(new_key_pair.public()), Amount::ZERO)
.await
.unwrap()
.unwrap();
let new_id2 = ChainId::child(open_chain_message_id);
assert_eq!(new_id, new_id2);
Expand Down Expand Up @@ -873,8 +882,11 @@ where
// Open the new chain. We are both regular and super owner.
let ownership = ChainOwnership::single(new_key_pair.public())
.with_regular_owner(new_key_pair.public(), 100);
let (message_id, creation_certificate) =
sender.open_chain(ownership, Amount::ZERO).await.unwrap();
let (message_id, creation_certificate) = sender
.open_chain(ownership, Amount::ZERO)
.await
.unwrap()
.unwrap();
let new_id = ChainId::child(message_id);
// Transfer after creating the chain.
let transfer_certificate = sender
Expand All @@ -885,6 +897,7 @@ where
UserData::default(),
)
.await
.unwrap()
.unwrap();
assert_eq!(sender.next_block_height, BlockHeight::from(2));
assert!(sender.pending_block.is_none());
Expand Down Expand Up @@ -955,7 +968,7 @@ where
let mut sender = builder
.add_initial_chain(ChainDescription::Root(1), Amount::from_tokens(4))
.await?;
let certificate = sender.close_chain().await.unwrap();
let certificate = sender.close_chain().await.unwrap().unwrap();
assert!(matches!(
&certificate.value(),
CertificateValue::ConfirmedBlock { executed_block: ExecutedBlock { block, .. }, .. } if matches!(
Expand Down Expand Up @@ -1113,6 +1126,7 @@ where
UserData::default(),
)
.await
.unwrap()
.unwrap();

assert_eq!(client1.next_block_height, BlockHeight::from(1));
Expand Down Expand Up @@ -1233,6 +1247,7 @@ where
UserData::default(),
)
.await
.unwrap()
.unwrap();
// Transfer was executed locally.
assert_eq!(
Expand Down Expand Up @@ -1367,6 +1382,7 @@ where
UserData::default(),
)
.await
.unwrap()
.unwrap();
// Blocks were executed locally.
assert_eq!(client1.local_balance().await.unwrap(), Amount::ONE);
Expand Down Expand Up @@ -1441,6 +1457,7 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();
admin
.transfer_to_account(
Expand All @@ -1450,6 +1467,7 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();

// User is still at the initial epoch, but we can receive transfers from future
Expand All @@ -1469,7 +1487,7 @@ where
assert_eq!(user.epoch().await.unwrap(), Epoch::ZERO);

// Now subscribe explicitly to migrations.
let cert = user.subscribe_to_new_committees().await.unwrap();
let cert = user.subscribe_to_new_committees().await.unwrap().unwrap();
admin.receive_certificate(cert).await.unwrap();
admin.process_inbox().await.unwrap();

Expand All @@ -1485,6 +1503,7 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();
assert!(matches!(
admin.receive_certificate(cert).await,
Expand All @@ -1510,6 +1529,7 @@ where
UserData(None),
)
.await
.unwrap()
.unwrap();
admin.receive_certificate(cert).await.unwrap();
// Transfer goes through and the previous one as well thanks to block chaining.
Expand Down Expand Up @@ -1620,13 +1640,74 @@ where
);
assert_eq!(certificate.round, Round::SingleLeader(0));

let expected_round = Round::SingleLeader(1);
builder
.check_that_validators_are_in_round(
chain_id,
BlockHeight::from(1),
Round::SingleLeader(1),
3,
.check_that_validators_are_in_round(chain_id, BlockHeight::from(1), expected_round, 3)
.await;

let round = loop {
let manager = client.chain_info().await.unwrap().manager;
if manager.leader == Some(Owner::from(pub_key1)) {
break manager.current_round;
}
clock.set(manager.round_timeout);
assert!(client.request_leader_timeout().await.is_ok());
};
let round_number = match round {
Round::SingleLeader(round_number) => round_number,
round => panic!("Unexpected round {:?}", round),
};

// The other owner is leader now. Trying to submit a block should return `WaitForTimeout`.
let result = client
.transfer(
None,
Amount::from(1),
Recipient::root(2),
UserData::default(),
)
.await
.unwrap();
let timeout = match result {
ClientOutcome::Committed(_) => panic!("Committed a block where we aren't the leader."),
ClientOutcome::WaitForTimeout(timeout) => timeout,
};
assert!(client.request_leader_timeout().await.is_err());
clock.set(timeout.timestamp);
client.request_leader_timeout().await.unwrap();
let expected_round = Round::SingleLeader(round_number + 1);
builder
.check_that_validators_are_in_round(chain_id, BlockHeight::from(1), expected_round, 3)
.await;

loop {
let manager = client.chain_info().await.unwrap().manager;
if manager.leader == Some(Owner::from(pub_key0)) {
break;
}
clock.set(manager.round_timeout);
assert!(client.request_leader_timeout().await.is_ok());
}

// Now we are the leader, and the transfer should succeed.
let _certificate = client
.transfer(
None,
Amount::from_tokens(1),
Recipient::root(2),
UserData::default(),
)
.await
.unwrap()
.unwrap();
assert_eq!(
client.local_balance().await.unwrap(),
Amount::from_tokens(2)
);

let expected_round = Round::SingleLeader(0);
builder
.check_that_validators_are_in_round(chain_id, BlockHeight::from(2), expected_round, 3)
.await;

Ok(())
Expand Down
Loading

0 comments on commit b8cfd4e

Please sign in to comment.