Skip to content

Commit

Permalink
Merge pull request #206 from zeromq/finalize-socket-recv-dealer-impl
Browse files Browse the repository at this point in the history
handle errors from the fair queue and the case when the queue is empty
  • Loading branch information
rgbkrk authored Dec 30, 2024
2 parents 270bdde + 2e4b540 commit 3ec38fe
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 26 deletions.
17 changes: 14 additions & 3 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -66,8 +66,19 @@ impl SocketRecv for DealerSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, _)) => todo!(),
None => todo!(),
Some((_peer_id, Ok(_))) => {
// Ignore non-message frames
continue;
}
Some((_peer_id, Err(e))) => {
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
// We could either wait for more messages or return an error
return Err(ZmqError::NoMessage);
}
};
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + Sock
}
backend.send(message).await?;
}
Err(_) => {
todo!()
Err(e) => {
return Err(e);
}
}
},
Expand All @@ -370,8 +370,8 @@ pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + Sock
}
frontend.send(message).await?;
}
Err(_) => {
todo!()
Err(e) => {
return Err(e);
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketOptions, SocketRecv, SocketType,
ZmqMessage, ZmqResult,
ZmqError, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -60,11 +60,19 @@ impl SocketRecv for PullSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames (Command, Greeting) as PULL sockets are designed to only receive actual messages, not internal protocol frames.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
};
}
}
Expand Down
14 changes: 11 additions & 3 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ impl SocketRecv for RepSocket {
match self.fair_queue.next().await {
Some((peer_id, Ok(message))) => match message {
Message::Message(mut m) => {
assert!(m.len() > 1);
if m.len() < 2 {
return Err(ZmqError::Other("Invalid message format"));
}
let mut at = 1;
for (index, frame) in m.iter().enumerate() {
if frame.is_empty() {
Expand All @@ -163,9 +165,15 @@ impl SocketRecv for RepSocket {
self.current_request = Some(peer_id);
return Ok(data);
}
_ => todo!(),
Message::Greeting(_) | Message::Command(_) => {
// Ignore non-message frames. REP sockets should only process actual messages.
continue;
}
},
Some((_peer_id, _)) => todo!(),
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
return Err(e.into());
}
None => return Err(ZmqError::NoMessage),
};
}
Expand Down
20 changes: 15 additions & 5 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,24 @@ impl SocketRecv for ReqSocket {
match self.current_request.take() {
Some(peer_id) => {
if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
let message = peer.recv_queue.next().await;
match message {
match peer.recv_queue.next().await {
Some(Ok(Message::Message(mut m))) => {
assert!(m.len() > 1);
assert!(m.pop_front().unwrap().is_empty()); // Ensure that we have delimeter as first part
if m.len() < 2 {
return Err(ZmqError::Other(
"Invalid message format: too few frames",
));
}
if !m.pop_front().unwrap().is_empty() {
return Err(ZmqError::Other(
"Invalid message format: missing delimiter",
));
}
Ok(m)
}
Some(Ok(_)) => todo!(),
Some(Ok(_)) => {
// Non-message frames should be ignored by the caller
Err(ZmqError::Other("Received non-message frame"))
}
Some(Err(error)) => Err(error.into()),
None => Err(ZmqError::NoMessage),
}
Expand Down
17 changes: 14 additions & 3 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,22 @@ impl SocketRecv for RouterSocket {
message.push_front(peer_id.into());
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// todo: Log or handle other message types if needed
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::warn!("Received unimplemented message type: {:?}", msg);
continue;
}
Some((peer_id, Err(_e))) => {
self.backend.peer_disconnected(&peer_id);
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::error!("Error receiving message from peer {}: {:?}", peer_id, e);
continue;
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
};
}
}
Expand Down
17 changes: 13 additions & 4 deletions src/sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::backend::Peer;
use crate::codec::{FramedIo, Message, ZmqFramedRead};
use crate::endpoint::Endpoint;
use crate::error::ZmqResult;
use crate::error::{ZmqError, ZmqResult};
use crate::fair_queue::FairQueue;
use crate::fair_queue::QueueInner;
use crate::message::ZmqMessage;
Expand Down Expand Up @@ -191,11 +191,20 @@ impl SocketRecv for SubSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames. SUB sockets are designed to only receive actual messages,
// not internal protocol frames like commands or greetings.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
}
}
}
Expand Down

0 comments on commit 3ec38fe

Please sign in to comment.