Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the field localCorrelationData to MqttPublish for better flow-handling in reactive-APIs #546

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class Mqtt3PublishDecoder implements MqttMessageDecoder {
payload.position(0);
}

final MqttPublish publish = Mqtt3PublishView.delegate(topic, payload, qos, retain);
final MqttPublish publish = Mqtt3PublishView.delegate(topic, payload, qos, retain, null);

return Mqtt3PublishView.statefulDelegate(publish, packetIdentifier, dup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public class Mqtt5PublishDecoder implements MqttMessageDecoder {

final MqttPublish publish =
new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType,
responseTopic, correlationData, userProperties, null);
responseTopic, correlationData, userProperties, null, null);

final ImmutableIntList subscriptionIdentifiers =
(subscriptionIdentifiersBuilder == null) ? DEFAULT_NO_SUBSCRIPTION_IDENTIFIERS :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P
private final @Nullable MqttUtf8StringImpl contentType;
private final @Nullable MqttTopicImpl responseTopic;
private final @Nullable ByteBuffer correlationData;
private final @Nullable Object localContext;

private final @Nullable Confirmable confirmable;

Expand All @@ -73,7 +74,8 @@ public MqttPublish(
final @Nullable MqttTopicImpl responseTopic,
final @Nullable ByteBuffer correlationData,
final @NotNull MqttUserPropertiesImpl userProperties,
final @Nullable Confirmable confirmable) {
final @Nullable Confirmable confirmable,
final @Nullable Object localContext) {

super(userProperties);
this.topic = topic;
Expand All @@ -86,6 +88,7 @@ public MqttPublish(
this.responseTopic = responseTopic;
this.correlationData = correlationData;
this.confirmable = confirmable;
this.localContext = localContext;
}

@Override
Expand Down Expand Up @@ -163,6 +166,10 @@ public long getRawMessageExpiryInterval() {
return correlationData;
}

public @Nullable Object getLocalContext() {
return localContext;
}

@Override
public void acknowledge() {
final Confirmable confirmable = this.confirmable;
Expand Down Expand Up @@ -204,7 +211,7 @@ public void acknowledge() {

public @NotNull MqttPublish withConfirmable(final @NotNull Confirmable confirmable) {
return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType,
responseTopic, correlationData, getUserProperties(), confirmable);
responseTopic, correlationData, getUserProperties(), confirmable, localContext);
}

@Override
Expand All @@ -216,6 +223,7 @@ public void acknowledge() {
((contentType == null) ? "" : ", contentType=" + contentType) +
((responseTopic == null) ? "" : ", responseTopic=" + responseTopic) +
((correlationData == null) ? "" : ", correlationData=" + correlationData.remaining() + "byte") +
((localContext == null) ? "" : ", localCorrelationData=" + localContext + " " + localContext.getClass().getSimpleName()) +
StringUtil.prepend(", ", super.toAttributeString());
}

Expand All @@ -239,7 +247,7 @@ public boolean equals(final @Nullable Object o) {
(messageExpiryInterval == that.messageExpiryInterval) &&
(payloadFormatIndicator == that.payloadFormatIndicator) &&
Objects.equals(contentType, that.contentType) && Objects.equals(responseTopic, that.responseTopic) &&
Objects.equals(correlationData, that.correlationData);
Objects.equals(correlationData, that.correlationData) && Objects.equals(localContext, that.localContext);
}

protected boolean canEqual(final @Nullable Object o) {
Expand All @@ -258,6 +266,7 @@ public int hashCode() {
result = 31 * result + Objects.hashCode(contentType);
result = 31 * result + Objects.hashCode(responseTopic);
result = 31 * result + Objects.hashCode(correlationData);
result = 31 * result + Objects.hashCode(localContext);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
@Nullable MqttUtf8StringImpl contentType;
@Nullable MqttTopicImpl responseTopic;
@Nullable ByteBuffer correlationData;
@Nullable Object localCorrelationData;
@NotNull MqttUserPropertiesImpl userProperties = MqttUserPropertiesImpl.NO_USER_PROPERTIES;

MqttPublishBuilder() {}
Expand All @@ -62,6 +63,7 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
responseTopic = publish.getRawResponseTopic();
correlationData = publish.getRawCorrelationData();
userProperties = publish.getUserProperties();
localCorrelationData = publish.getLocalContext();
}

MqttPublishBuilder(final @NotNull MqttPublishBuilder<?> publishBuilder) {
Expand Down Expand Up @@ -152,6 +154,11 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
return self();
}

public @NotNull B localCorrelationData(final @Nullable Object localCorrelationData) {
this.localCorrelationData = localCorrelationData;
return self();
}

public @NotNull B userProperties(final @Nullable Mqtt5UserProperties userProperties) {
this.userProperties = MqttChecks.userProperties(userProperties);
return self();
Expand Down Expand Up @@ -186,7 +193,7 @@ private static abstract class Base<B extends Base<B>> extends MqttPublishBuilder
public @NotNull MqttPublish build() {
Checks.notNull(topic, "Topic");
return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator,
contentType, responseTopic, correlationData, userProperties, null);
contentType, responseTopic, correlationData, userProperties, null, localCorrelationData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public MqttWillPublish(
final long delayInterval) {

super(topic, payload, qos, isRetain, messageExpiryInterval, payloadFormatIndicator, contentType, responseTopic,
correlationData, userProperties, null);
correlationData, userProperties, null, null);
this.delayInterval = delayInterval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public class Mqtt3PublishView implements Mqtt3Publish {
final @NotNull MqttTopicImpl topic,
final @Nullable ByteBuffer payload,
final @NotNull MqttQos qos,
final boolean retain) {
final boolean retain,
final @Nullable Object localContext) {

return new MqttPublish(topic, payload, qos, retain, MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null,
MqttUserPropertiesImpl.NO_USER_PROPERTIES, null);
MqttUserPropertiesImpl.NO_USER_PROPERTIES, null, localContext);
}

public static @NotNull MqttStatefulPublish statefulDelegate(
Expand All @@ -65,9 +66,10 @@ public class Mqtt3PublishView implements Mqtt3Publish {
final @NotNull MqttTopicImpl topic,
final @Nullable ByteBuffer payload,
final @NotNull MqttQos qos,
final boolean retain) {
final boolean retain,
final @Nullable Object localContext) {

return new Mqtt3PublishView(delegate(topic, payload, qos, retain));
return new Mqtt3PublishView(delegate(topic, payload, qos, retain, localContext));
}

static @NotNull Mqtt3PublishView willOf(
Expand Down Expand Up @@ -124,6 +126,11 @@ public boolean isRetain() {
return delegate;
}

@Override
public Object localContext() {
return delegate.getLocalContext();
}

@Override
public void acknowledge() {
delegate.acknowledge();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class Mqtt3PublishViewBuilder<B extends Mqtt3PublishViewBuilder<

@Nullable MqttTopicImpl topic;
@Nullable ByteBuffer payload;
@Nullable Object localContext;
@NotNull MqttQos qos = Mqtt3PublishView.DEFAULT_QOS;
boolean retain;

Expand All @@ -51,6 +52,7 @@ public abstract class Mqtt3PublishViewBuilder<B extends Mqtt3PublishViewBuilder<
topic = delegate.getTopic();
payload = delegate.getRawPayload();
qos = delegate.getQos();
localContext = delegate.getLocalContext();
retain = delegate.isRetain();
}

Expand Down Expand Up @@ -80,6 +82,11 @@ public abstract class Mqtt3PublishViewBuilder<B extends Mqtt3PublishViewBuilder<
return self();
}

public @NotNull B localContext(@Nullable final Object localContext) {
this.localContext = localContext;
return self();
}

private static abstract class Base<B extends Base<B>> extends Mqtt3PublishViewBuilder<B> {

Base() {}
Expand All @@ -100,7 +107,7 @@ private static abstract class Base<B extends Base<B>> extends Mqtt3PublishViewBu

public @NotNull Mqtt3PublishView build() {
Checks.notNull(topic, "Topic");
return Mqtt3PublishView.of(topic, payload, qos, retain);
return Mqtt3PublishView.of(topic, payload, qos, retain, localContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3Message;
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.nio.ByteBuffer;
import java.util.Optional;
Expand Down Expand Up @@ -77,6 +78,12 @@ public interface Mqtt3Publish extends Mqtt3Message {
*/
boolean isRetain();

/**
* @return the optional context of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
@Nullable Object localContext();

/**
* Acknowledges this Publish message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public interface Mqtt5Publish extends Mqtt5Message {
*/
@NotNull Mqtt5UserProperties getUserProperties();

/**
* @return the optional local correlation data of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
Object getLocalContext();

/**
* Acknowledges this Publish message.
*
Expand Down
Loading