Skip to content

Commit

Permalink
fix up test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 27, 2024
1 parent 1ce35da commit 1c914c8
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 65 deletions.
19 changes: 5 additions & 14 deletions mqrstt/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,7 @@ use crate::{
error::ClientError,
packets::{
mqtt_trait::PacketValidation,
DisconnectReasonCode,
Packet,
QoS,
// disconnect::{Disconnect, DisconnectProperties},
// publish::{Publish, PublishProperties},
// subscribe::{Subscribe, SubscribeProperties, Subscription},
// unsubscribe::{Unsubscribe, UnsubscribeProperties, UnsubscribeTopics},
{Disconnect, DisconnectProperties},
{Publish, PublishProperties},
{Subscribe, SubscribeProperties, SubscribeTopics},
DisconnectReasonCode, Packet, QoS, {Disconnect, DisconnectProperties}, {Publish, PublishProperties}, {Subscribe, SubscribeProperties, SubscribeTopics},
{Unsubscribe, UnsubscribeProperties, UnsubscribeTopics},
},
};
Expand All @@ -26,7 +17,7 @@ use crate::{
///
/// This object can be obtained by calling the builder functions on [`crate::NetworkBuilder`]
///
/// This client should be used in combindation with a handler [`crate::AsyncEventHandler`] or [`crate::AsyncEventHandlerMut`] to handle incoming messages.
/// This client should be used in combination with a handler [`crate::AsyncEventHandler`] to receive and send messages.
pub struct MqttClient {
/// Provides this client with an available packet id or waits on it.
available_packet_ids_r: Receiver<u16>,
Expand Down Expand Up @@ -212,7 +203,7 @@ impl MqttClient {
/// Creates a Publish packet with additional publish properties.
/// The packet is then asynchronously transferred to the Network stack for transmission.
///
/// Can be called with any payload that can be converted into [`Bytes`]
/// Can be called with any payload that can be converted into [`Vec<u8>`]
///
/// # Examples
/// ```
Expand Down Expand Up @@ -563,7 +554,7 @@ impl MqttClient {

/// Creates a Publish packet which is then transferred to the Network stack for transmission.
///
/// Can be called with any payload that can be converted into [`Bytes`]
/// Can be called with any payload that can be converted into [`Vec<u8>`]
///
/// This function blocks until the packet is queued for transmission
/// # Examples
Expand Down Expand Up @@ -618,7 +609,7 @@ impl MqttClient {
/// Creates a Publish packet with additional publish properties.
/// The packet is then transferred to the Network stack for transmission.
///
/// Can be called with any payload that can be converted into [`Bytes`]
/// Can be called with any payload that can be converted into [`Vec<u8>`]
///
/// This function blocks until the packet is queued for transmission
///
Expand Down
11 changes: 4 additions & 7 deletions mqrstt/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,20 @@ pub mod example_handlers {

pub struct PingResp {
pub client: MqttClient,
pub ping_resp_received: AtomicU16,
pub ping_resp_received: u32,
}

impl PingResp {
pub fn new(client: MqttClient) -> Self {
Self {
client,
ping_resp_received: AtomicU16::new(0),
}
Self { client, ping_resp_received: 0 }
}
}

impl AsyncEventHandler for PingResp {
async fn handle(&mut self, event: packets::Packet) -> () {
use Packet::*;
if event == PingResp {
self.ping_resp_received.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.ping_resp_received += 1;
}
println!("Received packet: {}", event);
}
Expand All @@ -76,7 +73,7 @@ pub mod example_handlers {
fn handle(&mut self, event: Packet) {
use Packet::*;
if event == PingResp {
self.ping_resp_received.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.ping_resp_received += 1;
}
println!("Received packet: {}", event);
}
Expand Down
74 changes: 36 additions & 38 deletions mqrstt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
//!
//! Notes:
//! ----------------------------
//! - While the handler is processing a message the stream blocks. To prevent this, spawn a task in the handler or use [tokio::ConcurrentHandler].
//! - Handlers only get incoming packets
//! - Create a new connection when an error or disconnect is encountered
//!
Expand Down Expand Up @@ -83,7 +82,7 @@
//! async fn main() {
//! let (mut network, client) = NetworkBuilder
//! ::new_from_client_id("TokioTcpPingPongExample")
//! .tokio_sequential_network();
//! .tokio_network();
//!
//! // Construct a no op handler
//! let mut nop = NOP{};
Expand Down Expand Up @@ -206,7 +205,7 @@ where
/// let options = ConnectOptions::new("ExampleClient");
/// let (mut network, client) = mqrstt::NetworkBuilder::<(), tokio::net::TcpStream>
/// ::new_from_options(options)
/// .tokio_sequential_network();
/// .tokio_network();
/// ```
pub fn tokio_network(self) -> (tokio::Network<H, S>, MqttClient)
where
Expand Down Expand Up @@ -266,12 +265,12 @@ mod smol_lib_test {

use rand::Rng;

use crate::{example_handlers::PingPong, packets::QoS, ConnectOptions, NetworkBuilder};
use crate::{example_handlers::PingPong, packets::QoS, random_chars, ConnectOptions, NetworkBuilder};

#[test]
fn test_smol_tcp() {
smol::block_on(async {
let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
let mut client_id: String = random_chars();
client_id += "_SmolTcpPingPong";
let options = ConnectOptions::new(client_id);

Expand Down Expand Up @@ -339,7 +338,7 @@ mod smol_lib_test {
);
assert!(n.is_ok());
let pingresp = n.unwrap();
assert_eq!(2, pingresp.ping_resp_received.load(std::sync::atomic::Ordering::Acquire));
assert_eq!(2, pingresp.ping_resp_received);
});
}

Expand Down Expand Up @@ -385,45 +384,45 @@ mod smol_lib_test {
#[cfg(feature = "tokio")]
#[cfg(test)]
mod tokio_lib_test {
use crate::example_handlers::PingPong;

use crate::packets::QoS;
use crate::example_handlers::PingResp;
use crate::random_chars;
use crate::ConnectOptions;

use std::{sync::Arc, time::Duration};

// #[tokio::test]
// async fn test_tokio_ping_req() {
// let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
// client_id += "_TokioTcppingrespTest";
// let mut options = ConnectOptions::new(client_id);
// let keep_alive_interval = 5;
// options.set_keep_alive_interval(Duration::from_secs(keep_alive_interval));
use std::time::Duration;

// let wait_duration = options.get_keep_alive_interval() * 2 + options.get_keep_alive_interval() / 2;
#[tokio::test]
async fn test_tokio_ping_req() {
let mut client_id: String = random_chars();
client_id += "_TokioTcppingrespTest";
let mut options = ConnectOptions::new(client_id);
let keep_alive_interval = 5;
options.set_keep_alive_interval(Duration::from_secs(keep_alive_interval));

// let (mut network, client) = new_tokio(options);
let wait_duration = options.get_keep_alive_interval() * 2 + options.get_keep_alive_interval() / 2;

// let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
let (mut network, client) = crate::NetworkBuilder::new_from_options(options).tokio_network();

// let pingresp = Arc::new(crate::test_handlers::PingResp::new(client.clone()));
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();

// network.connect(stream, &mut pingresp).await.unwrap();
let mut pingresp = PingResp::new(client.clone());

// let (read, write) = network.split(pingresp.clone()).unwrap();
network.connect(stream, &mut pingresp).await.unwrap();

// let read_handle = tokio::task::spawn(read.run());
// let write_handle = tokio::task::spawn(write.run());
let network_handle = tokio::task::spawn(async move {
network.run(&mut pingresp).await;
pingresp
});

// tokio::time::sleep(wait_duration).await;
// client.disconnect().await.unwrap();
tokio::time::sleep(wait_duration).await;
client.disconnect().await.unwrap();

// tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;

// let (read_result, write_result) = tokio::join!(read_handle, write_handle);
// let (read_result, write_result) = (read_result.unwrap(), write_result.unwrap());
// assert!(write_result.is_ok());
// assert_eq!(2, pingresp.ping_resp_received.load(std::sync::atomic::Ordering::Acquire));
// }
let result = network_handle.await;
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(2, result.ping_resp_received);
}

#[cfg(all(feature = "tokio", target_family = "windows"))]
#[tokio::test]
Expand All @@ -435,11 +434,11 @@ mod tokio_lib_test {
let address = ("127.0.0.1", 2000);

let client_id: String = crate::random_chars() + "_TokioTcppingrespTest";
let options = ConnectOptions::new(client_id);
let options = crate::ConnectOptions::new(client_id);

let (n, _) = tokio::join!(
async move {
let (mut network, client) = NetworkBuilder::new_from_options(options).tokio_sequential_network();
let (mut network, client) = NetworkBuilder::new_from_options(options).tokio_network();

let stream = tokio::net::TcpStream::connect(address).await.unwrap();

Expand All @@ -456,8 +455,7 @@ mod tokio_lib_test {
);

if let ConnectionError::Io(err) = n.unwrap_err() {
assert_eq!(ErrorKind::ConnectionReset, err.kind());
assert_eq!("Connection reset by peer".to_string(), err.to_string());
assert_eq!(ErrorKind::UnexpectedEof, err.kind());
} else {
panic!();
}
Expand Down
21 changes: 15 additions & 6 deletions mqrstt/src/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,23 @@ mod tests {

use crate::packets::Packet;

use crate::tests::test_packets::{create_empty_publish_packet, disconnect_case, ping_req_case, ping_resp_case, publish_case, pubrel_case, pubrel_smallest_case};
use crate::tests::test_packets::*;

#[rstest::rstest]
// #[case(disconnect_case())]
// #[case(ping_req_case())]
// #[case(ping_resp_case())]
// #[case(publish_case())]
// #[case(pubrel_case())]
#[case(ping_req_case().1)]
#[case(ping_resp_case().1)]
#[case(connack_case().1)]
#[case(create_subscribe_packet(1))]
#[case(create_subscribe_packet(65335))]
#[case(create_puback_packet(1))]
#[case(create_puback_packet(65335))]
#[case(create_disconnect_packet())]
#[case(create_connack_packet(true))]
#[case(create_connack_packet(false))]
#[case(publish_packet_1())]
#[case(publish_packet_2())]
#[case(publish_packet_3())]
#[case(publish_packet_4())]
#[case(create_empty_publish_packet())]
fn test_write_read_write_read_cases(#[case] packet: Packet) {
let mut buffer = BytesMut::new();
Expand Down

0 comments on commit 1c914c8

Please sign in to comment.