Skip to content

Commit

Permalink
doc: add examples of ack notify
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Sep 30, 2024
1 parent f43ea21 commit 2632ab1
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 0 deletions.
90 changes: 90 additions & 0 deletions rumqttc/examples/ack_promise.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use tokio::task::{self, JoinSet};

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Subscribe");

// Publish at all QoS levels and wait for broker acknowledgement
client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(1)");

client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(2)");

client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(3)");

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 1) });

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 2) });

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 3) });

while let Some(res) = set.join_next().await {
println!("Acknowledged = {:?}", res?);
}

Ok(())
}
92 changes: 92 additions & 0 deletions rumqttc/examples/ack_promise_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use flume::bounded;
use rumqttc::{Client, MqttOptions, QoS};
use std::error::Error;
use std::thread;
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut conn) = Client::new(mqttoptions, 10);
thread::spawn(move || {
for event in conn.iter() {
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Subscribe");

// Publish at all QoS levels and wait for broker acknowledgement
client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(1)");

client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(2)");

client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(3)");

// Spawn threads for each publish, use channel to notify result
let (tx, rx) = bounded(1);

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 1);
tx_clone.send(res).unwrap()
});

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 2);
tx_clone.send(res).unwrap()
});

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.unwrap();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 3);
tx.send(res).unwrap()
});

while let Ok(res) = rx.recv() {
println!("Acknowledged = {:?}", res?);
}

Ok(())
}
90 changes: 90 additions & 0 deletions rumqttc/examples/ack_promise_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use tokio::task::{self, JoinSet};

use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Subscribe");

// Publish at all QoS levels and wait for broker acknowledgement
client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(1)");

client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(2)");

client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap()
.await
.unwrap();
println!("Acknowledged Pub(3)");

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 1) });

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 2) });

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(async { future.await.map(|_| 3) });

while let Some(res) = set.join_next().await {
println!("Acknowledged = {:?}", res?);
}

Ok(())
}
92 changes: 92 additions & 0 deletions rumqttc/examples/ack_promise_v5_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use flume::bounded;
use rumqttc::v5::{mqttbytes::QoS, Client, MqttOptions};
use std::error::Error;
use std::thread;
use std::time::Duration;

fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut conn) = Client::new(mqttoptions, 10);
thread::spawn(move || {
for event in conn.iter() {
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Subscribe");

// Publish at all QoS levels and wait for broker acknowledgement
client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(1)");

client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(2)");

client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.unwrap()
.blocking_recv()
.unwrap();
println!("Acknowledged Pub(3)");

// Spawn threads for each publish, use channel to notify result
let (tx, rx) = bounded(1);

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 1);
tx_clone.send(res).unwrap()
});

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.unwrap();
let tx_clone = tx.clone();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 2);
tx_clone.send(res).unwrap()
});

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.unwrap();
thread::spawn(move || {
let res = future.blocking_recv().map(|_| 3);
tx.send(res).unwrap()
});

while let Ok(res) = rx.recv() {
println!("Acknowledged = {:?}", res?);
}

Ok(())
}

0 comments on commit 2632ab1

Please sign in to comment.