Skip to content

Commit

Permalink
A0-3137: Split backup into submodules (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
woocash2 authored Aug 30, 2023
1 parent 590a432 commit f6384cf
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 125 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

### Overview

AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed
AlephBFT is an asynchronous and Byzantine fault-tolerant consensus protocol aimed
at ordering arbitrary messages (transactions). It has been designed to operate
continuously under conditions where there is no bound on message-delivery delay
and under the assumption that there is a significant probability of malicious
Expand Down Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.27"
aleph-bft = "^0.28"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
134 changes: 17 additions & 117 deletions consensus/src/runway/backup.rs → consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
use crate::{
units::{UncheckedSignedUnit, UnitCoord},
Data, Hasher, Keychain, MultiKeychain, NodeIndex, Receiver, Round, Sender, SessionId,
Terminator,
};
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};

use crate::alerts::AlertData;
use codec::{Decode, Encode, Error as CodecError};
use futures::{channel::oneshot, FutureExt, StreamExt};
use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use itertools::{Either, Itertools};
use log::{debug, error, info, warn};
use std::{
collections::HashSet,
fmt,
fmt::Debug,
io::{Read, Write},
marker::PhantomData,
};
use log::{error, info, warn};

const LOG_TARGET: &str = "AlephBFT-backup";
use crate::{
alerts::AlertData,
backup::BackupItem,
units::{UncheckedSignedUnit, UnitCoord},
Data, Hasher, Keychain, MultiKeychain, NodeIndex, Round, SessionId,
};

#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub enum BackupItem<H: Hasher, D: Data, MK: MultiKeychain> {
Unit(UncheckedSignedUnit<H, D, MK::Signature>),
AlertData(AlertData<H, D, MK>),
}
const LOG_TARGET: &str = "AlephBFT-backup-loader";

/// Backup read error. Could be either caused by io error from `BackupReader`, or by decoding.
#[derive(Debug)]
Expand Down Expand Up @@ -270,110 +259,21 @@ impl<H: Hasher, D: Data, MK: MultiKeychain, R: Read> BackupLoader<H, D, MK, R> {
}
}

/// Component responsible for saving units and alert data into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: Write> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
data_from_alerter: Receiver<AlertData<H, D, MK>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
responses_for_alerter: Sender<AlertData<H, D, MK>>,
backup: W,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, W: Write> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
data_from_alerter: Receiver<AlertData<H, D, MK>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
responses_for_alerter: Sender<AlertData<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
data_from_alerter,
responses_for_runway,
responses_for_alerter,
backup,
}
}

pub fn save_item(&mut self, item: BackupItem<H, D, MK>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
}

pub async fn run(&mut self, mut terminator: Terminator) {
let mut terminator_exit = false;
loop {
futures::select! {
unit = self.units_from_runway.next() => {
let unit = match unit {
Some(unit) => unit,
None => {
error!(target: LOG_TARGET, "receiver of units to save closed early");
break;
},
};
let item = BackupItem::Unit(unit.clone());
if let Err(e) = self.save_item(item) {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
if self.responses_for_runway.unbounded_send(unit).is_err() {
error!(target: LOG_TARGET, "couldn't respond with saved unit to runway");
break;
}
},
data = self.data_from_alerter.next() => {
let data = match data {
Some(data) => data,
None => {
error!(target: LOG_TARGET, "receiver of alert data to save closed early");
break;
},
};
let item = BackupItem::AlertData(data.clone());
if let Err(e) = self.save_item(item) {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
if self.responses_for_alerter.unbounded_send(data).is_err() {
error!(target: LOG_TARGET, "couldn't respond with saved alert data to runway");
break;
}
}
_ = terminator.get_exit().fuse() => {
debug!(target: LOG_TARGET, "backup saver received exit signal.");
terminator_exit = true;
}
}

if terminator_exit {
debug!(target: LOG_TARGET, "backup saver decided to exit.");
terminator.terminate_sync().await;
break;
}
}
}
}

#[cfg(test)]
mod tests {
use codec::Encode;
use futures::channel::oneshot;

use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature};

use crate::{
runway::backup::{BackupItem, BackupLoader},
backup::{loader::LoadedData, BackupItem, BackupLoader},
units::{
create_units, creator_set, preunit_to_unchecked_signed_unit, preunit_to_unit,
UncheckedSignedUnit as GenericUncheckedSignedUnit,
},
NodeCount, NodeIndex, Round, SessionId,
};
use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature};
use codec::Encode;
use futures::channel::oneshot;

