Skip to content

Commit

Permalink
Allow async implementation for BackupReader and BackupWriter (#399)
Browse files Browse the repository at this point in the history
* Allow async implementation for BackupReader and BackupWriter

* fix: unused imports

* bump versions
  • Loading branch information
maan2003 authored Mar 4, 2024
1 parent 2cec6cd commit 36638fd
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 80 deletions.
21 changes: 18 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.33"
aleph-bft = "^0.34"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.33.2"
version = "0.34.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
23 changes: 14 additions & 9 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};
use std::{
collections::HashSet,
fmt::{self, Debug},
marker::PhantomData,
pin::Pin,
};

use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use futures::{channel::oneshot, AsyncRead, AsyncReadExt};
use log::{error, info, warn};

use crate::{
Expand Down Expand Up @@ -63,26 +68,26 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: Read> {
backup: R,
pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: AsyncRead> {
backup: Pin<Box<R>>,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup,
backup: Box::pin(backup),
index,
session_id,
_phantom: PhantomData,
}
}

fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf)?;
self.backup.read_to_end(&mut buf).await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down Expand Up @@ -163,7 +168,7 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
starting_round: oneshot::Sender<Option<Round>>,
next_round_collection: oneshot::Receiver<Round>,
) {
let units = match self.load() {
let units = match self.load().await {
Ok(items) => items,
Err(e) => {
error!(target: LOG_TARGET, "unable to load backup data: {}", e);
Expand Down
24 changes: 13 additions & 11 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::io::Write;
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use codec::Encode;
use futures::{FutureExt, StreamExt};
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};

const LOG_TARGET: &str = "AlephBFT-backup-saver";

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: Write> {
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
backup: W,
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
Expand All @@ -25,14 +25,16 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup,
backup: Box::pin(backup),
}
}

pub fn save_item(&mut self, item: &UncheckedSignedUnit<H, D, S>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
self.backup.flush().await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand All @@ -47,7 +49,7 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item) {
if let Err(e) = self.save_item(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down
17 changes: 11 additions & 6 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
Expand All @@ -23,7 +23,6 @@ use std::{
collections::HashSet,
convert::TryInto,
fmt::{self, Debug},
io::{Read, Write},
marker::PhantomData,
time::Duration,
};
Expand Down Expand Up @@ -108,15 +107,21 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read> {
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: AsyncWrite,
UL: AsyncRead,
> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read>
impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
LocalIO<D, DP, FH, US, UL>
{
pub fn new(
Expand Down Expand Up @@ -573,8 +578,8 @@ pub async fn run_session<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
Expand Down
24 changes: 9 additions & 15 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,15 @@ use crate::{
Terminator, UncheckedSigned,
};
use aleph_bft_types::Recipient;
use futures::AsyncWrite;
use futures::{
channel::{mpsc, oneshot},
pin_mut, Future, FutureExt, StreamExt,
pin_mut, AsyncRead, Future, FutureExt, StreamExt,
};
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use std::{
collections::HashSet,
convert::TryFrom,
fmt,
io::{Read, Write},
marker::PhantomData,
time::Duration,
};
use std::{collections::HashSet, convert::TryFrom, fmt, marker::PhantomData, time::Duration};

mod collection;
mod packer;
Expand Down Expand Up @@ -871,8 +865,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -887,8 +881,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down Expand Up @@ -919,8 +913,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
) where
H: Hasher,
D: Data,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
MK: MultiKeychain,
Expand Down
3 changes: 2 additions & 1 deletion examples/ordering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ futures = "0.3"
log = "0.4"
parking_lot = "0.12"
time = { version = "0.3", features = ["formatting", "macros", "local-offset"] }
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time"] }
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-util", "net", "time", "fs"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
24 changes: 16 additions & 8 deletions examples/ordering/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::io::Write;
mod dataio;
mod network;

use aleph_bft::{run_session, NodeIndex, Terminator};
use aleph_bft_mock::{Keychain, Spawner};
use clap::Parser;
use dataio::{Data, DataProvider, FinalizationHandler};
use futures::{channel::oneshot, StreamExt};
use futures::{channel::oneshot, io, StreamExt};
use log::{debug, error, info};
use network::Network;
use std::{collections::HashMap, fs, fs::File, io, io::Write, path::Path, time::Duration};
use std::{collections::HashMap, path::Path, time::Duration};
use time::{macros::format_description, OffsetDateTime};
use tokio::fs::{self, File};
use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};

/// Example node producing linear order.
#[derive(Parser, Debug)]
Expand Down Expand Up @@ -40,20 +43,23 @@ struct Args {
crash: bool,
}

fn create_backup(node_id: NodeIndex) -> Result<(File, io::Cursor<Vec<u8>>), io::Error> {
async fn create_backup(
node_id: NodeIndex,
) -> Result<(Compat<File>, io::Cursor<Vec<u8>>), io::Error> {
let stash_path = Path::new("./aleph-bft-examples-ordering-backup");
fs::create_dir_all(stash_path)?;
fs::create_dir_all(stash_path).await?;
let file_path = stash_path.join(format!("{}.units", node_id.0));
let loader = if file_path.exists() {
io::Cursor::new(fs::read(&file_path)?)
io::Cursor::new(fs::read(&file_path).await?)
} else {
io::Cursor::new(Vec::new())
};
let saver = fs::OpenOptions::new()
.create(true)
.append(true)
.open(file_path)?;
Ok((saver, loader))
.open(file_path)
.await?;
Ok((saver.compat_write(), loader))
}

fn finalized_counts(cf: &HashMap<NodeIndex, u32>) -> Vec<u32> {
Expand Down Expand Up @@ -104,7 +110,9 @@ async fn main() {
let n_members = ports.len().into();
let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled);
let (finalization_handler, mut finalized_rx) = FinalizationHandler::new();
let (backup_saver, backup_loader) = create_backup(id).expect("Error setting up unit saving");
let (backup_saver, backup_loader) = create_backup(id)
.await
.expect("Error setting up unit saving");
let local_io = aleph_bft::LocalIO::new(
data_provider,
finalization_handler,
Expand Down
2 changes: 1 addition & 1 deletion mock/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft-mock"
version = "0.11.1"
version = "0.12.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
documentation = "https://docs.rs/?"
Expand Down
Loading

0 comments on commit 36638fd

Please sign in to comment.