Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: oneshot channel returns Pkid #806

Closed
wants to merge 17 commits into from
Prev Previous commit
Next Next commit
doc: v5 example
Devdutt Shenoi committed Feb 23, 2024
commit 4999ae3c83a1abe9173d4cb50f53ccc631942ca0
6 changes: 3 additions & 3 deletions rumqttc/examples/pkid_promise.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures_util::stream::StreamExt;
use tokio::{
select,
task::{self, JoinSet},
select
};
use tokio_util::time::DelayQueue;
use futures_util::stream::StreamExt;

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
@@ -51,7 +51,7 @@ async fn requests(client: AsyncClient) {
}

loop {
select!{
select! {
Some(i) = queue.next() => {
joins.spawn(
client
69 changes: 69 additions & 0 deletions rumqttc/examples/pkid_promise_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use futures_util::stream::StreamExt;
use tokio::{
select,
task::{self, JoinSet},
};
use tokio_util::time::DelayQueue;

use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
requests(client).await;
});

loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
return Ok(());
}
}
}
}

async fn requests(client: AsyncClient) {
let mut joins = JoinSet::new();
joins.spawn(
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap(),
);

let mut queue = DelayQueue::new();
for i in 1..=10 {
queue.insert(i as usize, Duration::from_secs(i));
}

loop {
select! {
Some(i) = queue.next() => {
joins.spawn(
client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()])
.await
.unwrap(),
);
}
Some(Ok(Ok(pkid))) = joins.join_next() => {
println!("Pkid: {:?}", pkid);
}
else => break,
}
}
}