Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mixnet PoC base branch #316

Merged
merged 37 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d57efc3
Add `mixnode` and `mixnet-client` crate (#302)
youngjoon-lee Aug 21, 2023
0a38fd3
Add `mixnode` binary (#317)
youngjoon-lee Aug 22, 2023
b692043
Integrate mixnet with libp2p network backend (#318)
youngjoon-lee Aug 28, 2023
f578217
Fix #312: proper delays (#321)
al8n Aug 29, 2023
5a5aa1f
add missing duration param
al8n Aug 29, 2023
4621c0a
Merge branch 'master' into mixnet
al8n Aug 29, 2023
9798ff0
Merge branch 'master' into mixnet
youngjoon-lee Aug 29, 2023
030baa7
tiny fix: compilation error caused by `rand` 0.8 -> 0.7
youngjoon-lee Aug 29, 2023
e725af0
Merge branch 'master' into mixnet
youngjoon-lee Aug 29, 2023
292f393
use `get_available_port()` for mixnet integration tests (#333)
youngjoon-lee Aug 29, 2023
7df010e
Merge branch 'master' into mixnet
danielSanchezQ Aug 29, 2023
723ebfe
add missing comments
youngjoon-lee Aug 30, 2023
33a4823
Overwatch mixnet node (#339)
danielSanchezQ Aug 30, 2023
e8f2c38
fix tests for the overwatch mixnode (#342)
youngjoon-lee Aug 30, 2023
82421ce
fix panic when corner case happen in RandomDelayIter (#335)
al8n Aug 30, 2023
361032e
Use `log` service for `mixnode` bin (#341)
youngjoon-lee Aug 30, 2023
190a955
Merge branch 'master' into mixnet
youngjoon-lee Aug 30, 2023
d00ca2a
Use `wire` for MixnetMessage in libp2p (#347)
youngjoon-lee Aug 31, 2023
25c8ae0
Prevent tmixnet tests from running forever (#363)
youngjoon-lee Sep 1, 2023
b11d2cb
Use random delay when sending msgs to mixnet (#362)
youngjoon-lee Sep 1, 2023
3468d7c
Merge branch 'master' into mixnet
youngjoon-lee Sep 1, 2023
73360d7
fix a minor compilation error caused by the latest master
youngjoon-lee Sep 1, 2023
b122867
Fix run output fd (#343)
al8n Sep 4, 2023
798a847
Merge branch 'master' into mixnet
youngjoon-lee Sep 5, 2023
2a0a6c9
Exp backoff (#332)
zeegomo Sep 5, 2023
a21a8a9
Fix MutexGuard across await (#373)
zeegomo Sep 6, 2023
e50ef09
Move wait at network startup (#338)
zeegomo Sep 6, 2023
e1ab1a7
Remove unused functions from mixnet connpool (#374)
youngjoon-lee Sep 7, 2023
0950782
Mixnet benchmark (#375)
youngjoon-lee Sep 7, 2023
2db4862
Merge branch 'master' into mixnet
youngjoon-lee Sep 8, 2023
2ad89c1
Merge branch 'master' into mixnet
youngjoon-lee Sep 12, 2023
6527c50
merge fixes
youngjoon-lee Sep 12, 2023
cc2f004
add `connection_pool_size` field to `config.yaml`
youngjoon-lee Sep 12, 2023
f7b33e4
Simplify mixnet topology (#393)
youngjoon-lee Sep 12, 2023
d020eed
Simplify bytes and duration range ser/de (#394)
al8n Sep 12, 2023
4d0814c
Merge branch 'master' into mixnet
youngjoon-lee Sep 13, 2023
1169e0e
Merge branch 'master' into mixnet
youngjoon-lee Sep 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ members = [
"nomos-da/kzg",
"nomos-da/full-replication",
"nodes/nomos-node",
"nodes/mixnode",
"simulations",
"consensus-engine",
"tests",
"mixnet/node",
"mixnet/client",
"mixnet/protocol",
"mixnet/topology",
]
resolver = "2"
18 changes: 18 additions & 0 deletions mixnet/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "mixnet-client"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1.37"
tokio = { version = "1.29.1", features = ["net"] }
sphinx-packet = "0.1.0"
nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" }
# Using an older version, since `nym-sphinx` depends on `rand` v0.7.3.
rand = "0.7.3"
mixnet-protocol = { path = "../protocol" }
mixnet-topology = { path = "../topology" }
mixnet-util = { path = "../util" }
futures = "0.3.28"
thiserror = "1"
31 changes: 31 additions & 0 deletions mixnet/client/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::net::SocketAddr;

use futures::{stream, StreamExt};
use mixnet_topology::MixnetTopology;
use serde::{Deserialize, Serialize};

use crate::{receiver::Receiver, MessageStream, MixnetClientError};

#[derive(Serialize, Deserialize, Clone, Debug)]

Check warning on line 9 in mixnet/client/src/config.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/config.rs#L9

Added line #L9 was not covered by tests
pub struct MixnetClientConfig {
pub mode: MixnetClientMode,
pub topology: MixnetTopology,
pub connection_pool_size: usize,
}

#[derive(Serialize, Deserialize, Clone, Debug)]

Check warning on line 16 in mixnet/client/src/config.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/config.rs#L16

Added line #L16 was not covered by tests
pub enum MixnetClientMode {
Sender,
SenderReceiver(SocketAddr),
}

impl MixnetClientMode {
pub(crate) async fn run(&self) -> Result<MessageStream, MixnetClientError> {
match self {
Self::Sender => Ok(stream::empty().boxed()),

Check warning on line 25 in mixnet/client/src/config.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/config.rs#L25

Added line #L25 was not covered by tests
Self::SenderReceiver(node_address) => {
Ok(Receiver::new(*node_address).run().await?.boxed())
}
}
}
}
56 changes: 56 additions & 0 deletions mixnet/client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
pub mod config;
mod receiver;
mod sender;

use std::error::Error;
use std::time::Duration;

pub use config::MixnetClientConfig;
pub use config::MixnetClientMode;
use futures::stream::BoxStream;
use mixnet_util::ConnectionPool;
use rand::Rng;
use sender::Sender;
use thiserror::Error;

// A client for sending packets to Mixnet and receiving packets from Mixnet.
pub struct MixnetClient<R: Rng> {
mode: MixnetClientMode,
sender: Sender<R>,
}

pub type MessageStream = BoxStream<'static, Result<Vec<u8>, MixnetClientError>>;

impl<R: Rng> MixnetClient<R> {
pub fn new(config: MixnetClientConfig, rng: R) -> Self {
let cache = ConnectionPool::new(config.connection_pool_size);
Self {
mode: config.mode,
sender: Sender::new(config.topology, cache, rng),
}
}

pub async fn run(&self) -> Result<MessageStream, MixnetClientError> {
self.mode.run().await
}

pub fn send(
&mut self,
msg: Vec<u8>,
total_delay: Duration,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
self.sender.send(msg, total_delay)
}
}

#[derive(Error, Debug)]

Check warning on line 46 in mixnet/client/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/lib.rs#L46

Added line #L46 was not covered by tests
pub enum MixnetClientError {
#[error("mixnet node connect error")]
MixnetNodeConnectError,
#[error("mixnode stream has been closed")]
MixnetNodeStreamClosed,
#[error("unexpected stream body received")]
UnexpectedStreamBody,
#[error("invalid payload")]
InvalidPayload,
}
139 changes: 139 additions & 0 deletions mixnet/client/src/receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::{error::Error, net::SocketAddr};

use futures::{stream, Stream, StreamExt};
use mixnet_protocol::Body;
use nym_sphinx::{
chunking::{fragment::Fragment, reconstruction::MessageReconstructor},
message::{NymMessage, PaddedMessage},
Payload,
};
use tokio::net::TcpStream;

use crate::MixnetClientError;

// Receiver accepts TCP connections to receive incoming payloads from the Mixnet.
pub struct Receiver {
node_address: SocketAddr,
}

impl Receiver {
pub fn new(node_address: SocketAddr) -> Self {
Self { node_address }
}

pub async fn run(
&self,
) -> Result<
impl Stream<Item = Result<Vec<u8>, MixnetClientError>> + Send + 'static,
MixnetClientError,
> {
let Ok(socket) = TcpStream::connect(self.node_address).await else {
return Err(MixnetClientError::MixnetNodeConnectError);

Check warning on line 31 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L31

Added line #L31 was not covered by tests
};

Ok(Self::message_stream(Box::pin(Self::fragment_stream(
socket,
))))
}

fn fragment_stream(
socket: TcpStream,
) -> impl Stream<Item = Result<Fragment, MixnetClientError>> + Send + 'static {
stream::unfold(socket, |mut socket| async move {
let Ok(body) = Body::read(&mut socket).await else {
// TODO: Maybe this is a hard error and the stream is corrupted? In that case stop the stream
return Some((Err(MixnetClientError::MixnetNodeStreamClosed), socket));

Check warning on line 45 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L45

Added line #L45 was not covered by tests
};

match body {
Body::SphinxPacket(_) => {
Some((Err(MixnetClientError::UnexpectedStreamBody), socket))

Check warning on line 50 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L50

Added line #L50 was not covered by tests
}
Body::FinalPayload(payload) => Some((Self::fragment_from_payload(payload), socket)),
}
})
}

fn message_stream(
fragment_stream: impl Stream<Item = Result<Fragment, MixnetClientError>>
+ Send
+ Unpin
+ 'static,
) -> impl Stream<Item = Result<Vec<u8>, MixnetClientError>> + Send + 'static {
// MessageReconstructor buffers all received fragments
// and eventually returns reconstructed messages.
let message_reconstructor: MessageReconstructor = Default::default();

stream::unfold(
(fragment_stream, message_reconstructor),
|(mut fragment_stream, mut message_reconstructor)| async move {
let result =
Self::reconstruct_message(&mut fragment_stream, &mut message_reconstructor)
.await;
Some((result, (fragment_stream, message_reconstructor)))
},
)
}

fn fragment_from_payload(payload: Payload) -> Result<Fragment, MixnetClientError> {
let Ok(payload_plaintext) = payload.recover_plaintext() else {
return Err(MixnetClientError::InvalidPayload);

Check warning on line 80 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L80

Added line #L80 was not covered by tests
};
let Ok(fragment) = Fragment::try_from_bytes(&payload_plaintext) else {
return Err(MixnetClientError::InvalidPayload);

Check warning on line 83 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L83

Added line #L83 was not covered by tests
};
Ok(fragment)
}

async fn reconstruct_message(
fragment_stream: &mut (impl Stream<Item = Result<Fragment, MixnetClientError>>
+ Send
+ Unpin
+ 'static),
message_reconstructor: &mut MessageReconstructor,
) -> Result<Vec<u8>, MixnetClientError> {
// Read fragments until at least one message is fully reconstructed.
while let Some(next) = fragment_stream.next().await {
match next {
Ok(fragment) => {
if let Some(message) =
Self::try_reconstruct_message(fragment, message_reconstructor)
{
return Ok(message);
}
}
Err(e) => {
return Err(e);

Check warning on line 106 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L105-L106

Added lines #L105 - L106 were not covered by tests
}
}
}

// fragment_stream closed before messages are fully reconstructed
Err(MixnetClientError::MixnetNodeStreamClosed)

Check warning on line 112 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L112

Added line #L112 was not covered by tests
}

fn try_reconstruct_message(
fragment: Fragment,
message_reconstructor: &mut MessageReconstructor,
) -> Option<Vec<u8>> {
let reconstruction_result = message_reconstructor.insert_new_fragment(fragment);
match reconstruction_result {
Some((padded_message, _)) => {
let message = Self::remove_padding(padded_message).unwrap();
Some(message)
}
None => None,
}
}

fn remove_padding(msg: Vec<u8>) -> Result<Vec<u8>, Box<dyn Error>> {
let padded_message = PaddedMessage::new_reconstructed(msg);
// we need this because PaddedMessage.remove_padding requires it for other NymMessage types.
let dummy_num_mix_hops = 0;

match padded_message.remove_padding(dummy_num_mix_hops)? {
NymMessage::Plain(msg) => Ok(msg),
_ => todo!("return error"),

Check warning on line 136 in mixnet/client/src/receiver.rs

View check run for this annotation

Codecov / codecov/patch

mixnet/client/src/receiver.rs#L136

Added line #L136 was not covered by tests
}
}
}
Loading
Loading