Skip to content

Commit

Permalink
A0-4160: Make packing part of unit creation (#425)
Browse files Browse the repository at this point in the history
* Make packing part of unit creation

* Better variable name from review and doc update

* Small review change
  • Loading branch information
timorleph authored Mar 26, 2024
1 parent 43ab291 commit 4795e10
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 478 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.0"
version = "0.36.1"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
34 changes: 25 additions & 9 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,39 @@ use log::{debug, error};

use crate::{
config::Config,
creation,
creation::{self, SignedUnitWithParents},
extension::Service as Extender,
handle_task_termination,
reconstruction::Service as ReconstructionService,
runway::{NotificationIn, NotificationOut},
Hasher, Receiver, Round, Sender, SpawnHandle, Terminator,
Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, SpawnHandle, Terminator,
};

pub(crate) async fn run<H: Hasher + 'static>(
pub struct IO<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>> {
pub incoming_notifications: Receiver<NotificationIn<H>>,
pub outgoing_notifications: Sender<NotificationOut<H>>,
pub units_for_runway: Sender<SignedUnitWithParents<H, D, MK>>,
pub data_provider: DP,
pub ordered_batch_tx: Sender<Vec<H::Hash>>,
pub starting_round: oneshot::Receiver<Option<Round>>,
}

pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
conf: Config,
incoming_notifications: Receiver<NotificationIn<H>>,
outgoing_notifications: Sender<NotificationOut<H>>,
ordered_batch_tx: Sender<Vec<H::Hash>>,
io: IO<H, D, MK, DP>,
keychain: MK,
spawn_handle: impl SpawnHandle,
starting_round: oneshot::Receiver<Option<Round>>,
mut terminator: Terminator,
) {
debug!(target: "AlephBFT", "{:?} Starting all services...", conf.node_ix());
let IO {
incoming_notifications,
outgoing_notifications,
units_for_runway,
data_provider,
ordered_batch_tx,
starting_round,
} = io;

let index = conf.node_ix();

Expand All @@ -41,13 +56,14 @@ pub(crate) async fn run<H: Hasher + 'static>(

let creator_terminator = terminator.add_offspring_connection("creator");
let io = creation::IO {
outgoing_units: outgoing_notifications.clone(),
outgoing_units: units_for_runway,
incoming_parents: parents_from_dag,
data_provider,
};
let creator_handle = spawn_handle
.spawn_essential(
"consensus/creation",
creation::run(conf.into(), io, starting_round, creator_terminator),
creation::run(conf, io, keychain, starting_round, creator_terminator),
)
.shared();
let creator_handle_for_panic = creator_handle.clone();
Expand Down
120 changes: 53 additions & 67 deletions consensus/src/creation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{
config::{Config as GeneralConfig, DelaySchedule},
runway::NotificationOut,
units::{PreUnit, Unit},
Hasher, NodeCount, NodeIndex, Receiver, Round, Sender, Terminator,
config::Config,
units::{PreUnit, SignedUnit, Unit},
Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, Terminator,
};
use futures::{
channel::{
Expand All @@ -13,41 +12,14 @@ use futures::{
};
use futures_timer::Delay;
use log::{debug, error, trace, warn};
use std::fmt::{Debug, Formatter};

mod creator;
mod packer;

pub use creator::Creator;
use packer::Packer;

/// The configuration needed for the process creating new units.
#[derive(Clone)]
pub struct Config {
node_id: NodeIndex,
n_members: NodeCount,
create_lag: DelaySchedule,
max_round: Round,
}

impl Debug for Config {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config")
.field("node id", &self.node_id)
.field("member count", &self.n_members)
.field("max round", &self.max_round)
.finish()
}
}

impl From<GeneralConfig> for Config {
fn from(conf: GeneralConfig) -> Self {
Config {
node_id: conf.node_ix(),
n_members: conf.n_members(),
create_lag: conf.delay_config().unit_creation_delay.clone(),
max_round: conf.max_round(),
}
}
}
const LOG_TARGET: &str = "AlephBFT-creator";

enum CreatorError {
OutChannelClosed(SendError),
Expand All @@ -60,9 +32,12 @@ impl<T> From<TrySendError<T>> for CreatorError {
}
}

pub struct IO<H: Hasher> {
pub(crate) incoming_parents: Receiver<Unit<H>>,
pub(crate) outgoing_units: Sender<NotificationOut<H>>,
pub type SignedUnitWithParents<H, D, MK> = (SignedUnit<H, D, MK>, Vec<<H as Hasher>::Hash>);

pub struct IO<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>> {
pub incoming_parents: Receiver<Unit<H>>,
pub outgoing_units: Sender<SignedUnitWithParents<H, D, MK>>,
pub data_provider: DP,
}

async fn create_unit<H: Hasher>(
Expand All @@ -74,7 +49,7 @@ async fn create_unit<H: Hasher>(
match creator.create_unit(round) {
Ok(unit) => return Ok(unit),
Err(err) => {
trace!(target: "AlephBFT-creator", "Creator unable to create a new unit at round {}: {}.", round, err)
trace!(target: LOG_TARGET, "Creator unable to create a new unit at round {}: {}.", round, err)
}
}
process_unit(creator, incoming_parents).await?;
Expand Down Expand Up @@ -114,7 +89,7 @@ async fn keep_processing_units_until<H: Hasher>(
result?
},
_ = until.fuse() => {
debug!(target: "AlephBFT-creator", "Delay passed.");
debug!(target: LOG_TARGET, "Delay passed.");
},
}
Ok(())
Expand All @@ -133,86 +108,97 @@ async fn keep_processing_units_until<H: Hasher>(
///
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html
/// Section 5.1 for a discussion of this component.
pub async fn run<H: Hasher>(
pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
conf: Config,
mut io: IO<H>,
mut io: IO<H, D, MK, DP>,
keychain: MK,
mut starting_round: oneshot::Receiver<Option<Round>>,
mut terminator: Terminator,
) {
futures::select! {
_ = read_starting_round_and_run_creator(conf, &mut io, &mut starting_round).fuse() =>
debug!(target: "AlephBFT-creator", "Creator is about to finish."),
_ = read_starting_round_and_run_creator(conf, &mut io, keychain, &mut starting_round).fuse() =>
debug!(target: LOG_TARGET, "Creator is about to finish."),
_ = terminator.get_exit().fuse() =>
debug!(target: "AlephBFT-creator", "Received an exit signal."),
debug!(target: LOG_TARGET, "Received an exit signal."),
}

terminator.terminate_sync().await;
}

async fn read_starting_round_and_run_creator<H: Hasher>(
async fn read_starting_round_and_run_creator<
H: Hasher,
D: Data,
MK: MultiKeychain,
DP: DataProvider<D>,
>(
conf: Config,
io: &mut IO<H>,
io: &mut IO<H, D, MK, DP>,
keychain: MK,
starting_round: &mut oneshot::Receiver<Option<Round>>,
) {
let maybe_round = starting_round.await;
let starting_round = match maybe_round {
Ok(Some(round)) => round,
Ok(None) => {
warn!(target: "AlephBFT-creator", "None starting round provided. Exiting.");
warn!(target: LOG_TARGET, "None starting round provided. Exiting.");
return;
}
Err(e) => {
error!(target: "AlephBFT-creator", "Starting round not provided: {}", e);
error!(target: LOG_TARGET, "Starting round not provided: {}", e);
return;
}
};

if let Err(err) = run_creator(conf, io, starting_round).await {
if let Err(err) = run_creator(conf, io, keychain, starting_round).await {
match err {
CreatorError::OutChannelClosed(e) => {
warn!(target: "AlephBFT-creator", "Notification send error: {}. Exiting.", e)
warn!(target: LOG_TARGET, "Notification send error: {}. Exiting.", e)
}
CreatorError::ParentsChannelClosed => {
debug!(target: "AlephBFT-creator", "Incoming parent channel closed, exiting.")
debug!(target: LOG_TARGET, "Incoming parent channel closed, exiting.")
}
}
}
}

async fn run_creator<H: Hasher>(
async fn run_creator<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
conf: Config,
io: &mut IO<H>,
io: &mut IO<H, D, MK, DP>,
keychain: MK,
starting_round: Round,
) -> anyhow::Result<(), CreatorError> {
let Config {
node_id,
n_members,
create_lag,
max_round,
} = conf;
let node_id = conf.node_ix();
let n_members = conf.n_members();
let create_delay = conf.delay_config().unit_creation_delay.clone();
let max_round = conf.max_round();
let session_id = conf.session_id();
let mut creator = Creator::new(node_id, n_members);
let packer = Packer::new(keychain, session_id);
let incoming_parents = &mut io.incoming_parents;
let outgoing_units = &io.outgoing_units;
let data_provider = &mut io.data_provider;

debug!(target: "AlephBFT-creator", "Creator starting from round {}", starting_round);
debug!(target: LOG_TARGET, "Creator starting from round {}", starting_round);
for round in starting_round..max_round {
// Skip waiting if someone created a unit of a higher round.
// In such a case at least 2/3 nodes created units from this round so we aren't skipping a
// delay we should observe.
let skip_delay = creator.current_round() > round;
if !skip_delay {
let lag = Delay::new(create_lag(round.into()));
let delay = Delay::new(create_delay(round.into()));

keep_processing_units_until(&mut creator, incoming_parents, lag).await?;
keep_processing_units_until(&mut creator, incoming_parents, delay).await?;
}

let (unit, parent_hashes) = create_unit(round, &mut creator, incoming_parents).await?;

trace!(target: "AlephBFT-creator", "Created a new unit {:?} at round {:?}.", unit, round);
let (preunit, parent_hashes) = create_unit(round, &mut creator, incoming_parents).await?;
trace!(target: LOG_TARGET, "Created a new preunit {:?} at round {:?}.", preunit, round);
let data = data_provider.get_data().await;
trace!(target: LOG_TARGET, "Received data: {:?}.", data);
let unit = packer.pack(preunit, data);

outgoing_units.unbounded_send(NotificationOut::CreatedPreUnit(unit, parent_hashes))?;
outgoing_units.unbounded_send((unit, parent_hashes))?;
}

warn!(target: "AlephBFT-creator", "Maximum round reached. Not creating another unit.");
warn!(target: LOG_TARGET, "Maximum round reached. Not creating another unit.");
Ok(())
}
31 changes: 31 additions & 0 deletions consensus/src/creation/packer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::{
units::{FullUnit, PreUnit, SignedUnit},
Data, Hasher, MultiKeychain, SessionId, Signed,
};

/// The component responsible for packing Data into PreUnits,
/// and signing the outcome, thus creating SignedUnits that are sent back to Runway.
pub struct Packer<MK: MultiKeychain> {
keychain: MK,
session_id: SessionId,
}

impl<MK: MultiKeychain> Packer<MK> {
pub fn new(keychain: MK, session_id: SessionId) -> Self {
Packer {
keychain,
session_id,
}
}

pub fn pack<H: Hasher, D: Data>(
&self,
preunit: PreUnit<H>,
data: Option<D>,
) -> SignedUnit<H, D, MK> {
Signed::sign(
FullUnit::new(preunit, data, self.session_id),
&self.keychain,
)
}
}
2 changes: 1 addition & 1 deletion consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ pub async fn run_session<
runway::run(
config_copy,
runway_io,
&keychain,
keychain.clone(),
spawn_copy,
network_io,
runway_terminator,
Expand Down
Loading

0 comments on commit 4795e10

Please sign in to comment.