Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubRec (QoS2) #851

Open
wants to merge 14 commits into
base: acked
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the extremely late response, if you are still interested in contributing this change, please do open a separate PR. Currently this PR will be hard to accept given it has deviated from the issue focus.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For session expiry interval support, it's already done in #854.


### Changed

Expand All @@ -27,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security

Expand Down Expand Up @@ -56,7 +58,7 @@ To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initiali
`rusttls-pemfile` to `2.0.0`, `async-tungstenite` to `0.24.0`, `ws_stream_tungstenite` to `0.12.0`
and `http` to `1.0.0`. This is a breaking change as types from some of these crates are part of
the public API.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `PkidPromise` which resolves into the identifier value chosen by the `EventLoop` when handling the packet.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `NoticeFuture` which is noticed after the packet is released (sent in QoS0, ACKed in QoS1, COMPed in QoS2).

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bytes = "1.5"
log = "0.4"
flume = { version = "0.11", default-features = false, features = ["async"] }
thiserror = "1"
linked-hash-map = "0.5"

# Optional
# rustls
Expand Down
23 changes: 19 additions & 4 deletions rumqttc/examples/ack_notif.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tokio::task::{self, JoinSet};
use tokio::{task::{self, JoinSet}, time};

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::error::Error;
Expand Down Expand Up @@ -35,24 +35,38 @@ async fn main() -> Result<(), Box<dyn Error>> {
.wait_async()
.await
.unwrap();
client
.subscribe("hello/world", QoS::AtLeastOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();
client
.subscribe("hello/world", QoS::ExactlyOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024])
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024])
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024])
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(future.wait_async());
Expand All @@ -61,5 +75,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Acknoledged = {:?}", res?);
}

time::sleep(Duration::from_secs(6)).await;
Ok(())
}
80 changes: 80 additions & 0 deletions rumqttc/examples/ack_notif_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use tokio::{task::{self, JoinSet}, time};

use rumqttc::v5::{AsyncClient, MqttOptions, mqttbytes::QoS};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();
client
.subscribe("hello/world", QoS::AtLeastOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();
client
.subscribe("hello/world", QoS::ExactlyOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
.await
.unwrap();
set.spawn(future.wait_async());

while let Some(res) = set.join_next().await {
println!("Acknoledged = {:?}", res?);
}

time::sleep(Duration::from_secs(6)).await;
Ok(())
}
1 change: 1 addition & 0 deletions rumqttc/examples/async_manual_acks_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ fn create_conn() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_session_expiry_interval(u32::MAX.into())
.set_manual_acks(true)
.set_clean_start(false);

Expand Down
70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise.rs

This file was deleted.

70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise_v5.rs

This file was deleted.

Loading