Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubRec (QoS2) #851

Open
wants to merge 14 commits into
base: acked
Choose a base branch
from

Conversation

xiaocq2001
Copy link
Contributor

@xiaocq2001 xiaocq2001 commented Apr 26, 2024

Type of change

Added reason code for PubAck/Rec/Comp.
Fix notification flow for NoticeFuture and Eventloop.
See discuss #805

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

@xiaocq2001 xiaocq2001 changed the title Acked POC improvements publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubComp (QoS2) May 6, 2024
@xiaocq2001 xiaocq2001 marked this pull request as ready for review May 14, 2024 01:39
@xiaocq2001 xiaocq2001 changed the title publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubComp (QoS2) publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubRec (QoS2) Jun 3, 2024
Copy link
Contributor

@FSMaxB FSMaxB left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question

Comment on lines 325 to 328
// Notify user about the publish, pubrel and pubcomp will be handled in background
if let Some(tx) = tx {
tx.success();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is "handled in the background" supposed to mean in this context.

From what I understand, this makes the NoticeFuture complete on PubRec instead of PubComp for ExactlyOnce QoS, which is the wrong behavior from what I understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the notice is to confirm that the message actually received by broker.
So in QoS0 it's noticed on message sent (since there is no ACK), in QoS1 it's noticed on ACK reception, in QoS2 it's noticed on REC which indicates broker accept the message. The PubRel and PubComp do not affect the broker reception of message, but some way to let broker release QoS2 message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QoS2 it's noticed on REC which indicates broker accept the message. The PubRel and PubComp do not affect the broker reception of message, but some way to let broker release QoS2 message.

The original code by @de-sh notifies on PubComp not on PubRec for QoS2 which I believe to be the correct behavior. Because when using this feature, I want to be notified once I'm certain the message will fulfil it's QoS. For QoS2, the message is only sent after sending PubRel to the broker, which the broker then acknowledges by responding with PubComp. If the client dies after receiving PubRec but before the broker received PubRel, the message might get lost and the NoticeFuture shouldn't resolve for a message that might not fulfil it's QoS, which for 1 and 2 means it not getting lost.

At least that's my take based on my understanding of how MQTT QoS works and my take on the NoticeFuture feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is we can get notice on message ownership transfer, which is mentioned in the section 4.4, "When its original sender receives the PUBREC packet, ownership of the Application Message is transferred to the receiver". But I am totally fine to be noticed by PUBCOMP, where message ownership definitely transferred.

Copy link
Contributor Author

@xiaocq2001 xiaocq2001 Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I see your works on migrating this ack wait feature to main in #883, hoping the PR and discuss here can help.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is we can get notice on message ownership transfer, which is mentioned in the section 4.4

Yeah, but that is transferring it to the broker, right? My understanding was what is described in Method A in this non-normative example in the spec: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_4.3_%E2%80%93 In that case the message is only delivered onwards after the receiver has received PUBREL. (although for Method B it sends it with the PUBLISH already, so I'm kind of confused)

BTW I see your works on migrating this ack wait feature to main in #883, hoping the PR and discuss here can help.

Yeah, I was looking at this PR for reference, but basically wanted to preserve the original commits for easier rebasing/updating with main and making it easier to diff to main. Also there were a few more commits in the acked branch since you made this PR, those are also included there.

commit 518773d
Author: CQ Xiao <[email protected]>
Date:   Tue May 21 14:47:32 2024 +0800

    rumqttc: resume session only if CONNACK with session present 1 (bytebeamio#864)

    * Check if session present to restore pending publishes.

    * Modify changelog.

    * remove changes that don't seem to be related

    * refactor: improve readability

    * feat: apply changes to v4

    * Remove session_expiry_interval related code.

    * test: set clean session

    * test: broker saved session

    * test: fix resume reconnect

    ---------

    Co-authored-by: Devdutt Shenoi <[email protected]>
    Co-authored-by: Devdutt Shenoi <[email protected]>

commit 67d9ca7
Author: CQ Xiao <[email protected]>
Date:   Thu May 16 23:05:10 2024 +0800

    feat(rumqttc): set `session_expiry_interval` in v5 (bytebeamio#854)

    Co-authored-by: Devdutt Shenoi <[email protected]>

commit 98997d1
Author: CQ Xiao <[email protected]>
Date:   Wed Jul 3 18:13:54 2024 +0800

    HashMap -> VecDeque, cleanup
@xiaocq2001
Copy link
Contributor Author

xiaocq2001 commented Jul 4, 2024

Some updates for reference:

  • Sync with latest acked
  • HashMap -> VecDeque
    • To save memory
    • VecDeque is fast to push_back and pop_first
  • Publish NoticeTx
  • Notify on QoS2 PUBCOMP
  • Improve example to delay a while to see all logs

@KillingJacky
Copy link

Hi @xiaocq2001 , thanks for your work, this feature is exactly what I need for my current project. I can't wait for the upstream, and your PR seems quite promising, so how mature is your PR? If I can spend some effort to do the tests, can I use your PR right now?

@xiaocq2001
Copy link
Contributor Author

Hi @xiaocq2001 , thanks for your work, this feature is exactly what I need for my current project. I can't wait for the upstream, and your PR seems quite promising, so how mature is your PR? If I can spend some effort to do the tests, can I use your PR right now?

Feel free to test and use that.

Copy link
Contributor

@de-sh de-sh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide your suggestions against #916

@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the extremely late response, if you are still interested in contributing this change, please do open a separate PR. Currently this PR will be hard to accept given it has deviated from the issue focus.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For session expiry interval support, it's already done in #854.

@KillingJacky
Copy link

Hi @xiaocq2001 , thanks for your work, this feature is exactly what I need for my current project. I can't wait for the upstream, and your PR seems quite promising, so how mature is your PR? If I can spend some effort to do the tests, can I use your PR right now?

Feel free to test and use that.

I'v done integrating with your acked_test branch and tested it with several test cases, so far so good, thank you very much for your work.

@ryanwinter
Copy link

Thanks @KillingJacky!

.ok_or(StateError::Unsolicited(suback.pkid))?;
let (_, tx) = self.outgoing_sub.remove(pos).unwrap();
Copy link

@joshuachp joshuachp Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let (_, tx) = self.outgoing_sub.remove(pos).unwrap();
let (_, tx) = self.outgoing_sub.swap_remove_back(pos).unwrap();

Since the ordering is not important, could we maybe just swap_remove the element to not shift the others?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants