change last will to publish to pulsar, remove LWT events as they're no #943
Conversation
@tsturzl: Thanks for your contribution. For this PR, do we need to update docs?
@tsturzl: Thanks for providing doc info!
Is there a good way to set up my IDE for the styling expected in this project?
@Technoboy- I'm not sure why that test is failing in this case. It seems unrelated to the changes made.
Unless anyone has any feedback or requests, this should be ready to merge. Without this, LWT is largely broken right now.
@Technoboy- @mattisonchao Please review. No feedback has been provided. This is a major bug, as it completely breaks proper MQTT connection tracking.
willMessageHandler.fireWillMessage(clientId, willMessage);
try {
    // wait for the will message to fire before continuing cleanup
    willMessageHandler.fireWillMessage(connection, willMessage).get();
We'd better add a timeout here to avoid the thread hanging forever, from a fault-tolerance perspective. :)
I added a 500 ms timeout here. I didn't see many examples to pull from, but 500 ms seems generous enough to ensure the task completes while still preventing the thread from blocking for too long. I can change this to something else if you have a better duration in mind.
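For illustration, a minimal self-contained sketch of that bounded wait, assuming fireWillMessage returns a CompletableFuture<Void> (the helper class below is hypothetical, not MoP code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WillMessageTimeoutSketch {

    // Bound the wait on an asynchronous will-message publish so that
    // connection cleanup cannot hang forever if the future never completes.
    static void awaitWillMessage(CompletableFuture<Void> willFuture) {
        try {
            willFuture.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up waiting; cleanup proceeds while the publish may still
            // finish in the background.
            System.err.println("Timed out waiting for will message: " + e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            System.err.println("Will message failed: " + e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that never completes, to demonstrate the timeout path.
        awaitWillMessage(new CompletableFuture<>());
    }
}
```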
} else {
    sendWillMessage(willMessage);

final Executor delayed = delayedExecutor(willMessage.getDelayInterval(), TimeUnit.SECONDS);
return CompletableFuture.supplyAsync(() -> sendWillMessageToPulsarTopic(connection, willMessage).join(),
Please do not block the single executor here. It will become a bottleneck under high traffic load. :)
I switched this to use runAsync and no longer await the CompletableFuture returned by sendWillMessageToPulsarTopic. From my understanding this should free up the ScheduledExecutorService more immediately. I don't believe there is any major caveat to this; however, if we really care about the result of that CompletableFuture (which would only really be an exception), it would make sense to move this executor to a cached thread pool with a low keep-alive time.
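A minimal, self-contained sketch of that fire-and-forget pattern, assuming a shared ScheduledExecutorService wrapped as a delayed Executor; the stand-in for sendWillMessageToPulsarTopic and all names here are illustrative, not MoP's actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedWillSketch {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    // Wrap the shared ScheduledExecutorService as an Executor that runs
    // tasks after the given delay, so it can be handed to CompletableFuture.
    static Executor delayedExecutor(long delay, TimeUnit unit) {
        return command -> SCHEDULER.schedule(command, delay, unit);
    }

    // Fire-and-forget: runAsync only *starts* the publish on the delayed
    // executor and does not join the returned future, so the single
    // scheduler thread is freed immediately instead of blocking on I/O.
    static CompletableFuture<Void> sendAfterDelay(long delaySeconds) {
        return CompletableFuture.runAsync(
                () -> publishToPulsarTopic()
                        .exceptionally(ex -> {
                            // The only interesting result is a failure; log it.
                            System.err.println("Will publish failed: " + ex);
                            return null;
                        }),
                delayedExecutor(delaySeconds, TimeUnit.SECONDS));
    }

    // Stand-in for the real sendWillMessageToPulsarTopic(...) call.
    static CompletableFuture<Void> publishToPulsarTopic() {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) throws Exception {
        sendAfterDelay(1).get(); // completes once the publish has been started
        SCHEDULER.shutdown();
    }
}
```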
Left some minor comments. :)
Codecov Report

@@             Coverage Diff              @@
##             master     #943      +/-   ##
============================================
- Coverage     77.28%   75.98%    -1.30%
+ Complexity      988      978       -10
============================================
  Files           111      111
  Lines          4336     4414       +78
  Branches        336      345        +9
============================================
+ Hits           3351     3354        +3
- Misses          802      868       +66
- Partials        183      192        +9

☔ View full report in Codecov by Sentry.
@mattisonchao Hoping to address these today. I appreciate the feedback! If not today, I should be able to address this early next week.
Should be ready for re-review.
Fixes #937
Motivation
LWT did not function as expected. The LWT mechanism used the system topic to propagate fireWillMessage events; each MoP instance would then look at its own subscriptions on the topic the LWT should be published to and send the LWT to those subscriptions directly. This meant the LWT did NOT reach Pulsar topics, and the QoSPublishHandlers never came into play for LWT, so retained LWTs did not work.
Modifications
Instead of using the system topic, I changed WillMessageHandler to publish will messages to their destination Pulsar topic. Both Pulsar and MQTT clients can then receive these messages, since MoP already subscribes to the Pulsar topic on behalf of each MQTT connection. WillMessageHandler now acquires the QoSPublishHandlers from MQTTService so messages are published with the appropriate QoS and retain behavior.
Since sendLWT events are no longer used, I removed them from the interface and implementing classes; no uses or implementations remain. LWTs no longer need to reach the system topic, since the destination topic is already read by every MoP instance concerned with that topic. The obvious exception is a retained LWT, whose message is still sent into the system topic as a retained message.
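As a rough illustration of the new flow, here is a hypothetical sketch; only the names WillMessageHandler, fireWillMessage, and QoSPublishHandlers come from this PR, and every signature and type shape below is an assumption, not MoP's actual API:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the per-QoS publish path acquired from MQTTService.
interface QoSPublishHandler {
    CompletableFuture<Void> publish(String topic, byte[] payload, boolean retained);
}

final class WillMessageHandlerSketch {
    private final QoSPublishHandler qos0;
    private final QoSPublishHandler qos1;

    WillMessageHandlerSketch(QoSPublishHandler qos0, QoSPublishHandler qos1) {
        this.qos0 = qos0;
        this.qos1 = qos1;
    }

    // Publish the LWT straight to its destination Pulsar topic via the QoS
    // handlers, instead of broadcasting a fireWillMessage event over the
    // system topic. Pulsar consumers and MQTT subscribers both receive it,
    // and the retain path is exercised like any normal publish.
    CompletableFuture<Void> fireWillMessage(String topic, byte[] payload,
                                            int qos, boolean retained) {
        QoSPublishHandler handler = (qos == 0) ? qos0 : qos1;
        return handler.publish(topic, payload, retained);
    }

    public static void main(String[] args) {
        QoSPublishHandler print = (topic, payload, retained) -> {
            System.out.printf("publish %s retained=%b%n", topic, retained);
            return CompletableFuture.completedFuture(null);
        };
        new WillMessageHandlerSketch(print, print)
                .fireWillMessage("persistent://public/default/device-status",
                        new byte[0], 1, true)
                .join();
    }
}
```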
I tried to stay as close to the style of the rest of the codebase as possible; in many cases code is directly copied from other areas. sendWillMessageToPulsarTopic was named to be very explicit about its behavior, and the code is largely taken from the doPublish method with some modifications. In this case I'm relying on the Connection object, and therefore fireWillMessage was adapted to return a CompletableFuture, so we can block until the will message is sent and don't clean up the Connection prematurely. I'm not sure if it would make sense to add a timeout on that to prevent deadlocking, but I opted for the simpler solution. Additionally, to support returning a CompletableFuture, I changed the delay logic to wrap the ScheduledExecutorService in a way that can be used by a CompletableFuture.

Verifying this change
This change is already covered by existing tests, such as (please describe tests).
Documentation

Check the box below.

Need to update docs?

doc-required (If you need help on updating docs, create a doc issue)

no-need-doc: This change only adapts existing components to behave as expected.

doc (If this PR contains doc changes)