Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure sync_with_peer to allow for dependencies between steps #59

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wgps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ willow-encoding = { path = "../encoding", version = "0.1.0" }
ufotofu = { version = "0.4.2", features = ["std"] }
futures = { version = "0.3.31" }
either = "1.10.0"
async_cell = "0.2.2"

[dev-dependencies]
smol = "2.0.0"
Expand Down
6 changes: 3 additions & 3 deletions wgps/src/commitment_scheme/execute_prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ pub(crate) async fn execute_prelude<
P: BulkProducer<Item = u8, Error = E>,
>(
max_payload_power: u8,
our_nonce: [u8; CHALLENGE_LENGTH],
our_nonce: &[u8; CHALLENGE_LENGTH],
consumer: &mut C,
producer: &mut P,
) -> Result<ReceivedPrelude<CHALLENGE_HASH_LENGTH>, ExecutePreludeError<E>> {
let commitment = CH::hash(our_nonce);

let receive_fut = Box::pin(receive_prelude::<CHALLENGE_HASH_LENGTH, _>(producer));
let send_fut = Box::pin(send_prelude(max_payload_power, commitment, consumer));
let send_fut = Box::pin(send_prelude(max_payload_power, &commitment, consumer));

let (received_prelude, ()) = match try_select(receive_fut, send_fut).await {
Ok(Either::Left((received, send_fut))) => (received, send_fut.await?),
Expand All @@ -73,7 +73,7 @@ pub(crate) async fn execute_prelude<
Err(Either::Right((error, _))) => return Err(error.into()),
};

let msg = CommitmentReveal { nonce: our_nonce };
let msg = CommitmentReveal { nonce: &our_nonce };
msg.encode(consumer).await?;

Ok(received_prelude)
Expand Down
4 changes: 2 additions & 2 deletions wgps/src/commitment_scheme/send_prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use ufotofu::{local_nb::BulkConsumer, nb::ConsumeFullSliceError};
/// Sends the prelude of maximum payload power and what is to be the recipient's `received_commitment` via a transport.
pub(crate) async fn send_prelude<const CHALLENGE_HASH_LENGTH: usize, C: BulkConsumer<Item = u8>>(
max_payload_power: u8,
commitment: [u8; CHALLENGE_HASH_LENGTH],
commitment: &[u8; CHALLENGE_HASH_LENGTH],
transport: &mut C,
) -> Result<(), C::Error> {
transport.consume(max_payload_power).await?;

if let Err(ConsumeFullSliceError { reason, .. }) =
transport.bulk_consume_full_slice(&commitment).await
transport.bulk_consume_full_slice(commitment).await
{
return Err(reason);
}
Expand Down
91 changes: 73 additions & 18 deletions wgps/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::future::Future;

use execute_prelude::ExecutePreludeError;
use async_cell::unsync::AsyncCell;
use futures::try_join;

use receive_prelude::{ReceivePreludeError, ReceivedPrelude};
use ufotofu::local_nb::{BulkConsumer, BulkProducer, Producer};
use willow_data_model::{
grouping::{AreaOfInterest, Range3d},
Expand All @@ -15,24 +18,47 @@ mod messages;
pub use messages::*;

mod commitment_scheme;
use commitment_scheme::execute_prelude::execute_prelude;
pub use commitment_scheme::*;
use commitment_scheme::{receive_prelude::receive_prelude, send_prelude::send_prelude};

/// An error which can occur during a WGPS synchronisation session.
pub enum WgpsError<E> {
Prelude(ExecutePreludeError<E>),
/// The received max payload power was invalid, i.e. greater than 64.
PreludeMaxPayloadInvalid,
/// The transport stopped producing bytes before it could be deemed ready.
PreludeFinishedTooSoon,
/// The underlying transport emitted an error.
Transport(E),
}

impl<E> From<ExecutePreludeError<E>> for WgpsError<E> {
fn from(value: ExecutePreludeError<E>) -> Self {
Self::Prelude(value)
impl<E> From<E> for WgpsError<E> {
fn from(value: E) -> Self {
Self::Transport(value)
}
}

impl<E> From<ReceivePreludeError<E>> for WgpsError<E> {
fn from(value: ReceivePreludeError<E>) -> Self {
match value {
ReceivePreludeError::Transport(err) => err.into(),
ReceivePreludeError::MaxPayloadInvalid => WgpsError::PreludeMaxPayloadInvalid,
ReceivePreludeError::FinishedTooSoon => WgpsError::PreludeFinishedTooSoon,
}
}
}

impl<E: core::fmt::Display> core::fmt::Display for WgpsError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WgpsError::Prelude(execute_prelude_error) => write!(f, "{}", execute_prelude_error),
WgpsError::Transport(transport_error) => write!(f, "{}", transport_error),
WgpsError::PreludeMaxPayloadInvalid => write!(
f,
"The peer sent an invalid max payload power in their prelude."
),
WgpsError::PreludeFinishedTooSoon => write!(
f,
"The peer terminated the connection before sending their full prelude."
),
}
}
}
Expand All @@ -51,19 +77,48 @@ pub async fn sync_with_peer<
P: BulkProducer<Item = u8, Error = E>,
>(
options: &SyncOptions<CHALLENGE_LENGTH>,
mut consumer: C,
consumer: C,
mut producer: P,
) -> Result<(), WgpsError<E>> {
execute_prelude::<CHALLENGE_LENGTH, CHALLENGE_HASH_LENGTH, CH, _, _, _>(
options.max_payload_power,
options.challenge_nonce,
&mut consumer,
&mut producer,
)
.await?;

// TODO: The rest of the WGPS. No probs!

// Compute the commitment to our [nonce](https://willowprotocol.org/specs/sync/index.html#nonce); we send this
// commitment to the peer at the start (the *prelude*) of the sync session.
let our_commitment = CH::hash(&options.challenge_nonce);

let mut consumer = consumer; // TODO turn into a SharedConsumer in an Arc here. See util::shared_encoder on the logical_channels branch (and ask Aljoscha about it).

// This is set to `()` once our own prelude has been sent.
let sent_own_prelude = AsyncCell::<()>::new();
// This is set to the prelude received from the peer once it has arrived.
let received_prelude = AsyncCell::<ReceivedPrelude<CHALLENGE_HASH_LENGTH>>::new();

// Every unit of work that the WGPS needs to perform is defined as a future in the following, via an async block.
// If one of these futures needs another unit of work to have been completed, this should be enforced by
// calling `some_cell.get().await` for one of the cells defined above. Generally, these futures should *not* call
// `some_cell.take().await`, since that might mess with other futures depending on the same step to have completed.
//
// Each of the futures must evaluate to a `Result<(), WgpsError<E>>`.
// Since type annotations for async blocks are not possible in today's rust, we instead provide
// a type annotation on the return value; that's why the last two lines of each of the following
// async blocks are a weirdly overcomplicated way of returning `Ok(())`.

let do_send_prelude = async {
send_prelude(options.max_payload_power, &our_commitment, &mut consumer).await?;
sent_own_prelude.set(());

let ret: Result<(), WgpsError<E>> = Ok(());
ret
};

let do_receive_prelude = async {
received_prelude.set(receive_prelude(&mut producer).await?);

let ret: Result<(), WgpsError<E>> = Ok(());
ret
};

// Add each of the futures here. The macro polls them all to completion, until the first one hits
// an error, in which case this function immediately returns with that first error.
let ((), ()) = try_join!(do_send_prelude, do_receive_prelude,)?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions wgps/src/messages/commitment_reveal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use ufotofu::local_nb::BulkConsumer;
use willow_encoding::Encodable;

pub struct CommitmentReveal<const CHALLENGE_LENGTH: usize> {
pub nonce: [u8; CHALLENGE_LENGTH],
pub struct CommitmentReveal<'nonce, const CHALLENGE_LENGTH: usize> {
pub nonce: &'nonce [u8; CHALLENGE_LENGTH],
}

impl<const CHALLENGE_LENGTH: usize> Encodable for CommitmentReveal<CHALLENGE_LENGTH> {
impl<'nonce, const CHALLENGE_LENGTH: usize> Encodable for CommitmentReveal<'nonce, CHALLENGE_LENGTH> {
async fn encode<Consumer>(&self, consumer: &mut Consumer) -> Result<(), Consumer::Error>
where
Consumer: BulkConsumer<Item = u8>,
Expand Down
2 changes: 1 addition & 1 deletion wgps/src/parameters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub trait ChallengeHash<const CHALLENGE_LENGTH: usize, const CHALLENGE_HASH_LENGTH: usize> {
fn hash(nonce: [u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH];
fn hash(nonce: &[u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH];
}
Loading