Skip to content

Commit

Permalink
Exp backoff (#332)
Browse files Browse the repository at this point in the history
* move mixnet listening into separate task

* add exponential retry for insufficient peers in libp2p

* fix logging
  • Loading branch information
zeegomo authored Sep 5, 2023
1 parent 798a847 commit 2a0a6c9
Showing 1 changed file with 106 additions and 44 deletions.
150 changes: 106 additions & 44 deletions nomos-services/network/src/backends/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}
use rand::{rngs::OsRng, thread_rng, Rng};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::StreamExt;

macro_rules! log_error {
($e:expr) => {
Expand Down Expand Up @@ -50,14 +51,29 @@ pub enum EventKind {
}

const BUFFER_SIZE: usize = 64;
const BACKOFF: u64 = 5;
const MAX_RETRY: usize = 3;

#[derive(Debug)]
#[non_exhaustive]
pub enum Command {
Connect(PeerId, Multiaddr),
Broadcast { topic: Topic, message: Box<[u8]> },
Broadcast {
topic: Topic,
message: Box<[u8]>,
},
Subscribe(Topic),
Unsubscribe(Topic),
Info { reply: oneshot::Sender<Libp2pInfo> },
Info {
reply: oneshot::Sender<Libp2pInfo>,
},
#[doc(hidden)]
// broadcast a message directly through gossipsub without mixnet
DirectBroadcastAndRetry {
topic: Topic,
message: Box<[u8]>,
retry: usize,
},
}

pub type Topic = String;
Expand Down Expand Up @@ -93,22 +109,49 @@ impl NetworkBackend for Libp2p {
type NetworkEvent = Event;

fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng);
let mixnet_client = MixnetClient::new(config.mixnet_client.clone(), OsRng);
let (commands_tx, mut commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE);
let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE);
let libp2p = Self {
events_tx: events_tx.clone(),
commands_tx,
};
overwatch_handle.runtime().spawn(async move {
use tokio_stream::StreamExt;

let cmd_tx = commands_tx.clone();
overwatch_handle.runtime().spawn(async move {
let Ok(mut stream) = mixnet_client.run().await else {
tracing::error!("Could not quickstart mixnet stream");
return;
};

while let Some(result) = stream.next().await {
match result {
Ok(msg) => {
tracing::debug!("receiving message from mixnet client");
let Ok(MixnetMessage { topic, message }) = MixnetMessage::from_bytes(&msg)
else {
tracing::error!(
"failed to deserialize json received from mixnet client"
);
continue;
};

cmd_tx
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: 0,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule broadcast"));
}
Err(e) => {
todo!("Handle mixclient error: {e}");
}
}
}
});
let cmd_tx = commands_tx.clone();
let notify = events_tx.clone();
overwatch_handle.runtime().spawn(async move {
let mut swarm = Swarm::build(&config.inner).unwrap();
let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng);
loop {
tokio::select! {
Some(event) = swarm.next() => {
Expand All @@ -119,7 +162,7 @@ impl NetworkBackend for Libp2p {
message,
})) => {
tracing::debug!("Got message with id: {id} from peer: {peer_id}");
log_error!(events_tx.send(Event::Message(message)));
log_error!(notify.send(Event::Message(message)));
}
SwarmEvent::ConnectionEstablished {
peer_id,
Expand Down Expand Up @@ -179,45 +222,18 @@ impl NetworkBackend for Libp2p {
};
log_error!(reply.send(info));
}
};
}
Some(result) = stream.next() => {
match result {
Ok(msg) => {
tracing::debug!("receiving message from mixnet client");
let Ok(MixnetMessage { topic, message }) = MixnetMessage::from_bytes(&msg) else {
tracing::error!("failed to deserialize msg received from mixnet client");
continue;
};

match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
}
Err(e) => {
tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
}
}

// self-notification because libp2p doesn't do it
if swarm.is_subscribed(&topic) {
log_error!(events_tx.send(Event::Message(Message {
source: None,
data: message.into(),
sequence_number: None,
topic: Swarm::topic_hash(&topic),
})));
}
},
Err(e) => {
todo!("Handle mixclient error: {e}");
Command::DirectBroadcastAndRetry { topic, message, retry } => {
broadcast_and_retry(topic, message, retry, cmd_tx.clone(), &mut swarm, notify.clone()).await;
}
}
};
}
}
}
});
libp2p
Self {
events_tx,
commands_tx,
}
}

async fn process(&self, msg: Self::Message) {
Expand Down Expand Up @@ -246,6 +262,52 @@ fn random_delay(range: &Range<Duration>) -> Duration {
thread_rng().gen_range(range.start, range.end)
}

async fn broadcast_and_retry(
topic: Topic,
message: Box<[u8]>,
retry: usize,
commands_tx: mpsc::Sender<Command>,
swarm: &mut Swarm,
events_tx: broadcast::Sender<Event>,
) {
tracing::debug!("broadcasting message to topic: {topic}");

let wait = BACKOFF.pow(retry as u32);

match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
// self-notification because libp2p doesn't do it
if swarm.is_subscribed(&topic) {
log_error!(events_tx.send(Event::Message(Message {
source: None,
data: message.into(),
sequence_number: None,
topic: Swarm::topic_hash(&topic),
})));
}
}
Err(gossipsub::PublishError::InsufficientPeers) if retry < MAX_RETRY => {
tracing::error!("failed to broadcast message to topic due to insufficient peers, trying again in {wait:?}");

tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
commands_tx
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: retry + 1,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule retry"));
});
}
Err(e) => {
tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down

0 comments on commit 2a0a6c9

Please sign in to comment.