diff --git a/Cargo.lock b/Cargo.lock index 4ca7838..d0a36b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,6 +132,12 @@ version = "4.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" +[[package]] +name = "async_cell" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "834eee9ce518130a3b4d5af09ecc43e9d6b57ee76613f227a1ddd6b77c7a62bc" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -620,6 +626,7 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" name = "wgps" version = "0.1.0" dependencies = [ + "async_cell", "either", "futures", "smol", diff --git a/wgps/Cargo.toml b/wgps/Cargo.toml index 6e89fa4..9883874 100644 --- a/wgps/Cargo.toml +++ b/wgps/Cargo.toml @@ -9,6 +9,7 @@ willow-encoding = { path = "../encoding", version = "0.1.0" } ufotofu = { version = "0.4.2", features = ["std"] } futures = { version = "0.3.31" } either = "1.10.0" +async_cell = "0.2.2" [dev-dependencies] smol = "2.0.0" diff --git a/wgps/src/commitment_scheme/execute_prelude.rs b/wgps/src/commitment_scheme/execute_prelude.rs index b5531a7..0785a01 100644 --- a/wgps/src/commitment_scheme/execute_prelude.rs +++ b/wgps/src/commitment_scheme/execute_prelude.rs @@ -57,14 +57,14 @@ pub(crate) async fn execute_prelude< P: BulkProducer, >( max_payload_power: u8, - our_nonce: [u8; CHALLENGE_LENGTH], + our_nonce: &[u8; CHALLENGE_LENGTH], consumer: &mut C, producer: &mut P, ) -> Result, ExecutePreludeError> { let commitment = CH::hash(our_nonce); let receive_fut = Box::pin(receive_prelude::(producer)); - let send_fut = Box::pin(send_prelude(max_payload_power, commitment, consumer)); + let send_fut = Box::pin(send_prelude(max_payload_power, &commitment, consumer)); let (received_prelude, ()) = match try_select(receive_fut, send_fut).await { Ok(Either::Left((received, send_fut))) => (received, send_fut.await?), @@ -73,7 +73,7 @@ pub(crate) async fn execute_prelude< Err(Either::Right((error, _))) => return Err(error.into()), }; - let msg = CommitmentReveal { nonce: our_nonce }; + let msg = CommitmentReveal { nonce: &our_nonce }; msg.encode(consumer).await?; Ok(received_prelude) diff --git a/wgps/src/commitment_scheme/send_prelude.rs b/wgps/src/commitment_scheme/send_prelude.rs index 9ba0f9f..e0ea425 100644 --- a/wgps/src/commitment_scheme/send_prelude.rs +++ b/wgps/src/commitment_scheme/send_prelude.rs @@ -3,13 +3,13 @@ use ufotofu::{local_nb::BulkConsumer, nb::ConsumeFullSliceError}; /// Sends the prelude of maximum payload power and what is to be the recipient's `received_commitment` via a transport. pub(crate) async fn send_prelude>( max_payload_power: u8, - commitment: [u8; CHALLENGE_HASH_LENGTH], + commitment: &[u8; CHALLENGE_HASH_LENGTH], transport: &mut C, ) -> Result<(), C::Error> { transport.consume(max_payload_power).await?; if let Err(ConsumeFullSliceError { reason, .. }) = - transport.bulk_consume_full_slice(&commitment).await + transport.bulk_consume_full_slice(commitment).await { return Err(reason); } diff --git a/wgps/src/lib.rs b/wgps/src/lib.rs index 7a7a98f..125947f 100644 --- a/wgps/src/lib.rs +++ b/wgps/src/lib.rs @@ -1,6 +1,9 @@ use std::future::Future; -use execute_prelude::ExecutePreludeError; +use async_cell::unsync::AsyncCell; +use futures::try_join; + +use receive_prelude::{ReceivePreludeError, ReceivedPrelude}; use ufotofu::local_nb::{BulkConsumer, BulkProducer, Producer}; use willow_data_model::{ grouping::{AreaOfInterest, Range3d}, @@ -15,24 +18,47 @@ mod messages; pub use messages::*; mod commitment_scheme; -use commitment_scheme::execute_prelude::execute_prelude; pub use commitment_scheme::*; +use commitment_scheme::{receive_prelude::receive_prelude, send_prelude::send_prelude}; /// An error which can occur during a WGPS synchronisation session. pub enum WgpsError { - Prelude(ExecutePreludeError), + /// The received max payload power was invalid, i.e. greater than 64. + PreludeMaxPayloadInvalid, + /// The transport stopped producing bytes before it could be deemed ready. + PreludeFinishedTooSoon, + /// The underlying transport emitted an error. + Transport(E), } -impl From> for WgpsError { - fn from(value: ExecutePreludeError) -> Self { - Self::Prelude(value) +impl From for WgpsError { + fn from(value: E) -> Self { + Self::Transport(value) + } +} + +impl From> for WgpsError { + fn from(value: ReceivePreludeError) -> Self { + match value { + ReceivePreludeError::Transport(err) => err.into(), + ReceivePreludeError::MaxPayloadInvalid => WgpsError::PreludeMaxPayloadInvalid, + ReceivePreludeError::FinishedTooSoon => WgpsError::PreludeFinishedTooSoon, + } } } impl core::fmt::Display for WgpsError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - WgpsError::Prelude(execute_prelude_error) => write!(f, "{}", execute_prelude_error), + WgpsError::Transport(transport_error) => write!(f, "{}", transport_error), + WgpsError::PreludeMaxPayloadInvalid => write!( + f, + "The peer sent an invalid max payload power in their prelude." + ), + WgpsError::PreludeFinishedTooSoon => write!( + f, + "The peer terminated the connection before sending their full prelude." + ), } } } @@ -51,19 +77,48 @@ pub async fn sync_with_peer< P: BulkProducer, >( options: &SyncOptions, - mut consumer: C, + consumer: C, mut producer: P, ) -> Result<(), WgpsError> { - execute_prelude::( - options.max_payload_power, - options.challenge_nonce, - &mut consumer, - &mut producer, - ) - .await?; - - // TODO: The rest of the WGPS. No probs! - + // Compute the commitment to our [nonce](https://willowprotocol.org/specs/sync/index.html#nonce); we send this + // commitment to the peer at the start (the *prelude*) of the sync session. + let our_commitment = CH::hash(&options.challenge_nonce); + + let mut consumer = consumer; // TODO turn into a SharedConsumer in an Arc here. See util::shared_encoder on the logical_channels branch (and ask Aljoscha about it). + + // This is set to `()` once our own prelude has been sent. + let sent_own_prelude = AsyncCell::<()>::new(); + // This is set to the prelude received from the peer once it has arrived. + let received_prelude = AsyncCell::>::new(); + + // Every unit of work that the WGPS needs to perform is defined as a future in the following, via an async block. + // If one of these futures needs another unit of work to have been completed, this should be enforced by + // calling `some_cell.get().await` for one of the cells defined above. Generally, these futures should *not* call + // `some_cell.take().await`, since that might mess with other futures depending on the same step to have completed. + // + // Each of the futures must evaluate to a `Result<(), WgpsError>`. + // Since type annotations for async blocks are not possible in today's rust, we instead provide + // a type annotation on the return value; that's why the last two lines of each of the following + // async blocks are a weirdly overcomplicated way of returning `Ok(())`. + + let do_send_prelude = async { + send_prelude(options.max_payload_power, &our_commitment, &mut consumer).await?; + sent_own_prelude.set(()); + + let ret: Result<(), WgpsError> = Ok(()); + ret + }; + + let do_receive_prelude = async { + received_prelude.set(receive_prelude(&mut producer).await?); + + let ret: Result<(), WgpsError> = Ok(()); + ret + }; + + // Add each of the futures here. The macro polls them all to completion, until the first one hits + // an error, in which case this function immediately returns with that first error. + let ((), ()) = try_join!(do_send_prelude, do_receive_prelude,)?; Ok(()) } diff --git a/wgps/src/messages/commitment_reveal.rs b/wgps/src/messages/commitment_reveal.rs index 33ec069..c95953f 100644 --- a/wgps/src/messages/commitment_reveal.rs +++ b/wgps/src/messages/commitment_reveal.rs @@ -1,11 +1,11 @@ use ufotofu::local_nb::BulkConsumer; use willow_encoding::Encodable; -pub struct CommitmentReveal { - pub nonce: [u8; CHALLENGE_LENGTH], +pub struct CommitmentReveal<'nonce, const CHALLENGE_LENGTH: usize> { + pub nonce: &'nonce [u8; CHALLENGE_LENGTH], } -impl Encodable for CommitmentReveal { +impl<'nonce, const CHALLENGE_LENGTH: usize> Encodable for CommitmentReveal<'nonce, CHALLENGE_LENGTH> { async fn encode(&self, consumer: &mut Consumer) -> Result<(), Consumer::Error> where Consumer: BulkConsumer, diff --git a/wgps/src/parameters.rs b/wgps/src/parameters.rs index cdea2db..d9d8d39 100644 --- a/wgps/src/parameters.rs +++ b/wgps/src/parameters.rs @@ -1,3 +1,3 @@ pub trait ChallengeHash { - fn hash(nonce: [u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH]; + fn hash(nonce: &[u8; CHALLENGE_LENGTH]) -> [u8; CHALLENGE_HASH_LENGTH]; }