use crate::runway::backup::LoadedData;

type UncheckedSignedUnit = GenericUncheckedSignedUnit<Hasher64, Data, Signature>;
type TestBackupItem = BackupItem<Hasher64, Data, Keychain>;
Expand Down
16 changes: 16 additions & 0 deletions consensus/src/backup/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use codec::{Decode, Encode};
use std::fmt::Debug;

pub use loader::{BackupLoader, LoadedData};
pub use saver::BackupSaver;

use crate::{alerts::AlertData, units::UncheckedSignedUnit, Data, Hasher, MultiKeychain};

mod loader;
mod saver;

#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub enum BackupItem<H: Hasher, D: Data, MK: MultiKeychain> {
Unit(UncheckedSignedUnit<H, D, MK::Signature>),
AlertData(AlertData<H, D, MK>),
}
102 changes: 102 additions & 0 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::{
units::UncheckedSignedUnit, Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};

use crate::alerts::AlertData;
use codec::Encode;
use futures::{FutureExt, StreamExt};

use crate::backup::BackupItem;
use log::{debug, error};
use std::io::Write;

const LOG_TARGET: &str = "AlephBFT-backup-saver";

/// Component responsible for saving units and alert data into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: Write> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
data_from_alerter: Receiver<AlertData<H, D, MK>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
responses_for_alerter: Sender<AlertData<H, D, MK>>,
backup: W,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, W: Write> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
data_from_alerter: Receiver<AlertData<H, D, MK>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
responses_for_alerter: Sender<AlertData<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
data_from_alerter,
responses_for_runway,
responses_for_alerter,
backup,
}
}

pub fn save_item(&mut self, item: BackupItem<H, D, MK>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
}

pub async fn run(&mut self, mut terminator: Terminator) {
let mut terminator_exit = false;
loop {
futures::select! {
unit = self.units_from_runway.next() => {
let unit = match unit {
Some(unit) => unit,
None => {
error!(target: LOG_TARGET, "receiver of units to save closed early");
break;
},
};
let item = BackupItem::Unit(unit.clone());
if let Err(e) = self.save_item(item) {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
if self.responses_for_runway.unbounded_send(unit).is_err() {
error!(target: LOG_TARGET, "couldn't respond with saved unit to runway");
break;
}
},
data = self.data_from_alerter.next() => {
let data = match data {
Some(data) => data,
None => {
error!(target: LOG_TARGET, "receiver of alert data to save closed early");
break;
},
};
let item = BackupItem::AlertData(data.clone());
if let Err(e) = self.save_item(item) {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
if self.responses_for_alerter.unbounded_send(data).is_err() {
error!(target: LOG_TARGET, "couldn't respond with saved alert data to runway");
break;
}
}
_ = terminator.get_exit().fuse() => {
debug!(target: LOG_TARGET, "backup saver received exit signal.");
terminator_exit = true;
}
}

if terminator_exit {
debug!(target: LOG_TARGET, "backup saver decided to exit.");
terminator.terminate_sync().await;
break;
}
}
}
}
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod terminal;
mod terminator;
mod units;

mod backup;
mod task_queue;
#[cfg(test)]
mod testing;
Expand Down
4 changes: 1 addition & 3 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ use std::{
time::Duration,
};

mod backup;
mod collection;
mod packer;

use crate::runway::backup::{BackupLoader, BackupSaver, LoadedData};
pub use backup::BackupItem;
use crate::backup::{BackupLoader, BackupSaver, LoadedData};
#[cfg(feature = "initial_unit_collection")]
use collection::{Collection, IO as CollectionIO};
pub use collection::{NewestUnitResponse, Salt};
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/testing/crash_recovery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
runway::BackupItem,
backup::BackupItem,
testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender},
units::UnitCoord,
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
Expand Down

0 comments on commit f6384cf

Please sign in to comment.