Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.restrictions;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
import io.reactivex.Flowable;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.utility.MountableFile;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class Mqtt5ToMqtt3MessageForwardingIT {

@RegisterExtension
public final @NotNull HiveMQTestContainerExtension hivemq =
new HiveMQTestContainerExtension().withHiveMQConfig(MountableFile.forClasspathResource("/config.xml"));

@Test
void mqtt5ToMqtt3_messageForwarding_rx() throws InterruptedException {

final Mqtt3RxClient mqtt3Client = MqttClient
.builder()
.useMqttVersion3()
.serverPort(hivemq.getMqttPort())
.addConnectedListener(__ -> System.out.println("MQTTv3 client connected."))
.addDisconnectedListener(context -> System.out.println("MQTTv3 client disconnected. (" + context.getCause().getMessage() + ")"))
.buildRx();

final Mqtt5RxClient mqtt5Client = MqttClient
.builder()
.useMqttVersion5()
.serverPort(hivemq.getMqttPort())
.addConnectedListener(__ -> System.out.println("MQTTv5 client connected."))
.addDisconnectedListener(context -> System.out.println("MQTTv5 client disconnected. (" + context.getCause().getMessage() + ")"))
.buildRx();

final Mqtt3ConnAck mqtt3ConnAck = mqtt3Client.connect().timeout(5, TimeUnit.SECONDS).blockingGet();
assertEquals(Mqtt3ConnAckReturnCode.SUCCESS, mqtt3ConnAck.getReturnCode());

final Mqtt5ConnAck mqtt5ConnAck = mqtt5Client.connect().timeout(5, TimeUnit.SECONDS).blockingGet();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, mqtt5ConnAck.getReasonCode());

final int MESSAGE_COUNT = 10;
final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);

mqtt5Client
.publishes(MqttGlobalPublishFilter.ALL)
.subscribe(message -> {
final byte[] payload = message.getPayloadAsBytes();
final int i = payload[0];
System.out.println("MQTTv5 client received message #" + i + " from topic \"test_1\".");
System.out.println("MQTTv3 client sending message #" + i + " to topic \"test_2\"...");
try {
mqtt3Client
.publish(
Flowable.just(
Mqtt3Publish
.builder()
.topic("test_2")
.payload(payload)
.qos(MqttQos.EXACTLY_ONCE)
.build()
)
)
.timeout(5, TimeUnit.SECONDS)
.subscribe();
Copy link
Member

@SgtSilvio SgtSilvio Feb 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code does not block the thread, it just starts publishing but does not wait for the acknowledgement.
If you would change this to a blockingSubscribe, you would see the failure here as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your claim is not true, though: the test still passes with blockingSubscribe().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let me check if I also made a different change, because I tried this out before commenting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SgtSilvio did you have a chance to look further into this issue?


countDownLatch.countDown();
} catch (final Exception ex) {
System.out.println("MQTTv3 client failed to send message #" + i + ".");
ex.printStackTrace();
}
});

final Mqtt5SubAck mqtt5SubAck = mqtt5Client
.subscribeWith()
.topicFilter("test_1")
.qos(MqttQos.EXACTLY_ONCE)
.applySubscribe()
.timeout(5, TimeUnit.SECONDS)
.blockingGet();

assertTrue(mqtt5SubAck.getReasonCodes().contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2));

for (int i = 0; i < MESSAGE_COUNT; i++) {
final byte[] payload = new byte[42];
payload[0] = (byte) i;

System.out.println("MQTTv5 client sending message #" + i + " to topic \"test_1\"...");
mqtt5Client
.publish(
Flowable.just(
Mqtt5Publish
.builder()
.topic("test_1")
.payload(payload)
.qos(MqttQos.EXACTLY_ONCE)
.build()
)
)
.timeout(5, TimeUnit.SECONDS)
.subscribe();

TimeUnit.SECONDS.sleep(1);
}

assertTrue(countDownLatch.await(5 + 1, TimeUnit.SECONDS));
}

@Test
void mqtt5ToMqtt3_messageForwarding_async() throws InterruptedException, ExecutionException, TimeoutException {

final Mqtt3AsyncClient mqtt3Client = MqttClient
.builder()
.useMqttVersion3()
.serverPort(hivemq.getMqttPort())
.addConnectedListener(__ -> System.out.println("MQTTv3 client connected."))
.addDisconnectedListener(context -> System.out.println("MQTTv3 client disconnected. (" + context.getCause().getMessage() + ")"))
.buildAsync();

final Mqtt5AsyncClient mqtt5Client = MqttClient
.builder()
.useMqttVersion5()
.serverPort(hivemq.getMqttPort())
.addConnectedListener(__ -> System.out.println("MQTTv5 client connected."))
.addDisconnectedListener(context -> System.out.println("MQTTv5 client disconnected. (" + context.getCause().getMessage() + ")"))
.buildAsync();

final Mqtt3ConnAck mqtt3ConnAck = mqtt3Client.connect().get(5, TimeUnit.SECONDS);
assertEquals(Mqtt3ConnAckReturnCode.SUCCESS, mqtt3ConnAck.getReturnCode());

final Mqtt5ConnAck mqtt5ConnAck = mqtt5Client.connect().get(5, TimeUnit.SECONDS);
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, mqtt5ConnAck.getReasonCode());

final int MESSAGE_COUNT = 10;
final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);

