Skip to content

Commit

Permalink
fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Sep 12, 2023
1 parent cd0a436 commit 56806dd
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 333 deletions.
7 changes: 2 additions & 5 deletions mixnet/client/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use std::net::SocketAddr;

use futures::{stream, StreamExt};
use mixnet_topology::MixnetTopology;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

use crate::{receiver::Receiver, MessageStream, MixnetClientError};

Expand All @@ -24,12 +23,10 @@ pub enum MixnetClientMode {

impl MixnetClientMode {
pub(crate) async fn run(&self) -> Result<MessageStream, MixnetClientError> {
let ack_cache = Arc::new(Mutex::new(HashMap::new()));
match self {
// TODO: do not forget add ack_cache to the sender
Self::Sender => Ok(stream::empty().boxed()),
Self::SenderReceiver(node_address) => {
Ok(Receiver::new(*node_address, ack_cache).run().await?.boxed())
Ok(Receiver::new(*node_address).run().await?.boxed())
}
}
}
Expand Down
35 changes: 6 additions & 29 deletions mixnet/client/src/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,24 @@
use std::{
collections::{HashMap, HashSet},
error::Error,
net::SocketAddr,
sync::Arc,
};
use std::{error::Error, net::SocketAddr};

use futures::{stream, Stream, StreamExt};
use mixnet_protocol::{Body, PacketId};
use mixnet_protocol::Body;
use nym_sphinx::{
chunking::{fragment::Fragment, reconstruction::MessageReconstructor},
message::{NymMessage, PaddedMessage},
Payload,
};
use tokio::{net::TcpStream, sync::Mutex};
use tokio::net::TcpStream;

use crate::MixnetClientError;

// Receiver accepts TCP connections to receive incoming payloads from the Mixnet.
pub struct Receiver {
node_address: SocketAddr,
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
}

impl Receiver {
pub fn new(
node_address: SocketAddr,
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
) -> Self {
Self {
node_address,
ack_cache,
}
pub fn new(node_address: SocketAddr) -> Self {
Self { node_address }
}

pub async fn run(
Expand All @@ -42,19 +30,16 @@ impl Receiver {
let Ok(socket) = TcpStream::connect(self.node_address).await else {
return Err(MixnetClientError::MixnetNodeConnectError);
};
let ack_cache = self.ack_cache.clone();

Ok(Self::message_stream(Box::pin(Self::fragment_stream(
socket, ack_cache,
socket,
))))
}

fn fragment_stream(
socket: TcpStream,
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
) -> impl Stream<Item = Result<Fragment, MixnetClientError>> + Send + 'static {
stream::unfold(socket, move |mut socket| {
let cache = ack_cache.clone();
async move {
let Ok(body) = Body::read(&mut socket).await else {
// TODO: Maybe this is a hard error and the stream is corrupted? In that case stop the stream
Expand All @@ -68,14 +53,6 @@ impl Receiver {
Body::FinalPayload(payload) => {
Some((Self::fragment_from_payload(payload), socket))
}
// Client should not receive AckResponse
Body::AckResponse(resp) => {
let mut mu = cache.lock().await;
if let Some(ids) = mu.get_mut(&resp.sender) {
ids.remove(&resp.id);
}
None
}
_ => unreachable!(),
}
}
Expand Down
100 changes: 25 additions & 75 deletions mixnet/client/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
use std::{
collections::{HashMap, HashSet},
error::Error,
io::ErrorKind,
net::SocketAddr,
sync::Arc,
time::Duration,
};
use std::{error::Error, io::ErrorKind, net::SocketAddr, sync::Arc, time::Duration};

use mixnet_protocol::{Body, PacketId};
use mixnet_protocol::Body;
use mixnet_topology::MixnetTopology;
use mixnet_util::ConnectionPool;
use nym_sphinx::{
Expand All @@ -24,7 +17,6 @@ pub struct Sender<R: Rng> {
//TODO: handle topology update
topology: MixnetTopology,
pool: ConnectionPool,
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
max_retries: usize,
retry_delay: Duration,
rng: R,
Expand All @@ -42,7 +34,6 @@ impl<R: Rng> Sender<R> {
topology,
rng,
pool,
ack_cache: Arc::new(Mutex::new(HashMap::new())),
max_retries,
retry_delay,
}
Expand All @@ -66,13 +57,11 @@ impl<R: Rng> Sender<R> {
.into_iter()
.for_each(|(packet, first_node)| {
let pool = self.pool.clone();
let ack_cache = self.ack_cache.clone();
let max_retries = self.max_retries;
let retry_delay = self.retry_delay;
tokio::spawn(async move {
if let Err(e) = Self::send_packet(
&pool,
ack_cache,
max_retries,
retry_delay,
Box::new(packet),
Expand Down Expand Up @@ -131,7 +120,6 @@ impl<R: Rng> Sender<R> {

async fn send_packet(
pool: &ConnectionPool,
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
max_retries: usize,
retry_delay: Duration,
packet: Box<SphinxPacket>,
Expand All @@ -145,23 +133,12 @@ impl<R: Rng> Sender<R> {
let arc_socket = mu.clone();
let mut socket = mu.lock().await;
let bytes = packet.to_bytes();
let packet_id = PacketId::from_bytes(&bytes);
match Body::write_sphinx_packet_bytes(&mut *socket, &bytes).await {
Ok(_) => {
tracing::info!("Sent a Sphinx packet successuflly to the node: {addr}");
Self::insert_packet_id(&ack_cache, addr, packet_id).await;

tokio::spawn(async move {
Self::retry_backoff(
ack_cache,
max_retries,
retry_delay,
packet_id,
bytes,
addr,
arc_socket,
)
.await;
Self::retry_backoff(max_retries, retry_delay, bytes, addr, arc_socket).await;
});
Ok(())
}
Expand All @@ -178,13 +155,10 @@ impl<R: Rng> Sender<R> {
Body::write_sphinx_packet_bytes(&mut tcp, &bytes).await?;
*socket = tcp;
tracing::info!("Sent a Sphinx packet successuflly to the node: {addr}");
Self::insert_packet_id(&ack_cache, addr, packet_id).await;
tokio::spawn(async move {
Self::retry_backoff(
ack_cache,
max_retries,
retry_delay,
packet_id,
bytes,
addr,
arc_socket,
Expand All @@ -202,66 +176,42 @@ impl<R: Rng> Sender<R> {
}
}

async fn insert_packet_id(
ack_cache: &Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
addr: SocketAddr,
packet_id: PacketId,
) {
ack_cache
.lock()
.await
.entry(addr)
.or_insert_with(HashSet::new)
.insert(packet_id);
}

async fn retry_backoff(
ack_cache: Arc<Mutex<HashMap<SocketAddr, HashSet<PacketId>>>>,
max_retries: usize,
retry_delay: Duration,
packet_id: PacketId,
sphinx_bytes: Vec<u8>,
peer_addr: SocketAddr,
socket: Arc<Mutex<TcpStream>>,
) {
for _ in 0..max_retries {
tokio::time::sleep(retry_delay).await;
let mu = ack_cache.lock().await;
if let Some(ids) = mu.get(&peer_addr) {
if ids.contains(&packet_id) {
tracing::debug!("retrying to send a Sphinx packet to the peer {peer_addr}");
let mut socket = socket.lock().await;
match Body::write_sphinx_packet_bytes(&mut *socket, &sphinx_bytes).await {
Ok(_) => {
tracing::info!(
"Sent a Sphinx packet successuflly to the node: {peer_addr}"
);
return;
}
Err(e) => {
if let Some(err) = e.downcast_ref::<std::io::Error>() {
match err.kind() {
ErrorKind::BrokenPipe
| ErrorKind::NotConnected
| ErrorKind::ConnectionAborted => {
tracing::warn!("broken pipe error while sending a Sphinx packet to the node: {peer_addr}, try to update the connection and retry");
// update the connection
match TcpStream::connect(peer_addr).await {
Ok(tcp) => {
*socket = tcp;
}
Err(e) => {
tracing::error!("failed to update the connection to the node: {peer_addr}, error: {e}");
}
}
tracing::debug!("retrying to send a Sphinx packet to the peer {peer_addr}");
let mut socket = socket.lock().await;
match Body::write_sphinx_packet_bytes(&mut *socket, &sphinx_bytes).await {
Ok(_) => {
tracing::info!("Sent a Sphinx packet successuflly to the node: {peer_addr}");
return;
}
Err(e) => {
if let Some(err) = e.downcast_ref::<std::io::Error>() {
match err.kind() {
ErrorKind::BrokenPipe
| ErrorKind::NotConnected
| ErrorKind::ConnectionAborted => {
tracing::warn!("broken pipe error while sending a Sphinx packet to the node: {peer_addr}, try to update the connection and retry");
// update the connection
match TcpStream::connect(peer_addr).await {
Ok(tcp) => {
*socket = tcp;
}
Err(e) => {
tracing::error!("failed to update the connection to the node: {peer_addr}, error: {e}");
}
_ => {}
}
}
_ => {}
}
}
} else {
return;
}
}
}
Expand Down
Loading

0 comments on commit 56806dd

Please sign in to comment.