diff --git a/tests/message.rs b/tests/message.rs index 45d4f97..d46e1a9 100644 --- a/tests/message.rs +++ b/tests/message.rs @@ -1,45 +1,48 @@ -use bytes::Bytes; -use std::collections::vec_deque::VecDeque; -use std::convert::TryFrom; -use zeromq::ZmqMessage; +#[cfg(test)] +mod test { + use bytes::Bytes; + use std::collections::vec_deque::VecDeque; + use std::convert::TryFrom; + use zeromq::ZmqMessage; -#[test] -fn test_split_off() { - let mut frames = VecDeque::with_capacity(5); - frames.push_back(Bytes::from("id1")); - frames.push_back(Bytes::from("id2")); - frames.push_back(Bytes::from("")); - frames.push_back(Bytes::from("data1")); - frames.push_back(Bytes::from("data2")); - let mut m = ZmqMessage::try_from(frames).unwrap(); - let data = m.split_off(3); - assert_eq!(m.len(), 3); - assert_eq!(m.get(0), Some(&Bytes::from("id1"))); - assert_eq!(m.get(1), Some(&Bytes::from("id2"))); - assert_eq!(m.get(2), Some(&Bytes::from(""))); - assert_eq!(data.len(), 2); - assert_eq!(data.get(0), Some(&Bytes::from("data1"))); - assert_eq!(data.get(1), Some(&Bytes::from("data2"))); -} + #[test] + fn test_split_off() { + let mut frames = VecDeque::with_capacity(5); + frames.push_back(Bytes::from("id1")); + frames.push_back(Bytes::from("id2")); + frames.push_back(Bytes::from("")); + frames.push_back(Bytes::from("data1")); + frames.push_back(Bytes::from("data2")); + let mut m = ZmqMessage::try_from(frames).unwrap(); + let data = m.split_off(3); + assert_eq!(m.len(), 3); + assert_eq!(m.get(0), Some(&Bytes::from("id1"))); + assert_eq!(m.get(1), Some(&Bytes::from("id2"))); + assert_eq!(m.get(2), Some(&Bytes::from(""))); + assert_eq!(data.len(), 2); + assert_eq!(data.get(0), Some(&Bytes::from("data1"))); + assert_eq!(data.get(1), Some(&Bytes::from("data2"))); + } -#[test] -fn test_prepend() { - let mut frames = VecDeque::with_capacity(2); - frames.push_back(Bytes::from("data1")); - frames.push_back(Bytes::from("data2")); - let mut m = ZmqMessage::try_from(frames).unwrap(); + #[test] + fn test_prepend() { + let mut frames = VecDeque::with_capacity(2); + frames.push_back(Bytes::from("data1")); + frames.push_back(Bytes::from("data2")); + let mut m = ZmqMessage::try_from(frames).unwrap(); - let mut envelope_frames = VecDeque::with_capacity(3); - envelope_frames.push_back(Bytes::from("id1")); - envelope_frames.push_back(Bytes::from("id2")); - envelope_frames.push_back(Bytes::from("")); - let envelope = ZmqMessage::try_from(envelope_frames).unwrap(); + let mut envelope_frames = VecDeque::with_capacity(3); + envelope_frames.push_back(Bytes::from("id1")); + envelope_frames.push_back(Bytes::from("id2")); + envelope_frames.push_back(Bytes::from("")); + let envelope = ZmqMessage::try_from(envelope_frames).unwrap(); - m.prepend(&envelope); - assert_eq!(m.len(), 5); - assert_eq!(m.get(0), Some(&Bytes::from("id1"))); - assert_eq!(m.get(1), Some(&Bytes::from("id2"))); - assert_eq!(m.get(2), Some(&Bytes::from(""))); - assert_eq!(m.get(3), Some(&Bytes::from("data1"))); - assert_eq!(m.get(4), Some(&Bytes::from("data2"))); + m.prepend(&envelope); + assert_eq!(m.len(), 5); + assert_eq!(m.get(0), Some(&Bytes::from("id1"))); + assert_eq!(m.get(1), Some(&Bytes::from("id2"))); + assert_eq!(m.get(2), Some(&Bytes::from(""))); + assert_eq!(m.get(3), Some(&Bytes::from("data1"))); + assert_eq!(m.get(4), Some(&Bytes::from("data2"))); + } } diff --git a/tests/pub_sub.rs b/tests/pub_sub.rs index a8090e5..0fa32e2 100644 --- a/tests/pub_sub.rs +++ b/tests/pub_sub.rs @@ -1,104 +1,107 @@ -use zeromq::prelude::*; -use zeromq::Endpoint; -use zeromq::ZmqMessage; -use zeromq::__async_rt as async_rt; +#[cfg(test)] +mod test { + use zeromq::prelude::*; + use zeromq::Endpoint; + use zeromq::ZmqMessage; + use zeromq::__async_rt as async_rt; -use futures_channel::{mpsc, oneshot}; -use futures_util::{SinkExt, StreamExt}; -use std::time::Duration; + use futures_channel::{mpsc, oneshot}; + use futures_util::{SinkExt, StreamExt}; + use std::time::Duration; -#[async_rt::test] -async fn test_pub_sub_sockets() { - pretty_env_logger::try_init().ok(); + #[async_rt::test] + async fn test_pub_sub_sockets() { + pretty_env_logger::try_init().ok(); - async fn helper(bind_addr: &'static str) { - // We will join on these at the end to determine if any tasks we spawned - // panicked - let mut task_handles = Vec::new(); - let payload = chrono::Utc::now().to_rfc2822(); + async fn helper(bind_addr: &'static str) { + // We will join on these at the end to determine if any tasks we spawned + // panicked + let mut task_handles = Vec::new(); + let payload = chrono::Utc::now().to_rfc2822(); - let cloned_payload = payload.clone(); - let (server_stop_sender, mut server_stop) = oneshot::channel::<()>(); - let (has_bound_sender, has_bound) = oneshot::channel::(); - task_handles.push(async_rt::task::spawn(async move { - let mut pub_socket = zeromq::PubSocket::new(); - let bound_to = pub_socket - .bind(bind_addr) - .await - .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e)); - has_bound_sender - .send(bound_to) - .expect("channel was dropped"); + let cloned_payload = payload.clone(); + let (server_stop_sender, mut server_stop) = oneshot::channel::<()>(); + let (has_bound_sender, has_bound) = oneshot::channel::(); + task_handles.push(async_rt::task::spawn(async move { + let mut pub_socket = zeromq::PubSocket::new(); + let bound_to = pub_socket + .bind(bind_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e)); + has_bound_sender + .send(bound_to) + .expect("channel was dropped"); - loop { - if let Ok(Some(_)) = server_stop.try_recv() { - break; + loop { + if let Ok(Some(_)) = server_stop.try_recv() { + break; + } + + let s: String = cloned_payload.clone(); + let m = ZmqMessage::from(s); + pub_socket.send(m).await.expect("Failed to send"); + async_rt::task::sleep(Duration::from_millis(1)).await; } - let s: String = cloned_payload.clone(); - let m = ZmqMessage::from(s); - pub_socket.send(m).await.expect("Failed to send"); - async_rt::task::sleep(Duration::from_millis(1)).await; + let errs = pub_socket.close().await; + if !errs.is_empty() { + panic!("Could not unbind socket: {:?}", errs); + } + })); + // Block until the pub has finished binding + // TODO: ZMQ sockets should not care about this sort of ordering. + // See https://github.com/zeromq/zmq.rs/issues/73 + let bound_addr = has_bound.await.expect("channel was cancelled"); + if let Endpoint::Tcp(_host, port) = bound_addr.clone() { + assert_ne!(port, 0); } - let errs = pub_socket.close().await; - if !errs.is_empty() { - panic!("Could not unbind socket: {:?}", errs); - } - })); - // Block until the pub has finished binding - // TODO: ZMQ sockets should not care about this sort of ordering. - // See https://github.com/zeromq/zmq.rs/issues/73 - let bound_addr = has_bound.await.expect("channel was cancelled"); - if let Endpoint::Tcp(_host, port) = bound_addr.clone() { - assert_ne!(port, 0); - } + let (sub_results_sender, sub_results) = mpsc::channel(100); + for _ in 0..10 { + let mut cloned_sub_sender = sub_results_sender.clone(); + let cloned_payload = payload.clone(); + let cloned_bound_addr = bound_addr.to_string(); + task_handles.push(async_rt::task::spawn(async move { + let mut sub_socket = zeromq::SubSocket::new(); + sub_socket + .connect(&cloned_bound_addr) + .await + .unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr)); - let (sub_results_sender, sub_results) = mpsc::channel(100); - for _ in 0..10 { - let mut cloned_sub_sender = sub_results_sender.clone(); - let cloned_payload = payload.clone(); - let cloned_bound_addr = bound_addr.to_string(); - task_handles.push(async_rt::task::spawn(async move { - let mut sub_socket = zeromq::SubSocket::new(); - sub_socket - .connect(&cloned_bound_addr) - .await - .unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr)); + sub_socket.subscribe("").await.expect("Failed to subscribe"); - sub_socket.subscribe("").await.expect("Failed to subscribe"); + async_rt::task::sleep(std::time::Duration::from_millis(500)).await; - async_rt::task::sleep(std::time::Duration::from_millis(500)).await; + for _ in 0..10 { + let recv_message = sub_socket.recv().await.unwrap(); + let recv_payload = + String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap(); + assert_eq!(cloned_payload, recv_payload); + cloned_sub_sender.send(()).await.unwrap(); + } + })); + } + drop(sub_results_sender); + let res_vec: Vec<()> = sub_results.collect().await; + assert_eq!(100, res_vec.len()); - for _ in 0..10 { - let recv_message = sub_socket.recv().await.unwrap(); - let recv_payload = - String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap(); - assert_eq!(cloned_payload, recv_payload); - cloned_sub_sender.send(()).await.unwrap(); - } - })); + server_stop_sender.send(()).unwrap(); + for t in task_handles { + t.await.expect("Task failed unexpectedly!"); + } } - drop(sub_results_sender); - let res_vec: Vec<()> = sub_results.collect().await; - assert_eq!(100, res_vec.len()); - server_stop_sender.send(()).unwrap(); - for t in task_handles { - t.await.expect("Task failed unexpectedly!"); - } + let addrs = vec![ + "tcp://localhost:0", + "tcp://127.0.0.1:0", + "tcp://[::1]:0", + "tcp://127.0.0.1:0", + "tcp://localhost:0", + "tcp://127.0.0.1:0", + "tcp://[::1]:0", + "ipc://asdf.sock", + "ipc://anothersocket-asdf", + ]; + futures_util::future::join_all(addrs.into_iter().map(helper)).await; } - - let addrs = vec![ - "tcp://localhost:0", - "tcp://127.0.0.1:0", - "tcp://[::1]:0", - "tcp://127.0.0.1:0", - "tcp://localhost:0", - "tcp://127.0.0.1:0", - "tcp://[::1]:0", - "ipc://asdf.sock", - "ipc://anothersocket-asdf", - ]; - futures_util::future::join_all(addrs.into_iter().map(helper)).await; } diff --git a/tests/pub_sub_compliant.rs b/tests/pub_sub_compliant.rs index 96ea0af..21a5916 100644 --- a/tests/pub_sub_compliant.rs +++ b/tests/pub_sub_compliant.rs @@ -65,66 +65,71 @@ async fn run_our_subs(our_subs: Vec, num_to_recv: u32) { println!("Finished sub task"); } -#[async_rt::test] -async fn test_their_pub_our_sub() { - const N_SUBS: u8 = 16; - - async fn do_test(their_endpoint: &str) { - let (their_pub, bind_endpoint, their_monitor) = setup_their_pub(their_endpoint); - println!("Their pub was bound to {}", bind_endpoint); - - let our_subs = setup_our_subs(&bind_endpoint, N_SUBS).await; - for _ in 0..N_SUBS { - assert_eq!( - zmq2::SocketEvent::ACCEPTED, - get_monitor_event(&their_monitor).0 - ); - assert_eq!( - zmq2::SocketEvent::HANDSHAKE_SUCCEEDED, - get_monitor_event(&their_monitor).0 - ); - } - // This is necessary to avoid slow joiner problem - async_rt::task::sleep(Duration::from_millis(100)).await; - println!("Setup done"); - - const NUM_MSGS: u32 = 64; - - let their_join_handle = run_their_pub(their_pub, NUM_MSGS); - run_our_subs(our_subs, NUM_MSGS).await; - let their_pub = their_join_handle - .join() - .expect("Their pub terminated with an error!"); +#[cfg(test)] +mod test { + use super::*; + + #[async_rt::test] + async fn test_their_pub_our_sub() { + const N_SUBS: u8 = 16; + + async fn do_test(their_endpoint: &str) { + let (their_pub, bind_endpoint, their_monitor) = setup_their_pub(their_endpoint); + println!("Their pub was bound to {}", bind_endpoint); + + let our_subs = setup_our_subs(&bind_endpoint, N_SUBS).await; + for _ in 0..N_SUBS { + assert_eq!( + zmq2::SocketEvent::ACCEPTED, + get_monitor_event(&their_monitor).0 + ); + assert_eq!( + zmq2::SocketEvent::HANDSHAKE_SUCCEEDED, + get_monitor_event(&their_monitor).0 + ); + } + // This is necessary to avoid slow joiner problem + async_rt::task::sleep(Duration::from_millis(100)).await; + println!("Setup done"); + + const NUM_MSGS: u32 = 64; + + let their_join_handle = run_their_pub(their_pub, NUM_MSGS); + run_our_subs(our_subs, NUM_MSGS).await; + let their_pub = their_join_handle + .join() + .expect("Their pub terminated with an error!"); + + for _ in 0..N_SUBS { + assert_eq!( + get_monitor_event(&their_monitor).0, + zmq2::SocketEvent::DISCONNECTED + ); + } - for _ in 0..N_SUBS { + drop(their_pub); assert_eq!( get_monitor_event(&their_monitor).0, - zmq2::SocketEvent::DISCONNECTED + zmq2::SocketEvent::CLOSED ); } - drop(their_pub); - assert_eq!( - get_monitor_event(&their_monitor).0, - zmq2::SocketEvent::CLOSED - ); - } - - let endpoints = vec![ - "tcp://127.0.0.1:0", - "tcp://[::1]:0", - "ipc://asdf.sock", - "ipc://anothersocket-asdf", - ]; - for e in endpoints { - println!("Testing with endpoint {}", e); - do_test(e).await; - - // Unfortunately not all libzmq versions actually delete the ipc file. See - // https://github.com/zeromq/libzmq/issues/3387 - // So we will delete it ourselves. - if let Some(path) = e.strip_prefix("ipc://") { - std::fs::remove_file(path).expect("Failed to remove ipc file") + let endpoints = vec![ + "tcp://127.0.0.1:0", + "tcp://[::1]:0", + "ipc://asdf.sock", + "ipc://anothersocket-asdf", + ]; + for e in endpoints { + println!("Testing with endpoint {}", e); + do_test(e).await; + + // Unfortunately not all libzmq versions actually delete the ipc file. See + // https://github.com/zeromq/libzmq/issues/3387 + // So we will delete it ourselves. + if let Some(path) = e.strip_prefix("ipc://") { + std::fs::remove_file(path).expect("Failed to remove ipc file") + } } } } diff --git a/tests/req_rep.rs b/tests/req_rep.rs index 20b24bc..50f5240 100644 --- a/tests/req_rep.rs +++ b/tests/req_rep.rs @@ -7,61 +7,66 @@ use futures_util::StreamExt; use std::error::Error; use std::time::Duration; -#[async_rt::test] -async fn test_req_rep_sockets() -> Result<(), Box> { - pretty_env_logger::try_init().ok(); - - let mut rep_socket = zeromq::RepSocket::new(); - let monitor = rep_socket.monitor(); - let endpoint = rep_socket.bind("tcp://localhost:0").await?; - println!("Started rep server on {}", endpoint); - - let num_messages = 10; - - async_rt::task::spawn(async move { - let mut req_socket = zeromq::ReqSocket::new(); - req_socket - .connect(endpoint.to_string().as_str()) - .await - .unwrap(); - helpers::run_req_client(req_socket, num_messages) - .await - .unwrap(); - }); - - helpers::run_rep_server(rep_socket, num_messages).await?; - - let events: Vec<_> = monitor.collect().await; - assert_eq!(2, events.len(), "{:?}", &events); - - Ok(()) -} +#[cfg(test)] +mod test { -#[async_rt::test] -async fn test_many_req_rep_sockets() -> Result<(), Box> { - pretty_env_logger::try_init().ok(); + use super::*; + #[async_rt::test] + async fn test_req_rep_sockets() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); - let mut rep_socket = zeromq::RepSocket::new(); - let endpoint = rep_socket.bind("tcp://localhost:0").await?; - println!("Started rep server on {}", endpoint); + let mut rep_socket = zeromq::RepSocket::new(); + let monitor = rep_socket.monitor(); + let endpoint = rep_socket.bind("tcp://localhost:0").await?; + println!("Started rep server on {}", endpoint); - let num_req_sockets = 100; - let num_messages = 100; + let num_messages = 10; - for i in 0..num_req_sockets { - let cloned_endpoint = endpoint.to_string(); async_rt::task::spawn(async move { - // yield for a moment to ensure that server has some time to open socket - async_rt::task::sleep(Duration::from_millis(100)).await; let mut req_socket = zeromq::ReqSocket::new(); - req_socket.connect(&cloned_endpoint).await.unwrap(); - helpers::run_req_client_with_id(req_socket, i, num_messages) + req_socket + .connect(endpoint.to_string().as_str()) + .await + .unwrap(); + helpers::run_req_client(req_socket, num_messages) .await .unwrap(); }); + + helpers::run_rep_server(rep_socket, num_messages).await?; + + let events: Vec<_> = monitor.collect().await; + assert_eq!(2, events.len(), "{:?}", &events); + + Ok(()) } - helpers::run_rep_server(rep_socket, num_req_sockets * num_messages).await?; + #[async_rt::test] + async fn test_many_req_rep_sockets() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); - Ok(()) + let mut rep_socket = zeromq::RepSocket::new(); + let endpoint = rep_socket.bind("tcp://localhost:0").await?; + println!("Started rep server on {}", endpoint); + + let num_req_sockets = 100; + let num_messages = 100; + + for i in 0..num_req_sockets { + let cloned_endpoint = endpoint.to_string(); + async_rt::task::spawn(async move { + // yield for a moment to ensure that server has some time to open socket + async_rt::task::sleep(Duration::from_millis(100)).await; + let mut req_socket = zeromq::ReqSocket::new(); + req_socket.connect(&cloned_endpoint).await.unwrap(); + helpers::run_req_client_with_id(req_socket, i, num_messages) + .await + .unwrap(); + }); + } + + helpers::run_rep_server(rep_socket, num_req_sockets * num_messages).await?; + + Ok(()) + } } diff --git a/tests/req_rep_compliant.rs b/tests/req_rep_compliant.rs index 799c29f..5bdc934 100644 --- a/tests/req_rep_compliant.rs +++ b/tests/req_rep_compliant.rs @@ -1,85 +1,94 @@ mod compliance; use compliance::{get_monitor_event, setup_monitor}; -use std::convert::TryInto; -use zeromq::__async_rt as async_rt; -use zeromq::prelude::*; -use zeromq::ZmqMessage; +#[cfg(test)] +mod test { -/// Returns (socket, bound_endpoint, monitor) -fn setup_their_rep(bind_endpoint: &str) -> (zmq2::Socket, String, zmq2::Socket) { - let ctx = zmq2::Context::new(); - let their_rep = ctx.socket(zmq2::REP).expect("Couldn't make rep socket"); - their_rep.bind(bind_endpoint).expect("Failed to bind"); + use super::*; - let resolved_bind = their_rep.get_last_endpoint().unwrap().unwrap(); + use std::convert::TryInto; + use zeromq::__async_rt as async_rt; + use zeromq::prelude::*; + use zeromq::ZmqMessage; - let their_monitor = setup_monitor(&ctx, &their_rep, "inproc://their-monitor"); + /// Returns (socket, bound_endpoint, monitor) + fn setup_their_rep(bind_endpoint: &str) -> (zmq2::Socket, String, zmq2::Socket) { + let ctx = zmq2::Context::new(); + let their_rep = ctx.socket(zmq2::REP).expect("Couldn't make rep socket"); + their_rep.bind(bind_endpoint).expect("Failed to bind"); - (their_rep, resolved_bind, their_monitor) -} + let resolved_bind = their_rep.get_last_endpoint().unwrap().unwrap(); -async fn setup_our_req(bind_endpoint: &str) -> zeromq::ReqSocket { - let mut our_req = zeromq::ReqSocket::new(); - our_req - .connect(bind_endpoint) - .await - .expect("Failed to connect"); - our_req -} + let their_monitor = setup_monitor(&ctx, &their_rep, "inproc://their-monitor"); -fn run_their_rep(their_rep: zmq2::Socket, num_req: u32) -> std::thread::JoinHandle { - assert_eq!(their_rep.get_socket_type().unwrap(), zmq2::REP); - std::thread::spawn(move || { - for i in 0..num_req { - let request = their_rep.recv_msg(0).expect("Failed to recv"); - assert_eq!(request.as_str().unwrap(), format!("Request: {}", i)); + (their_rep, resolved_bind, their_monitor) + } + + async fn setup_our_req(bind_endpoint: &str) -> zeromq::ReqSocket { + let mut our_req = zeromq::ReqSocket::new(); + our_req + .connect(bind_endpoint) + .await + .expect("Failed to connect"); + our_req + } + + fn run_their_rep( + their_rep: zmq2::Socket, + num_req: u32, + ) -> std::thread::JoinHandle { + assert_eq!(their_rep.get_socket_type().unwrap(), zmq2::REP); + std::thread::spawn(move || { + for i in 0..num_req { + let request = their_rep.recv_msg(0).expect("Failed to recv"); + assert_eq!(request.as_str().unwrap(), format!("Request: {}", i)); + their_rep + .send(&format!("Reply: {}", i), 0) + .expect("Failed to send"); + } + println!("Finished pub task"); their_rep - .send(&format!("Reply: {}", i), 0) - .expect("Failed to send"); - } - println!("Finished pub task"); - their_rep - }) -} + }) + } -async fn run_our_req(our_req: &mut zeromq::ReqSocket, num_req: u32) { - for i in 0..num_req { - let ms: String = format!("Request: {}", i); - let message = ZmqMessage::from(ms); - our_req.send(message).await.expect("Failed to send"); - let reply = our_req.recv().await.expect("Failed to recv"); + async fn run_our_req(our_req: &mut zeromq::ReqSocket, num_req: u32) { + for i in 0..num_req { + let ms: String = format!("Request: {}", i); + let message = ZmqMessage::from(ms); + our_req.send(message).await.expect("Failed to send"); + let reply = our_req.recv().await.expect("Failed to recv"); - let reply_payload: String = reply.try_into().unwrap(); - println!("Received reply: {}", &reply_payload); - assert_eq!(reply_payload, format!("Reply: {}", i)); + let reply_payload: String = reply.try_into().unwrap(); + println!("Received reply: {}", &reply_payload); + assert_eq!(reply_payload, format!("Reply: {}", i)); + } } -} -#[async_rt::test] -async fn test_their_rep_our_req() { - let (their_rep, bind_endpoint, their_monitor) = setup_their_rep("tcp://127.0.0.1:0"); - println!("Their rep was bound to {}", bind_endpoint); + #[async_rt::test] + async fn test_their_rep_our_req() { + let (their_rep, bind_endpoint, their_monitor) = setup_their_rep("tcp://127.0.0.1:0"); + println!("Their rep was bound to {}", bind_endpoint); - let mut our_req = setup_our_req(&bind_endpoint).await; - assert_eq!( - zmq2::SocketEvent::ACCEPTED, - get_monitor_event(&their_monitor).0 - ); - assert_eq!( - zmq2::SocketEvent::HANDSHAKE_SUCCEEDED, - get_monitor_event(&their_monitor).0 - ); - println!("Setup done"); + let mut our_req = setup_our_req(&bind_endpoint).await; + assert_eq!( + zmq2::SocketEvent::ACCEPTED, + get_monitor_event(&their_monitor).0 + ); + assert_eq!( + zmq2::SocketEvent::HANDSHAKE_SUCCEEDED, + get_monitor_event(&their_monitor).0 + ); + println!("Setup done"); - const NUM_MSGS: u32 = 64; + const NUM_MSGS: u32 = 64; - let their_join_handle = run_their_rep(their_rep, NUM_MSGS); - run_our_req(&mut our_req, NUM_MSGS).await; - let _their_rep = their_join_handle - .join() - .expect("Their pub terminated with an error!"); - assert_eq!(our_req.close().await.len(), 0); - // TODO: check that socket disconnected via monitor when we implement that - // functionality + let their_join_handle = run_their_rep(their_rep, NUM_MSGS); + run_our_req(&mut our_req, NUM_MSGS).await; + let _their_rep = their_join_handle + .join() + .expect("Their pub terminated with an error!"); + assert_eq!(our_req.close().await.len(), 0); + // TODO: check that socket disconnected via monitor when we implement that + // functionality + } } diff --git a/tests/req_router_dealer_rep.rs b/tests/req_router_dealer_rep.rs index 3030785..dffcbd5 100644 --- a/tests/req_router_dealer_rep.rs +++ b/tests/req_router_dealer_rep.rs @@ -1,107 +1,112 @@ mod helpers; -use zeromq::__async_rt as async_rt; -use zeromq::prelude::*; - -use futures_util::StreamExt; -use std::error::Error; -use std::time::Duration; - -#[async_rt::test] -async fn test_req_router_dealer_rep_sockets() -> Result<(), Box> { - pretty_env_logger::try_init().ok(); - - let mut router_socket = zeromq::RouterSocket::new(); - let router_monitor = router_socket.monitor(); - let router_endpoint = router_socket.bind("tcp://localhost:0").await?; - - let mut dealer_socket = zeromq::DealerSocket::new(); - let dealer_monitor = dealer_socket.monitor(); - let dealer_endpoint = dealer_socket.bind("tcp://localhost:0").await?; - - let mut rep_socket = zeromq::RepSocket::new(); - let rep_monitor = rep_socket.monitor(); - rep_socket - .connect(dealer_endpoint.to_string().as_str()) - .await?; - - let num_messages = 10; - - async_rt::task::spawn(async move { - helpers::run_rep_server(rep_socket, num_messages) - .await - .unwrap(); - }); - - let req_task = async_rt::task::spawn(async move { - let mut req_socket = zeromq::ReqSocket::new(); - req_socket - .connect(router_endpoint.to_string().as_str()) - .await - .unwrap(); - helpers::run_req_client(req_socket, num_messages) - .await - .unwrap(); - }); - - helpers::run_proxy(router_socket, dealer_socket, 100).await; - - req_task.await.unwrap(); - - let router_events: Vec<_> = router_monitor.collect().await; - let dealer_events: Vec<_> = dealer_monitor.collect().await; - let rep_events: Vec<_> = rep_monitor.collect().await; - assert_eq!(2, router_events.len(), "{:?}", &router_events); - assert_eq!(2, dealer_events.len(), "{:?}", &dealer_events); - assert_eq!(1, rep_events.len(), "{:?}", &rep_events); - - Ok(()) -} +#[cfg(test)] +mod test { + + use super::helpers; + use zeromq::__async_rt as async_rt; + use zeromq::prelude::*; -#[async_rt::test] -async fn test_many_req_router_dealer_rep_sockets() -> Result<(), Box> { - pretty_env_logger::try_init().ok(); + use futures_util::StreamExt; + use std::error::Error; + use std::time::Duration; - let mut router_socket = zeromq::RouterSocket::new(); - let router_endpoint = router_socket.bind("tcp://localhost:0").await?; + #[async_rt::test] + async fn test_req_router_dealer_rep_sockets() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); - let mut dealer_socket = zeromq::DealerSocket::new(); - let dealer_endpoint = dealer_socket.bind("tcp://localhost:0").await?; + let mut router_socket = zeromq::RouterSocket::new(); + let router_monitor = router_socket.monitor(); + let router_endpoint = router_socket.bind("tcp://localhost:0").await?; - let num_req_sockets = 100; - let num_messages = 100; + let mut dealer_socket = zeromq::DealerSocket::new(); + let dealer_monitor = dealer_socket.monitor(); + let dealer_endpoint = dealer_socket.bind("tcp://localhost:0").await?; - async_rt::task::spawn(async move { let mut rep_socket = zeromq::RepSocket::new(); + let rep_monitor = rep_socket.monitor(); rep_socket .connect(dealer_endpoint.to_string().as_str()) - .await - .unwrap(); - helpers::run_rep_server(rep_socket, num_req_sockets * num_messages) - .await - .unwrap(); - }); - - let mut req_tasks = Vec::with_capacity(num_req_sockets as usize); - for i in 0..num_req_sockets { - let router_endpoint_clone = router_endpoint.to_string(); + .await?; + + let num_messages = 10; + + async_rt::task::spawn(async move { + helpers::run_rep_server(rep_socket, num_messages) + .await + .unwrap(); + }); + let req_task = async_rt::task::spawn(async move { - // yield for a moment to ensure that server has some time to open socket - async_rt::task::sleep(Duration::from_millis(100)).await; let mut req_socket = zeromq::ReqSocket::new(); - req_socket.connect(&router_endpoint_clone).await.unwrap(); - helpers::run_req_client_with_id(req_socket, i, num_messages) + req_socket + .connect(router_endpoint.to_string().as_str()) + .await + .unwrap(); + helpers::run_req_client(req_socket, num_messages) .await .unwrap(); }); - req_tasks.push(req_task); - } - helpers::run_proxy(router_socket, dealer_socket, 5000).await; + helpers::run_proxy(router_socket, dealer_socket, 100).await; - for req_task in req_tasks { req_task.await.unwrap(); + + let router_events: Vec<_> = router_monitor.collect().await; + let dealer_events: Vec<_> = dealer_monitor.collect().await; + let rep_events: Vec<_> = rep_monitor.collect().await; + assert_eq!(2, router_events.len(), "{:?}", &router_events); + assert_eq!(2, dealer_events.len(), "{:?}", &dealer_events); + assert_eq!(1, rep_events.len(), "{:?}", &rep_events); + + Ok(()) } - Ok(()) + #[async_rt::test] + async fn test_many_req_router_dealer_rep_sockets() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); + + let mut router_socket = zeromq::RouterSocket::new(); + let router_endpoint = router_socket.bind("tcp://localhost:0").await?; + + let mut dealer_socket = zeromq::DealerSocket::new(); + let dealer_endpoint = dealer_socket.bind("tcp://localhost:0").await?; + + let num_req_sockets = 100; + let num_messages = 100; + + async_rt::task::spawn(async move { + let mut rep_socket = zeromq::RepSocket::new(); + rep_socket + .connect(dealer_endpoint.to_string().as_str()) + .await + .unwrap(); + helpers::run_rep_server(rep_socket, num_req_sockets * num_messages) + .await + .unwrap(); + }); + + let mut req_tasks = Vec::with_capacity(num_req_sockets as usize); + for i in 0..num_req_sockets { + let router_endpoint_clone = router_endpoint.to_string(); + let req_task = async_rt::task::spawn(async move { + // yield for a moment to ensure that server has some time to open socket + async_rt::task::sleep(Duration::from_millis(100)).await; + let mut req_socket = zeromq::ReqSocket::new(); + req_socket.connect(&router_endpoint_clone).await.unwrap(); + helpers::run_req_client_with_id(req_socket, i, num_messages) + .await + .unwrap(); + }); + req_tasks.push(req_task); + } + + helpers::run_proxy(router_socket, dealer_socket, 5000).await; + + for req_task in req_tasks { + req_task.await.unwrap(); + } + + Ok(()) + } }