final Mqtt5SubAck mqtt5SubAck = mqtt5Client
.subscribeWith()
.topicFilter("test_1")
.qos(MqttQos.EXACTLY_ONCE)
.callback(message -> {
final byte[] payload = message.getPayloadAsBytes();
final int i = payload[0];
System.out.println("MQTTv5 client received message #" + i + " from topic \"test_1\".");
System.out.println("MQTTv3 client sending message #" + i + " to topic \"test_2\"...");
try {
mqtt3Client
.publishWith()
.topic("test_2")
.payload(payload)
.qos(MqttQos.EXACTLY_ONCE)
.send()
.get(5, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code blocks the thread that calls the callback. Blocking inside the callback is not allowed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acknowledgement can actually not be delivered when it wants to use the same thread that you are currently blocking. This creates a sort of deadlock here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you use a QoS > 0, you should use manual acknowledgement and acknowledge the message received by the MQTT 5 client when successfully delivered with the MQTT 3 client. Blocking in the callback does not give you guaranteed message delivery (and is not allowed here anyhow).

Copy link
Author

@inad9300 inad9300 Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this, @SgtSilvio.

Is there documentation for this restriction? I don't get why the acknowledgment would not be able to be delivered in this case, nor why would I need to use manual acknowledgement. Isn't the blocking on send precisely to wait on the acknowledgement? The fact that calling the blocking client inside the callback works makes it extra confusing to understand what you are saying:

// Passing test:
@Test
void mqtt5ToMqtt3_messageForwarding_blocking_in_async() throws InterruptedException, ExecutionException, TimeoutException {

    final Mqtt3BlockingClient mqtt3Client = MqttClient
            .builder()
            .useMqttVersion3()
            .serverPort(hivemq.getMqttPort())
            .addConnectedListener(__ -> System.out.println("MQTTv3 client connected."))
            .addDisconnectedListener(context -> System.out.println("MQTTv3 client disconnected. (" + context.getCause().getMessage() + ")"))
            .buildBlocking();

    final Mqtt5AsyncClient mqtt5Client = MqttClient
            .builder()
            .useMqttVersion5()
            .serverPort(hivemq.getMqttPort())
            .addConnectedListener(__ -> System.out.println("MQTTv5 client connected."))
            .addDisconnectedListener(context -> System.out.println("MQTTv5 client disconnected. (" + context.getCause().getMessage() + ")"))
            .buildAsync();

    final Mqtt3ConnAck mqtt3ConnAck = mqtt3Client.connect();
    assertEquals(Mqtt3ConnAckReturnCode.SUCCESS, mqtt3ConnAck.getReturnCode());

    final Mqtt5ConnAck mqtt5ConnAck = mqtt5Client.connect().get(5, TimeUnit.SECONDS);
    assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, mqtt5ConnAck.getReasonCode());

    final int MESSAGE_COUNT = 10;
    final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);

    final Mqtt5SubAck mqtt5SubAck = mqtt5Client
            .subscribeWith()
            .topicFilter("test_1")
            .qos(MqttQos.EXACTLY_ONCE)
            .callback(message -> {
                final byte[] payload = message.getPayloadAsBytes();
                final int i = payload[0];
                System.out.println("MQTTv5 client received message #" + i + " from topic \"test_1\".");
                System.out.println("MQTTv3 client sending message #" + i + " to topic \"test_2\"...");
                try {
                    mqtt3Client
                            .publishWith()
                            .topic("test_2")
                            .payload(payload)
                            .qos(MqttQos.EXACTLY_ONCE)
                            .send();

                    countDownLatch.countDown();
                } catch (final Exception ex) {
                    System.out.println("MQTTv3 client failed to send message #" + i + ".");
                    ex.printStackTrace();
                }
            })
            .send()
            .get(5, TimeUnit.SECONDS);

    assertTrue(mqtt5SubAck.getReasonCodes().contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2));

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        final byte[] payload = new byte[42];
        payload[0] = (byte) i;

        System.out.println("MQTTv5 client sending message #" + i + " to topic \"test_1\"...");
        mqtt5Client
                .toBlocking()
                .publishWith()
                .topic("test_1")
                .payload(payload)
                .qos(MqttQos.EXACTLY_ONCE)
                .send();

        TimeUnit.SECONDS.sleep(1);
    }

    assertTrue(countDownLatch.await(5 + 1, TimeUnit.SECONDS));
}

To be honest, I have always assumed that the callback consumers were executed in separate threads from a thread pool. You saying this now at least helps me understand why throwing an exception in a callback messes up with all subsequent calls (#575) – there really is only one thread doing the work underneath, isn't there?

Copy link
Member

@SgtSilvio SgtSilvio Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there really is only one thread doing the work underneath

no, there are multiple threads, otherwise it would happen right on the first message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will explain it in more detail, but today is a full day, so will take some time


countDownLatch.countDown();
} catch (final Exception ex) {
System.out.println("MQTTv3 client failed to send message #" + i + ".");
ex.printStackTrace();
}
})
.send()
.get(5, TimeUnit.SECONDS);

assertTrue(mqtt5SubAck.getReasonCodes().contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2));

for (int i = 0; i < MESSAGE_COUNT; i++) {
final byte[] payload = new byte[42];
payload[0] = (byte) i;

System.out.println("MQTTv5 client sending message #" + i + " to topic \"test_1\"...");
mqtt5Client
.publishWith()
.topic("test_1")
.payload(payload)
.qos(MqttQos.EXACTLY_ONCE)
.send()
.get(5, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(1);
}

assertTrue(countDownLatch.await(5 + 1, TimeUnit.SECONDS));
}
}