Add mixnet retry mechanism #386
Conversation
* Add mixnet service and overwatch app
* remove #[tokio::main]
* add a connection pool
* move mixnet listening into separate task
* add exponential retry for insufficient peers in libp2p
* fix logging
* Fix MutexGuard across await: holding a MutexGuard across an await point is not a good idea. Removing that solves the issues we had with the mixnet test.
* Make mixnode handle bodies coming from the same source concurrently (#372)

Co-authored-by: Youngjoon Lee <[email protected]>
We now wait after the call to 'subscribe' to give the network time to register peers in the mesh before starting to publish messages.
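A rough sketch of that ordering, using a placeholder network handle and an arbitrary one-second delay (neither is the actual API):

use std::time::Duration;

// Placeholder handle, for illustration only; the real network service differs.
struct NetworkHandle;
impl NetworkHandle {
    fn subscribe(&mut self, _topic: &str) {}
    fn publish(&mut self, _topic: &str, _msg: Vec<u8>) {}
}

async fn subscribe_then_publish(net: &mut NetworkHandle, topic: &str, msg: Vec<u8>) {
    net.subscribe(topic);
    // Give the mesh time to register peers before the first publish.
    tokio::time::sleep(Duration::from_secs(1)).await;
    net.publish(topic, msg);
}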
mixnet/client/src/sender.rs
Outdated
let arc_socket = mu.clone();
let mut socket = mu.lock().await;
let body = Body::new_sphinx(packet);
body.write(&mut *socket).await?;

tracing::debug!("Sent a Sphinx packet successuflly to the node: {addr:?}");
let body = Body::SphinxPacket(packet);

if let Err(e) = body.write(&mut *socket).await {
    tokio::spawn(async move {
        mixnet_protocol::retry_backoff(addr, max_retries, retry_delay, body, arc_socket)
            .await;
    });
    return Err(e);
}
I think we can simplify this code like a9be870, because
- we don't need to spawn a task here.
- this function shouldn't return Err if the retry succeeded.
- this function should return Err only if all retries failed.

I think this rule should also be applied to the similar function in mixnet/node/src/lib.rs. Also, the behaviour of these functions is not consistent: the function in mixnet/node/src/lib.rs returns Ok even if the first try failed.
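A rough sketch of that shape, with retries done in place; the signature and helper names are assumptions based on the diff above, not the actual a9be870 change:

use std::{error::Error, net::SocketAddr, time::Duration};
use tokio::net::TcpStream;

// Stand-in for mixnet_protocol::Body, only so the sketch is self-contained.
struct Body;
impl Body {
    async fn write(&self, _socket: &mut TcpStream) -> std::io::Result<()> {
        Ok(())
    }
}

// Retry in place: return Ok as soon as one attempt succeeds,
// and return Err only after all retries have failed.
async fn send_with_retry(
    addr: SocketAddr,
    body: Body,
    max_retries: usize,
    retry_delay: Duration,
    socket: &mut TcpStream,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    for attempt in 0..=max_retries {
        if attempt > 0 {
            tokio::time::sleep(retry_delay * attempt as u32).await;
            // Refresh the connection before retrying.
            if let Ok(tcp) = TcpStream::connect(addr).await {
                *socket = tcp;
            }
        }
        match body.write(socket).await {
            Ok(()) => return Ok(()),
            Err(e) => tracing::warn!("attempt {attempt} sending to {addr} failed: {e}"),
        }
    }
    Err(format!("failed to send to {addr} after {max_retries} retries").into())
}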
From my understanding, if we remove the spawn, we will block and retry the single message until it is sent successfully or max_retries is reached. Please correct me if I am wrong.
// update the connection
if let Ok(tcp) = TcpStream::connect(peer_addr).await {
    *socket = tcp;
}
I keep having a feeling that we need to adopt the message passing model instead of the locking model. We could do it in a later PR (maybe after freezing the first version of the testnet).

In the current model, this retry_backoff function can be executed by multiple async tasks that share the same TcpStream, so many TcpStream::connect(..) calls can happen concurrently. That might not be a big issue, but it could sometimes cause 'too many open files' errors or congestion. Even if not, only one of the reconnected connections will survive in the pool eventually.

In the message passing model, we can have a single worker per TcpStream, which exposes an MPSC channel to users and manages the socket writes and the lifecycle of the TcpStream. As I remember, Nym also uses this model (but I'm not sure). Then it would be much easier to implement the retry/reconnect mechanism.
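A hedged sketch of that worker model; the channel capacity, message type, and reconnect policy are all assumptions:

use std::net::SocketAddr;
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::mpsc};

// One worker owns the TcpStream; callers only hold an mpsc Sender.
fn spawn_connection_worker(peer_addr: SocketAddr) -> mpsc::Sender<Vec<u8>> {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
    tokio::spawn(async move {
        let mut socket = TcpStream::connect(peer_addr).await.ok();
        while let Some(bytes) = rx.recv().await {
            // Reconnect lazily if there is no healthy socket.
            if socket.is_none() {
                socket = TcpStream::connect(peer_addr).await.ok();
            }
            if let Some(s) = socket.as_mut() {
                if let Err(e) = s.write_all(&bytes).await {
                    tracing::warn!("write to {peer_addr} failed: {e}");
                    // Drop the broken connection; the next message triggers a reconnect.
                    socket = None;
                }
            }
        }
    });
    tx
}

With this shape, only the worker ever touches the socket, so reconnects cannot race each other.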
I agree. Having a single worker task per TcpStream that handles messages and communicates via channels is cleaner and more elegant.
Agree with @youngjoon-lee, and it's actually what is done pretty much everywhere else in the node.
mixnet/client/src/sender.rs
Outdated
tracing::error!("Failed to send packet to {addr} with error: {e}. Retrying...");
return mixnet_protocol::retry_backoff(addr, max_retries, retry_delay, body, arc_socket)
    .await;
Do we perhaps have to drop socket first before calling retry_backoff, as you did in the other part?
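A minimal sketch of that suggestion, continuing the diff above (retry_backoff's exact signature is an assumption):

if let Err(e) = body.write(&mut *socket).await {
    tracing::error!("Failed to send packet to {addr} with error: {e}. Retrying...");
    // Release the MutexGuard before retrying, so retry_backoff can lock the
    // same Arc<Mutex<TcpStream>> again without deadlocking.
    drop(socket);
    return mixnet_protocol::retry_backoff(addr, max_retries, retry_delay, body, arc_socket)
        .await;
}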
Ah yes, thank you!
It seems to work. It would be great if we had a test to check that the nomos network keeps working even if several mixnodes are down.
}
}
}

impl MixnetNodeConfig {
    const fn default_connection_pool_size() -> usize {
could they be consts?
serde(default = "...") does not allow consts; it has to be a path to a fn.
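For illustration, a sketch of that serde pattern; the field name and default value are assumptions:

use serde::Deserialize;

#[derive(Deserialize)]
struct MixnetNodeConfig {
    // serde's `default` attribute takes a path to a function,
    // so a plain const cannot be referenced here.
    #[serde(default = "default_connection_pool_size")]
    connection_pool_size: usize,
}

const fn default_connection_pool_size() -> usize {
    255
}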
| ErrorKind::Other => {}
_ => {
    // update the connection
    if let Ok(tcp) = TcpStream::connect(peer_addr).await {
What's the rationale behind updating the connection upon receiving these errors? For example, Unsupported means the action can never succeed, so why retry by doing the same thing as before?
I am a little bit confused here. It seems that the code will not update the connection on those errors; do you mean we should return early when those errors happen?
ErrorKind::Unsupported
| ErrorKind::NotFound
| ErrorKind::PermissionDenied
| ErrorKind::Other => {}
The fact that ErrorKind::Unsupported is specifically mentioned suggests we have some specific handling for it, and given the nature of the error that would mean stopping the retries.
It's acceptable to retry again if we don't want to differentiate errors, but if we do differentiate then it only makes sense to do the right thing for each case.
Now we return early for ErrorKind::Other; why is that?
Ah, I misunderstood. So we only need to return early for Unsupported here.
My point was that if we explicitly mention an ErrorKind then we need to handle it appropriately. Depending on the specific case, it could be that Other and Unsupported should be treated differently (as in this one).
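A sketch of where that discussion lands; which ErrorKinds are treated as fatal is an assumption:

use std::{io::ErrorKind, net::SocketAddr};
use tokio::net::TcpStream;

// Returns true if the caller should retry after this error.
async fn handle_send_error(kind: ErrorKind, peer_addr: SocketAddr, socket: &mut TcpStream) -> bool {
    match kind {
        // The operation can never succeed, so stop retrying.
        ErrorKind::Unsupported => false,
        // For anything else, refresh the connection and retry.
        _ => {
            if let Ok(tcp) = TcpStream::connect(peer_addr).await {
                *socket = tcp;
            }
            true
        }
    }
}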
Makes sense. 👍
mixnet/protocol/src/lib.rs
Outdated
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    for idx in 0..max_retries {
        // backoff
        let wait = retry_delay * (idx as u32);
we might want an exponential backoff
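For example, a possible exponential variant of that line; the 30-second cap is an arbitrary assumption:

use std::time::Duration;

// Exponential backoff: retry_delay, 2x, 4x, ... capped so a large
// max_retries cannot produce an absurd wait.
fn backoff_delay(retry_delay: Duration, idx: u32) -> Duration {
    let factor = 1u32.checked_shl(idx).unwrap_or(u32::MAX);
    retry_delay.saturating_mul(factor).min(Duration::from_secs(30))
}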
nit: "fix PR comment" is not a very helpful commit message
This PR tries to solve #322 (comment). I introduced an ack mechanism to check whether we should retry sending the packet or not.