-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mixnet message handle #435
Conversation
c35ec81
to
f6476e8
Compare
mixnet/node/src/lib.rs
Outdated
if msg.retry_count < self.config.max_retries { | ||
msg.retry_count += 1; | ||
return Ok(Some(msg)); | ||
} | ||
|
||
tracing::error!( | ||
"failed to forward msg to {}: reach the maximum retries", | ||
msg.target, | ||
); | ||
Err(MixnetNodeError::Protocol(ProtocolError::ReachMaxRetries( | ||
self.config.max_retries, | ||
))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have this check twice at the beginning of the function
mixnet/node/src/lib.rs
Outdated
if let std::collections::hash_map::Entry::Vacant(e) = self.connections.entry(msg.target) { | ||
match TcpStream::connect(msg.target).await { | ||
Ok(tcp) => { | ||
e.insert(tcp); | ||
} | ||
Err(e) => { | ||
tracing::error!("failed to connect to {}: {e}", msg.target); | ||
return Ok(Some(msg)); | ||
} | ||
} | ||
} | ||
|
||
let tcp = self.connections.get_mut(&msg.target).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let tcp = self.connections.entry(msg.target).or_insert_with(|| {TcpStream::connect(...)}).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems cannot be done by this way, TcpStream::connect
is an async fn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad, you're right!
mixnet/node/src/lib.rs
Outdated
/// Send a message to the remote node, | ||
/// return Ok(Some((Duration, Message))) if the message is not sent and the error is retryable | ||
/// return Ok(None) if the message is sent successfully | ||
/// return Err(e) if the message is not sent and the error is not retryable | ||
async fn send(&mut self, mut msg: TargetedMessage) -> Result<Option<TargetedMessage>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is hard to read even with a doc comment. I would suggest we create a new enum with more descriptive variantas
mixnet/node/src/lib.rs
Outdated
Ok(Some(msg)) => { | ||
let _ = self.message_tx.send(msg); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an example of why the signature of the function it's hard to read, also, a comment explaining this re-queues the message would be helpful, as it's not explicit where the other half of the channel is
mixnet/node/src/lib.rs
Outdated
} | ||
} | ||
} | ||
Err(e) => self.handle_connection_failure(e, target, body, retry_count), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: handle_connection_failure
and handle_retry
are quite similar, can we reduce duplication?
One idea could be to have something like this but you can get creative!
match self.get_connection(target).await.map(|..| body.write(tcp)) { // async map exists somewhere
Err(e) if matches!(err.kind(), ...) | Ok(Err(e)) if e.kind() != ErrorKind::Unsupported => {
// do the retry with a single shared function
}
Ok(Ok(_)) => happy case
// maybe other cases
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I don't think we need to make Message
as enum. I guess we can leave it as it was.
mixnet/node/src/lib.rs
Outdated
async fn handle_msg(&mut self, msg: Message) { | ||
self.send(msg.target, msg.body, msg.retry_count).await | ||
} | ||
|
||
async fn send(&mut self, target: SocketAddr, body: Body, retry_count: usize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have the Message
struct, what do you think about having only the send
function which accepts a Message
? Also, I think we can simplify the signature of the handle_retry
function below in the same way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have changed the signature, but do not know why GitHub does not show the changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this works as intended, but I think some things can be polished.
mixnet/node/src/lib.rs
Outdated
pub struct Message { | ||
target: SocketAddr, | ||
body: Body, | ||
retry_count: usize, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the mixnet-related crates, I would like to suggest to have clearly separated terminologies for 'message' and 'packet'. As we've seen so far, we call the object, that the mixnet end-user wants to send, "message". The message is splitted into "packet"s by the mixclient, and these packets are sent to mixnodes. The mixnode forwards packets to other mixnodes, not messages (Only one additional case is that the mixnode also forwards "final payloads" to the destination mixnode).
That is, we don't need to use the terminology "message" in the mixnode layer. In this PR, these two or three terminologies are mixed. This will cause confusions in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the thread is PacketHandler but the struct is "message", which is misleading. I change all "message" to "packet".
mixnet/node/src/lib.rs
Outdated
.await | ||
.map_err(Into::into); | ||
.map_err(From::from) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Ok(body.write(...).await?)
should also work
mixnet/node/src/lib.rs
Outdated
} | ||
}, | ||
_ = tokio::signal::ctrl_c() => { | ||
tracing::info!("Shutting down packet forwarder thread..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it's not a thread, it's a task
mixnet/node/src/lib.rs
Outdated
self.send(pkt).await; | ||
} else { | ||
// Channel closed, we should shutdown the packet forwarder thread | ||
tracing::info!("Channel closed, shutting down packet forwarder thread..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
mixnet/node/src/lib.rs
Outdated
if let Some(pkt) = pkt { | ||
self.send(pkt).await; | ||
} else { | ||
// Channel closed, we should shutdown the packet forwarder thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not sure it's possible since PacketForwarder
internally stores the sending half
Refactor the mixnet fan-in. There are no locks anymore, only one message handle thread. All messages will be sent to the message thread handler, and the message thread will handle send and retry logic.