Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix message loss when forwarding messages from MQTTv5 to MQTTv3 in async mode #619

Conversation

inad9300
Copy link

@inad9300 inad9300 commented Feb 22, 2024

Description

So far, a test case showing the message loss has been added. The actual fix needs to be developed (I won't venture myself into those waters).

Please, note that I haven't nor plan to sign the CLA. This pull request is just a convenient way for me to share my findings. As far as I am concerned, you are free to use of the code I provide in this pull request in whichever way you see fit. Please, copy it, modify it and complete it as you wish.

Related Issue

#618

Type of Change

  • 📚 Examples / docs / tutorials / dependencies update
  • 🔧 Bug fix (non-breaking change which fixes an issue)
  • 🥂 Improvement (non-breaking change which improves an existing feature)
  • 🚀 New feature (non-breaking change which adds functionality)
  • 💥 Breaking change (fix or feature that would cause existing functionality to change)
  • 🔐 Security fix

Checklist

  • I've written tests (if applicable) for all new methods and classes that I created.
  • I've added documentation as necessary so users can easily use and understand this feature/fix.

Add test case showing message loss when forwarding messages from MQTTv5 to MQTTv3.
Copy link

cla-bot bot commented Feb 22, 2024

Thank you for your pull request and welcome to our community. We require contributors to sign our Contributor License Agreement, and we don't seem to have the users @inad9300 on file. In order for us to review and merge your code, please sign our Contributor License Agreement to get yourself added. You'll find the CLA and more information here: https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc#contributor-license-agreement

@inad9300 inad9300 changed the title Fix message loss when forwarding messages from MQTTv5 to MQTTv3 Fix message loss when forwarding messages from MQTTv5 to MQTTv3 in async mode Feb 22, 2024
@pglombardo
Copy link
Contributor

Hi @inad9300 - thanks for the helpful bug report and tests! I'll take a closer look and post back next week.

.payload(payload)
.qos(MqttQos.EXACTLY_ONCE)
.send()
.get(5, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code blocks the thread that calls the callback. Blocking inside the callback is not allowed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acknowledgement can actually not be delivered when it wants to use the same thread that you are currently blocking. This creates a sort of deadlock here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you use a QoS > 0, you should use manual acknowledgement and acknowledge the message received by the MQTT 5 client when successfully delivered with the MQTT 3 client. Blocking in the callback does not give you guaranteed message delivery (and is not allowed here anyhow).

Copy link
Author

@inad9300 inad9300 Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this, @SgtSilvio.

Is there documentation for this restriction? I don't get why the acknowledgment would not be able to be delivered in this case, nor why would I need to use manual acknowledgement. Isn't the blocking on send precisely to wait on the acknowledgement? The fact that calling the blocking client inside the callback works makes it extra confusing to understand what you are saying:

// Passing test:
@Test
void mqtt5ToMqtt3_messageForwarding_blocking_in_async() throws InterruptedException, ExecutionException, TimeoutException {

    final Mqtt3BlockingClient mqtt3Client = MqttClient
            .builder()
            .useMqttVersion3()
            .serverPort(hivemq.getMqttPort())
            .addConnectedListener(__ -> System.out.println("MQTTv3 client connected."))
            .addDisconnectedListener(context -> System.out.println("MQTTv3 client disconnected. (" + context.getCause().getMessage() + ")"))
            .buildBlocking();

    final Mqtt5AsyncClient mqtt5Client = MqttClient
            .builder()
            .useMqttVersion5()
            .serverPort(hivemq.getMqttPort())
            .addConnectedListener(__ -> System.out.println("MQTTv5 client connected."))
            .addDisconnectedListener(context -> System.out.println("MQTTv5 client disconnected. (" + context.getCause().getMessage() + ")"))
            .buildAsync();

    final Mqtt3ConnAck mqtt3ConnAck = mqtt3Client.connect();
    assertEquals(Mqtt3ConnAckReturnCode.SUCCESS, mqtt3ConnAck.getReturnCode());

    final Mqtt5ConnAck mqtt5ConnAck = mqtt5Client.connect().get(5, TimeUnit.SECONDS);
    assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, mqtt5ConnAck.getReasonCode());

    final int MESSAGE_COUNT = 10;
    final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);

    final Mqtt5SubAck mqtt5SubAck = mqtt5Client
            .subscribeWith()
            .topicFilter("test_1")
            .qos(MqttQos.EXACTLY_ONCE)
            .callback(message -> {
                final byte[] payload = message.getPayloadAsBytes();
                final int i = payload[0];
                System.out.println("MQTTv5 client received message #" + i + " from topic \"test_1\".");
                System.out.println("MQTTv3 client sending message #" + i + " to topic \"test_2\"...");
                try {
                    mqtt3Client
                            .publishWith()
                            .topic("test_2")
                            .payload(payload)
                            .qos(MqttQos.EXACTLY_ONCE)
                            .send();

                    countDownLatch.countDown();
                } catch (final Exception ex) {
                    System.out.println("MQTTv3 client failed to send message #" + i + ".");
                    ex.printStackTrace();
                }
            })
            .send()
            .get(5, TimeUnit.SECONDS);

    assertTrue(mqtt5SubAck.getReasonCodes().contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2));

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        final byte[] payload = new byte[42];
        payload[0] = (byte) i;

        System.out.println("MQTTv5 client sending message #" + i + " to topic \"test_1\"...");
        mqtt5Client
                .toBlocking()
                .publishWith()
                .topic("test_1")
                .payload(payload)
                .qos(MqttQos.EXACTLY_ONCE)
                .send();

        TimeUnit.SECONDS.sleep(1);
    }

    assertTrue(countDownLatch.await(5 + 1, TimeUnit.SECONDS));
}

To be honest, I have always assumed that the callback consumers were executed in separate threads from a thread pool. You saying this now at least helps me understand why throwing an exception in a callback messes up with all subsequent calls (#575) – there really is only one thread doing the work underneath, isn't there?

Copy link
Member

@SgtSilvio SgtSilvio Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there really is only one thread doing the work underneath

no, there are multiple threads, otherwise it would happen right on the first message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will explain it in more detail, but today is a full day, so will take some time

)
)
.timeout(5, TimeUnit.SECONDS)
.subscribe();
Copy link
Member

@SgtSilvio SgtSilvio Feb 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code does not block the thread, it just starts publishing but does not wait for the acknowledgement.
If you would change this to a blockingSubscribe, you would see the failure here as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your claim is not true, though: the test still passes with blockingSubscribe().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let me check if I also made a different change, because I tried this out before commenting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SgtSilvio did you have a chance to look further into this issue?

@SgtSilvio
Copy link
Member

Closing this as the issue you see is caused by a threading problem in the test code and is not an issue of the library.
Feel free to comment and ask for clarifications.

@SgtSilvio SgtSilvio closed this Feb 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants