Skip to content

Commit

Permalink
A0-4155: Unit saving pipeline (#432)
Browse files Browse the repository at this point in the history
* Unit saving pipeline

* Do not save parents after all

In particular, this means the change does not affect the public API, so only a
patch version bump is needed.
  • Loading branch information
timorleph authored Apr 12, 2024
1 parent 82151d8 commit 3a2bafc
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.3"
version = "0.36.4"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
62 changes: 34 additions & 28 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use crate::{
dag::DagUnit,
units::{UncheckedSignedUnit, WrappedUnit},
Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};
use codec::Encode;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};
Expand All @@ -10,30 +14,28 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver";
/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
// Units arriving from the runway, to be persisted to the backup.
units_from_runway: Receiver<DagUnit<H, D, MK>>,
// Confirmations sent back to the runway once a unit has been written and flushed.
responses_for_runway: Sender<DagUnit<H, D, MK>>,
// The backup sink; boxed and pinned because the `AsyncWrite` methods take `Pin<&mut Self>`.
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, S, W> {
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup: Box::pin(backup),
}
}

pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
/// Persist a single unit to the backup and flush, so the write is durable
/// before the caller acknowledges it.
pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
// Encode via the storage representation: unwrap the reconstructed unit
// into its `UncheckedSignedUnit` form before serializing.
let encoded = {
let unchecked: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
unchecked.encode()
};
self.backup.write_all(&encoded).await?;
self.backup.flush().await
}

Expand All @@ -49,7 +51,7 @@ impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item).await {
if let Err(e) = self.save_unit(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down Expand Up @@ -80,16 +82,17 @@ mod tests {
StreamExt,
};

use aleph_bft_mock::{Data, Hasher64, Keychain, Saver, Signature};
use aleph_bft_mock::{Data, Hasher64, Keychain, Saver};

use crate::{
backup::BackupSaver,
units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit},
NodeCount, NodeIndex, Terminator,
dag::ReconstructedUnit,
units::{creator_set, preunit_to_signed_unit, TestingSignedUnit},
NodeCount, Terminator,
};

type TestBackupSaver = BackupSaver<Hasher64, Data, Signature, Saver>;
type TestUnit = UncheckedSignedUnit<Hasher64, Data, Signature>;
type TestUnit = ReconstructedUnit<TestingSignedUnit>;
type TestBackupSaver = BackupSaver<Hasher64, Data, Keychain, Saver>;
struct PrepareSaverResponse<F: futures::Future> {
task: F,
units_for_saver: mpsc::UnboundedSender<TestUnit>,
Expand Down Expand Up @@ -122,6 +125,7 @@ mod tests {

#[tokio::test]
async fn test_proper_relative_responses_ordering() {
let node_count = NodeCount(5);
let PrepareSaverResponse {
task,
units_for_saver,
Expand All @@ -133,17 +137,19 @@ mod tests {
task.await;
});

let creators = creator_set(NodeCount(5));
let keychains: Vec<_> = (0..5)
.map(|id| Keychain::new(NodeCount(5), NodeIndex(id)))
let creators = creator_set(node_count);
let keychains: Vec<_> = node_count
.into_iterator()
.map(|id| Keychain::new(node_count, id))
.collect();
let units: Vec<TestUnit> = (0..5)
.map(|k| {
preunit_to_unchecked_signed_unit(
creators[k].create_unit(0).unwrap(),
let units: Vec<TestUnit> = node_count
.into_iterator()
.map(|id| {
ReconstructedUnit::initial(preunit_to_signed_unit(
creators[id.0].create_unit(0).unwrap(),
0,
&keychains[k],
)
&keychains[id.0],
))
})
.collect();

Expand Down
24 changes: 10 additions & 14 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use validation::{Error as ValidationError, Validator};

const LOG_TARGET: &str = "AlephBFT-dag";

pub type DagUnit<H, D, MK> = ReconstructedUnit<SignedUnit<H, D, MK>>;

/// The result of sending some information to the Dag.
pub struct DagResult<H: Hasher, D: Data, MK: MultiKeychain> {
/// Units added to the dag.
pub units: Vec<ReconstructedUnit<SignedUnit<H, D, MK>>>,
pub units: Vec<DagUnit<H, D, MK>>,
/// Requests for more information.
pub requests: Vec<Request<H>>,
/// Alerts raised due to encountered forks.
Expand Down Expand Up @@ -114,25 +116,16 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}

fn handle_result(&mut self, result: &DagResult<H, D, MK>) {
// just clean the validator cache of units that we are returning
for unit in &result.units {
self.validator.finished_processing(&unit.hash());
}
}

/// Add a unit to the Dag.
pub fn add_unit<U: WrappedUnit<H, Wrapped = SignedUnit<H, D, MK>>>(
&mut self,
unit: UncheckedSignedUnit<H, D, MK::Signature>,
store: &UnitStore<U>,
) -> DagResult<H, D, MK> {
let result = match self.validator.validate(unit, store) {
match self.validator.validate(unit, store) {
Ok(unit) => self.reconstruction.add_unit(unit).into(),
Err(e) => Self::handle_validation_error(e),
};
self.handle_result(&result);
result
}
}

/// Add parents of a unit to the Dag.
Expand Down Expand Up @@ -180,7 +173,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
.add_parents(unit_hash, parent_hashes)
.into(),
);
self.handle_result(&result);
result
}

Expand Down Expand Up @@ -208,10 +200,14 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}
}
self.handle_result(&result);
result
}

/// Notify the dag that a unit has finished processing and can be cleared from the cache.
///
/// Delegates to the validator, which keeps the cache of in-flight units.
pub fn finished_processing(&mut self, hash: &H::Hash) {
self.validator.finished_processing(hash);
}

/// Returns the current status of the dag, as reported by the validator.
pub fn status(&self) -> DagStatus {
self.validator.status()
}
Expand Down
41 changes: 20 additions & 21 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
alerts::{Alert, ForkingNotification, NetworkMessage},
creation,
dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest},
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
extension::{ExtenderUnit, Service as Extender},
handle_task_termination,
member::UnitMessage,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<ReconstructedUnit<SignedUnit<H, D, MK>>>,
store: UnitStore<DagUnit<H, D, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
Expand All @@ -118,12 +118,12 @@ where
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
exiting: bool,
}
Expand Down Expand Up @@ -210,15 +210,15 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {

struct RunwayConfig<H: Hasher, D: Data, FH: FinalizationHandler<D>, MK: MultiKeychain> {
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
resolved_requests: Sender<Request<H>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
Expand Down Expand Up @@ -455,10 +455,18 @@ where
}
}

fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit<SignedUnit<H, D, MK>>) {
fn on_unit_reconstructed(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord());
if self.backup_units_for_saver.unbounded_send(unit).is_err() {
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
self.store.insert(unit.clone());
self.dag.finished_processing(&unit_hash);
self.resolve_missing_parents(&unit_hash);
self.resolve_missing_coord(&unit.coord());
if self
Expand All @@ -477,21 +485,12 @@ where
warn!(target: "AlephBFT-runway", "Creator channel should be open.");
self.exiting = true;
}
if self
.backup_units_for_saver
.unbounded_send(unit.unpack().into())
.is_err()
{
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: UncheckedSignedUnit<H, D, MK::Signature>) {
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone()));
let unit = unit.unpack();
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into()));

if unit.as_signable().creator() == self.index() {
trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash());
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit));
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/testing/crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet<UnitCoord> {
let mut already_saved = HashSet::new();

while !buf.is_empty() {
let unit = UncheckedSignedUnit::<Hasher64, Data, Signature>::decode(buf).unwrap();
let unit = <UncheckedSignedUnit<Hasher64, Data, Signature>>::decode(buf).unwrap();
let full_unit = unit.as_signable();
let coord = full_unit.coord();
let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask;
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/units/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ pub(crate) use store::*;
#[cfg(test)]
pub use testing::{
create_preunits, creator_set, full_unit_to_unchecked_signed_unit, preunit_to_full_unit,
preunit_to_unchecked_signed_unit, random_full_parent_units_up_to, random_unit_with_parents,
FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, WrappedSignedUnit,
preunit_to_signed_unit, preunit_to_unchecked_signed_unit, random_full_parent_units_up_to,
random_unit_with_parents, FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit,
WrappedSignedUnit,
};
pub use validator::{ValidationError, Validator};

Expand Down
16 changes: 14 additions & 2 deletions consensus/src/units/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,31 @@ impl Creator {
}
}

/// Test helper: sign the given full unit with the provided keychain,
/// producing a `SignedUnit`.
pub fn full_unit_to_signed_unit(full_unit: FullUnit, keychain: &Keychain) -> SignedUnit {
Signed::sign(full_unit, keychain)
}

/// Test helper: promote a preunit to a full unit for the given session,
/// then sign it with the provided keychain.
///
/// Convenience composition of `preunit_to_full_unit` and
/// `full_unit_to_signed_unit`.
pub fn preunit_to_signed_unit(
pu: PreUnit,
session_id: SessionId,
keychain: &Keychain,
) -> SignedUnit {
full_unit_to_signed_unit(preunit_to_full_unit(pu, session_id), keychain)
}

pub fn full_unit_to_unchecked_signed_unit(
full_unit: FullUnit,
keychain: &Keychain,
) -> UncheckedSignedUnit {
Signed::sign(full_unit, keychain).into()
full_unit_to_signed_unit(full_unit, keychain).into()
}

pub fn preunit_to_unchecked_signed_unit(
pu: PreUnit,
session_id: SessionId,
keychain: &Keychain,
) -> UncheckedSignedUnit {
full_unit_to_unchecked_signed_unit(preunit_to_full_unit(pu, session_id), keychain)
preunit_to_signed_unit(pu, session_id, keychain).into()
}

fn initial_preunit(n_members: NodeCount, node_id: NodeIndex) -> PreUnit {
Expand Down

0 comments on commit 3a2bafc

Please sign in to comment.