From 9c85d9f8f104c3271c2f490fc964edd8d3c8e466 Mon Sep 17 00:00:00 2001 From: Jochen Mader Date: Tue, 11 Oct 2022 09:23:08 +0200 Subject: [PATCH 1/2] Added the field localCorrelationData to MqttPublish for better flow-handling in reactive-APIs --- .../mqtt/message/publish/MqttPublish.java | 32 +++++++++++++++++-- .../message/publish/MqttPublishBuilder.java | 9 +++++- .../publish/mqtt3/Mqtt3PublishView.java | 17 +++++++++- .../mqtt3/message/publish/Mqtt3Publish.java | 6 ++++ .../mqtt5/message/publish/Mqtt5Publish.java | 6 ++++ 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java index 385079152..5bda4b61a 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java @@ -59,6 +59,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P private final @Nullable MqttUtf8StringImpl contentType; private final @Nullable MqttTopicImpl responseTopic; private final @Nullable ByteBuffer correlationData; + private final @Nullable Object localCorrelationData; private final @Nullable Confirmable confirmable; @@ -73,7 +74,8 @@ public MqttPublish( final @Nullable MqttTopicImpl responseTopic, final @Nullable ByteBuffer correlationData, final @NotNull MqttUserPropertiesImpl userProperties, - final @Nullable Confirmable confirmable) { + final @Nullable Confirmable confirmable, + final @Nullable Object localCorrelationData) { super(userProperties); this.topic = topic; @@ -86,6 +88,24 @@ public MqttPublish( this.responseTopic = responseTopic; this.correlationData = correlationData; this.confirmable = confirmable; + this.localCorrelationData = localCorrelationData; + } + + public MqttPublish( + final @NotNull MqttTopicImpl topic, + final @Nullable ByteBuffer payload, + final @NotNull MqttQos qos, + final boolean retain, + final long messageExpiryInterval, + final @Nullable Mqtt5PayloadFormatIndicator payloadFormatIndicator, + final @Nullable MqttUtf8StringImpl contentType, + final @Nullable MqttTopicImpl responseTopic, + final @Nullable ByteBuffer correlationData, + final @NotNull MqttUserPropertiesImpl userProperties, + final @Nullable Confirmable confirmable) { + + this(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType, + responseTopic, correlationData, userProperties, confirmable, null); } @Override @@ -163,6 +183,10 @@ public long getRawMessageExpiryInterval() { return correlationData; } + public @Nullable Object getLocalCorrelationData() { + return localCorrelationData; + } + @Override public void acknowledge() { final Confirmable confirmable = this.confirmable; @@ -204,7 +228,7 @@ public void acknowledge() { public @NotNull MqttPublish withConfirmable(final @NotNull Confirmable confirmable) { return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType, - responseTopic, correlationData, getUserProperties(), confirmable); + responseTopic, correlationData, getUserProperties(), confirmable, localCorrelationData); } @Override @@ -216,6 +240,7 @@ public void acknowledge() { ((contentType == null) ? "" : ", contentType=" + contentType) + ((responseTopic == null) ? "" : ", responseTopic=" + responseTopic) + ((correlationData == null) ? "" : ", correlationData=" + correlationData.remaining() + "byte") + + ((localCorrelationData == null) ? "" : ", localCorrelationData=" + localCorrelationData + " " + localCorrelationData.getClass().getSimpleName()) + StringUtil.prepend(", ", super.toAttributeString()); } @@ -239,7 +264,7 @@ public boolean equals(final @Nullable Object o) { (messageExpiryInterval == that.messageExpiryInterval) && (payloadFormatIndicator == that.payloadFormatIndicator) && Objects.equals(contentType, that.contentType) && Objects.equals(responseTopic, that.responseTopic) && - Objects.equals(correlationData, that.correlationData); + Objects.equals(correlationData, that.correlationData) && Objects.equals(localCorrelationData, that.localCorrelationData); } protected boolean canEqual(final @Nullable Object o) { @@ -258,6 +283,7 @@ public int hashCode() { result = 31 * result + Objects.hashCode(contentType); result = 31 * result + Objects.hashCode(responseTopic); result = 31 * result + Objects.hashCode(correlationData); + result = 31 * result + Objects.hashCode(localCorrelationData); return result; } } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java index fe603468a..50bc16b5e 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java @@ -47,6 +47,7 @@ public abstract class MqttPublishBuilder> { @Nullable MqttUtf8StringImpl contentType; @Nullable MqttTopicImpl responseTopic; @Nullable ByteBuffer correlationData; + @Nullable Object localCorrelationData; @NotNull MqttUserPropertiesImpl userProperties = MqttUserPropertiesImpl.NO_USER_PROPERTIES; MqttPublishBuilder() {} @@ -62,6 +63,7 @@ public abstract class MqttPublishBuilder> { responseTopic = publish.getRawResponseTopic(); correlationData = publish.getRawCorrelationData(); userProperties = publish.getUserProperties(); + localCorrelationData = publish.getLocalCorrelationData(); } MqttPublishBuilder(final @NotNull MqttPublishBuilder publishBuilder) { @@ -152,6 +154,11 @@ public abstract class MqttPublishBuilder> { return self(); } + public @NotNull B localCorrelationData(final @Nullable Object localCorrelationData) { + this.localCorrelationData = localCorrelationData; + return self(); + } + public @NotNull B userProperties(final @Nullable Mqtt5UserProperties userProperties) { this.userProperties = MqttChecks.userProperties(userProperties); return self(); @@ -186,7 +193,7 @@ private static abstract class Base> extends MqttPublishBuilder public @NotNull MqttPublish build() { Checks.notNull(topic, "Topic"); return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, - contentType, responseTopic, correlationData, userProperties, null); + contentType, responseTopic, correlationData, userProperties, null, localCorrelationData); } } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java index e5861a92a..05967661d 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java @@ -50,8 +50,18 @@ public class Mqtt3PublishView implements Mqtt3Publish { final @NotNull MqttQos qos, final boolean retain) { + return delegate(topic, payload, qos, retain, null); + } + + public static @NotNull MqttPublish delegate( + final @NotNull MqttTopicImpl topic, + final @Nullable ByteBuffer payload, + final @NotNull MqttQos qos, + final boolean retain, + final Object localCorrelationData) { + return new MqttPublish(topic, payload, qos, retain, MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, - MqttUserPropertiesImpl.NO_USER_PROPERTIES, null); + MqttUserPropertiesImpl.NO_USER_PROPERTIES, null, localCorrelationData); } public static @NotNull MqttStatefulPublish statefulDelegate( @@ -124,6 +134,11 @@ public boolean isRetain() { return delegate; } + @Override + public Object localCorrelationData() { + return delegate.getLocalCorrelationData(); + } + @Override public void acknowledge() { delegate.acknowledge(); diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java index be7b4bb39..6dd248585 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java @@ -77,6 +77,12 @@ public interface Mqtt3Publish extends Mqtt3Message { */ boolean isRetain(); + /** + * @return the optional local correlation data of this Publish message. This data is never propagated and kept + * locally for correlation. + */ + Object localCorrelationData(); + /** * Acknowledges this Publish message. * diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java index 9d24b1bc3..0abc47817 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java @@ -109,6 +109,12 @@ public interface Mqtt5Publish extends Mqtt5Message { */ @NotNull Mqtt5UserProperties getUserProperties(); + /** + * @return the optional local correlation data of this Publish message. This data is never propagated and kept + * locally for correlation. + */ + Object getLocalCorrelationData(); + /** * Acknowledges this Publish message. * From 95f4510dbc81cc823ffd54b1cd14389d49cca708 Mon Sep 17 00:00:00 2001 From: Jochen Mader Date: Wed, 15 Feb 2023 16:13:16 +0100 Subject: [PATCH 2/2] Applying review comments --- .../decoder/mqtt3/Mqtt3PublishDecoder.java | 2 +- .../decoder/mqtt5/Mqtt5PublishDecoder.java | 2 +- .../mqtt/message/publish/MqttPublish.java | 35 ++++---------- .../message/publish/MqttPublishBuilder.java | 2 +- .../mqtt/message/publish/MqttWillPublish.java | 2 +- .../publish/mqtt3/Mqtt3PublishView.java | 22 +++------ .../mqtt3/Mqtt3PublishViewBuilder.java | 9 +++- .../mqtt3/message/publish/Mqtt3Publish.java | 5 +- .../mqtt5/message/publish/Mqtt5Publish.java | 2 +- .../mqtt5/Mqtt5PublishEncoderTest.java | 48 +++++++++---------- 10 files changed, 56 insertions(+), 73 deletions(-) diff --git a/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java b/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java index 612a14be8..a190c8203 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java @@ -74,7 +74,7 @@ public class Mqtt3PublishDecoder implements MqttMessageDecoder { payload.position(0); } - final MqttPublish publish = Mqtt3PublishView.delegate(topic, payload, qos, retain); + final MqttPublish publish = Mqtt3PublishView.delegate(topic, payload, qos, retain, null); return Mqtt3PublishView.statefulDelegate(publish, packetIdentifier, dup); } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java b/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java index 8d31a2785..3e1e0cef8 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java @@ -208,7 +208,7 @@ public class Mqtt5PublishDecoder implements MqttMessageDecoder { final MqttPublish publish = new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType, - responseTopic, correlationData, userProperties, null); + responseTopic, correlationData, userProperties, null, null); final ImmutableIntList subscriptionIdentifiers = (subscriptionIdentifiersBuilder == null) ? DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS : diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java index 5bda4b61a..915ccf308 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublish.java @@ -59,7 +59,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P private final @Nullable MqttUtf8StringImpl contentType; private final @Nullable MqttTopicImpl responseTopic; private final @Nullable ByteBuffer correlationData; - private final @Nullable Object localCorrelationData; + private final @Nullable Object localContext; private final @Nullable Confirmable confirmable; @@ -75,7 +75,7 @@ public MqttPublish( final @Nullable ByteBuffer correlationData, final @NotNull MqttUserPropertiesImpl userProperties, final @Nullable Confirmable confirmable, - final @Nullable Object localCorrelationData) { + final @Nullable Object localContext) { super(userProperties); this.topic = topic; @@ -88,24 +88,7 @@ public MqttPublish( this.responseTopic = responseTopic; this.correlationData = correlationData; this.confirmable = confirmable; - this.localCorrelationData = localCorrelationData; - } - - public MqttPublish( - final @NotNull MqttTopicImpl topic, - final @Nullable ByteBuffer payload, - final @NotNull MqttQos qos, - final boolean retain, - final long messageExpiryInterval, - final @Nullable Mqtt5PayloadFormatIndicator payloadFormatIndicator, - final @Nullable MqttUtf8StringImpl contentType, - final @Nullable MqttTopicImpl responseTopic, - final @Nullable ByteBuffer correlationData, - final @NotNull MqttUserPropertiesImpl userProperties, - final @Nullable Confirmable confirmable) { - - this(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType, - responseTopic, correlationData, userProperties, confirmable, null); + this.localContext = localContext; } @Override @@ -183,8 +166,8 @@ public long getRawMessageExpiryInterval() { return correlationData; } - public @Nullable Object getLocalCorrelationData() { - return localCorrelationData; + public @Nullable Object getLocalContext() { + return localContext; } @Override @@ -228,7 +211,7 @@ public void acknowledge() { public @NotNull MqttPublish withConfirmable(final @NotNull Confirmable confirmable) { return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType, - responseTopic, correlationData, getUserProperties(), confirmable, localCorrelationData); + responseTopic, correlationData, getUserProperties(), confirmable, localContext); } @Override @@ -240,7 +223,7 @@ public void acknowledge() { ((contentType == null) ? "" : ", contentType=" + contentType) + ((responseTopic == null) ? "" : ", responseTopic=" + responseTopic) + ((correlationData == null) ? "" : ", correlationData=" + correlationData.remaining() + "byte") + - ((localCorrelationData == null) ? "" : ", localCorrelationData=" + localCorrelationData + " " + localCorrelationData.getClass().getSimpleName()) + + ((localContext == null) ? "" : ", localCorrelationData=" + localContext + " " + localContext.getClass().getSimpleName()) + StringUtil.prepend(", ", super.toAttributeString()); } @@ -264,7 +247,7 @@ public boolean equals(final @Nullable Object o) { (messageExpiryInterval == that.messageExpiryInterval) && (payloadFormatIndicator == that.payloadFormatIndicator) && Objects.equals(contentType, that.contentType) && Objects.equals(responseTopic, that.responseTopic) && - Objects.equals(correlationData, that.correlationData) && Objects.equals(localCorrelationData, that.localCorrelationData); + Objects.equals(correlationData, that.correlationData) && Objects.equals(localContext, that.localContext); } protected boolean canEqual(final @Nullable Object o) { @@ -283,7 +266,7 @@ public int hashCode() { result = 31 * result + Objects.hashCode(contentType); result = 31 * result + Objects.hashCode(responseTopic); result = 31 * result + Objects.hashCode(correlationData); - result = 31 * result + Objects.hashCode(localCorrelationData); + result = 31 * result + Objects.hashCode(localContext); return result; } } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java index 50bc16b5e..70b668d5b 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttPublishBuilder.java @@ -63,7 +63,7 @@ public abstract class MqttPublishBuilder> { responseTopic = publish.getRawResponseTopic(); correlationData = publish.getRawCorrelationData(); userProperties = publish.getUserProperties(); - localCorrelationData = publish.getLocalCorrelationData(); + localCorrelationData = publish.getLocalContext(); } MqttPublishBuilder(final @NotNull MqttPublishBuilder publishBuilder) { diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttWillPublish.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttWillPublish.java index b767658dd..48b092445 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttWillPublish.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/MqttWillPublish.java @@ -50,7 +50,7 @@ public MqttWillPublish( final long delayInterval) { super(topic, payload, qos, isRetain, messageExpiryInterval, payloadFormatIndicator, contentType, responseTopic, - correlationData, userProperties, null); + correlationData, userProperties, null, null); this.delayInterval = delayInterval; } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java index 05967661d..9e8e0861b 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java @@ -44,24 +44,15 @@ public class Mqtt3PublishView implements Mqtt3Publish { public static final @NotNull java.util.function.Function JAVA_MAPPER = Mqtt3PublishView::of; - public static @NotNull MqttPublish delegate( - final @NotNull MqttTopicImpl topic, - final @Nullable ByteBuffer payload, - final @NotNull MqttQos qos, - final boolean retain) { - - return delegate(topic, payload, qos, retain, null); - } - public static @NotNull MqttPublish delegate( final @NotNull MqttTopicImpl topic, final @Nullable ByteBuffer payload, final @NotNull MqttQos qos, final boolean retain, - final Object localCorrelationData) { + final @Nullable Object localContext) { return new MqttPublish(topic, payload, qos, retain, MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, - MqttUserPropertiesImpl.NO_USER_PROPERTIES, null, localCorrelationData); + MqttUserPropertiesImpl.NO_USER_PROPERTIES, null, localContext); } public static @NotNull MqttStatefulPublish statefulDelegate( @@ -75,9 +66,10 @@ public class Mqtt3PublishView implements Mqtt3Publish { final @NotNull MqttTopicImpl topic, final @Nullable ByteBuffer payload, final @NotNull MqttQos qos, - final boolean retain) { + final boolean retain, + final @Nullable Object localContext) { - return new Mqtt3PublishView(delegate(topic, payload, qos, retain)); + return new Mqtt3PublishView(delegate(topic, payload, qos, retain, localContext)); } static @NotNull Mqtt3PublishView willOf( @@ -135,8 +127,8 @@ public boolean isRetain() { } @Override - public Object localCorrelationData() { - return delegate.getLocalCorrelationData(); + public Object localContext() { + return delegate.getLocalContext(); } @Override diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishViewBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishViewBuilder.java index ef68aa515..df9c13bb5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishViewBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishViewBuilder.java @@ -41,6 +41,7 @@ public abstract class Mqtt3PublishViewBuilder> extends Mqtt3PublishViewBuilder { Base() {} @@ -100,7 +107,7 @@ private static abstract class Base> extends Mqtt3PublishViewBu public @NotNull Mqtt3PublishView build() { Checks.notNull(topic, "Topic"); - return Mqtt3PublishView.of(topic, payload, qos, retain); + return Mqtt3PublishView.of(topic, payload, qos, retain, localContext); } } diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java index 6dd248585..42983d419 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/message/publish/Mqtt3Publish.java @@ -23,6 +23,7 @@ import com.hivemq.client.mqtt.mqtt3.message.Mqtt3Message; import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.nio.ByteBuffer; import java.util.Optional; @@ -78,10 +79,10 @@ public interface Mqtt3Publish extends Mqtt3Message { boolean isRetain(); /** - * @return the optional local correlation data of this Publish message. This data is never propagated and kept + * @return the optional context of this Publish message. This data is never propagated and kept * locally for correlation. */ - Object localCorrelationData(); + @Nullable Object localContext(); /** * Acknowledges this Publish message. diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java index 0abc47817..fe369829c 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/message/publish/Mqtt5Publish.java @@ -113,7 +113,7 @@ public interface Mqtt5Publish extends Mqtt5Message { * @return the optional local correlation data of this Publish message. This data is never propagated and kept * locally for correlation. */ - Object getLocalCorrelationData(); + Object getLocalContext(); /** * Acknowledges this Publish message. diff --git a/src/test/java/com/hivemq/client/internal/mqtt/codec/encoder/mqtt5/Mqtt5PublishEncoderTest.java b/src/test/java/com/hivemq/client/internal/mqtt/codec/encoder/mqtt5/Mqtt5PublishEncoderTest.java index 64158e4ac..1e9faa80e 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/codec/encoder/mqtt5/Mqtt5PublishEncoderTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/codec/encoder/mqtt5/Mqtt5PublishEncoderTest.java @@ -94,7 +94,7 @@ void encode_allProperties() { new MqttPublish(MqttTopicImpl.of("topic"), ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), MqttQos.AT_MOST_ONCE, false, 10, Mqtt5PayloadFormatIndicator.UNSPECIFIED, MqttUtf8StringImpl.of("myContentType"), MqttTopicImpl.of("responseTopic"), - ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), userProperties, null); + ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), userProperties, null, null); encode(expected, publish, -1, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -121,7 +121,7 @@ void encode_simple() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), MqttQos.AT_MOST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, - Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, NO_USER_PROPERTIES, null); + Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, -1, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -145,7 +145,7 @@ void encode_retainTrue() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_MOST_ONCE, true, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expected, publish, -1, false, DEFAULT_NO_TOPIC_ALIAS, false, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -170,7 +170,7 @@ void encode_retainFalse() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, true, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -195,7 +195,7 @@ void encode_isDupFalse() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -220,7 +220,7 @@ void encode_isDupTrue() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expected, publish, 17, true, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -245,7 +245,7 @@ void encode_formatIndicatorUtf8() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UTF_8, null, null, null, NO_USER_PROPERTIES, - null); + null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -271,7 +271,7 @@ void encode_expiryInterval() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, 1000, - Mqtt5PayloadFormatIndicator.UTF_8, null, null, null, NO_USER_PROPERTIES, null); + Mqtt5PayloadFormatIndicator.UTF_8, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -298,7 +298,7 @@ void encode_contentType() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UTF_8, - MqttUtf8StringImpl.of("myContentType"), null, null, NO_USER_PROPERTIES, null); + MqttUtf8StringImpl.of("myContentType"), null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -325,7 +325,7 @@ void encode_responseTopic() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UTF_8, null, - MqttTopicImpl.of("responseTopic"), null, NO_USER_PROPERTIES, null); + MqttTopicImpl.of("responseTopic"), null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -353,7 +353,7 @@ void encode_correlationData() { final ByteBuffer correlationData = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}); final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UTF_8, null, null, correlationData, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -377,7 +377,7 @@ void encode_newTopicAlias() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, 8, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -399,7 +399,7 @@ void encode_withoutTopicAlias() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -421,7 +421,7 @@ void encode_withoutTopicAliasUsingDefault() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 2, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -445,7 +445,7 @@ void encode_existingTopicAlias() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, 8, false, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -475,7 +475,7 @@ void encode_userProperties() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UTF_8, null, null, null, userProperties, - null); + null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -499,7 +499,7 @@ void encode_singleSubscriptionIdentifier() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, ImmutableIntList.of(3)); } @@ -525,7 +525,7 @@ void encode_multipleSubscriptionIdentifiers() { }; final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, - MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null); + MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null, NO_USER_PROPERTIES, null, null); encode(expected, publish, 15, false, DEFAULT_NO_TOPIC_ALIAS, true, ImmutableIntList.of(3, 4)); } @@ -581,17 +581,17 @@ void encode_qos() { }; final MqttPublish publishQos0 = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_MOST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expectedQos0, publishQos0, 7, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); final MqttPublish publishQos1 = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_LEAST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expectedQos1, publishQos1, 7, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); final MqttPublish publishQos2 = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.EXACTLY_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); encode(expectedQos2, publishQos2, 7, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -601,7 +601,7 @@ void encode_maximumOutgoingPacketSizeExceeded_throwsEncoderException() { final ByteBuffer correlationData = ByteBuffer.wrap(new byte[100]); final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), null, MqttQos.AT_MOST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, correlationData, - NO_USER_PROPERTIES, null); + NO_USER_PROPERTIES, null, null); final MqttStatefulPublish publishInternal = @@ -640,7 +640,7 @@ void encode_maximumPacketSizeExceeded_omitUserProperties() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), MqttQos.AT_MOST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, - Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, userProperties, null); + Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, null, userProperties, null, null); encode(expected, publish, -1, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); } @@ -686,7 +686,7 @@ void encode_propertyLengthExceeded_omitUserProperties() { final MqttPublish publish = new MqttPublish(MqttTopicImpl.of("topic"), ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}), MqttQos.AT_MOST_ONCE, false, MqttPublish.NO_MESSAGE_EXPIRY, - Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, correlationData, userProperties, null); + Mqtt5PayloadFormatIndicator.UNSPECIFIED, null, null, correlationData, userProperties, null, null); encode(expected.array(), publish, -1, false, DEFAULT_NO_TOPIC_ALIAS, true, DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS); expected.release();