Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the field localCorrelationData to MqttPublish for better flow-handling in reactive-APIs #546

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P
private final @Nullable MqttUtf8StringImpl contentType;
private final @Nullable MqttTopicImpl responseTopic;
private final @Nullable ByteBuffer correlationData;
private final @Nullable Object localCorrelationData;

private final @Nullable Confirmable confirmable;

Expand All @@ -73,7 +74,8 @@ public MqttPublish(
final @Nullable MqttTopicImpl responseTopic,
final @Nullable ByteBuffer correlationData,
final @NotNull MqttUserPropertiesImpl userProperties,
final @Nullable Confirmable confirmable) {
final @Nullable Confirmable confirmable,
final @Nullable Object localCorrelationData) {

super(userProperties);
this.topic = topic;
Expand All @@ -86,6 +88,24 @@ public MqttPublish(
this.responseTopic = responseTopic;
this.correlationData = correlationData;
this.confirmable = confirmable;
this.localCorrelationData = localCorrelationData;
}

public MqttPublish(
final @NotNull MqttTopicImpl topic,
final @Nullable ByteBuffer payload,
final @NotNull MqttQos qos,
final boolean retain,
final long messageExpiryInterval,
final @Nullable Mqtt5PayloadFormatIndicator payloadFormatIndicator,
final @Nullable MqttUtf8StringImpl contentType,
final @Nullable MqttTopicImpl responseTopic,
final @Nullable ByteBuffer correlationData,
final @NotNull MqttUserPropertiesImpl userProperties,
final @Nullable Confirmable confirmable) {

this(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType,
responseTopic, correlationData, userProperties, confirmable, null);
codepitbull marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -163,6 +183,10 @@ public long getRawMessageExpiryInterval() {
return correlationData;
}

public @Nullable Object getLocalCorrelationData() {
return localCorrelationData;
}

@Override
public void acknowledge() {
final Confirmable confirmable = this.confirmable;
Expand Down Expand Up @@ -204,7 +228,7 @@ public void acknowledge() {

public @NotNull MqttPublish withConfirmable(final @NotNull Confirmable confirmable) {
return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator, contentType,
responseTopic, correlationData, getUserProperties(), confirmable);
responseTopic, correlationData, getUserProperties(), confirmable, localCorrelationData);
}

@Override
Expand All @@ -216,6 +240,7 @@ public void acknowledge() {
((contentType == null) ? "" : ", contentType=" + contentType) +
((responseTopic == null) ? "" : ", responseTopic=" + responseTopic) +
((correlationData == null) ? "" : ", correlationData=" + correlationData.remaining() + "byte") +
((localCorrelationData == null) ? "" : ", localCorrelationData=" + localCorrelationData + " " + localCorrelationData.getClass().getSimpleName()) +
StringUtil.prepend(", ", super.toAttributeString());
}

Expand All @@ -239,7 +264,7 @@ public boolean equals(final @Nullable Object o) {
(messageExpiryInterval == that.messageExpiryInterval) &&
(payloadFormatIndicator == that.payloadFormatIndicator) &&
Objects.equals(contentType, that.contentType) && Objects.equals(responseTopic, that.responseTopic) &&
Objects.equals(correlationData, that.correlationData);
Objects.equals(correlationData, that.correlationData) && Objects.equals(localCorrelationData, that.localCorrelationData);
}

protected boolean canEqual(final @Nullable Object o) {
Expand All @@ -258,6 +283,7 @@ public int hashCode() {
result = 31 * result + Objects.hashCode(contentType);
result = 31 * result + Objects.hashCode(responseTopic);
result = 31 * result + Objects.hashCode(correlationData);
result = 31 * result + Objects.hashCode(localCorrelationData);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
@Nullable MqttUtf8StringImpl contentType;
@Nullable MqttTopicImpl responseTopic;
@Nullable ByteBuffer correlationData;
@Nullable Object localCorrelationData;
@NotNull MqttUserPropertiesImpl userProperties = MqttUserPropertiesImpl.NO_USER_PROPERTIES;

MqttPublishBuilder() {}
Expand All @@ -62,6 +63,7 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
responseTopic = publish.getRawResponseTopic();
correlationData = publish.getRawCorrelationData();
userProperties = publish.getUserProperties();
localCorrelationData = publish.getLocalCorrelationData();
}

MqttPublishBuilder(final @NotNull MqttPublishBuilder<?> publishBuilder) {
Expand Down Expand Up @@ -152,6 +154,11 @@ public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
return self();
}

public @NotNull B localCorrelationData(final @Nullable Object localCorrelationData) {
this.localCorrelationData = localCorrelationData;
return self();
}

public @NotNull B userProperties(final @Nullable Mqtt5UserProperties userProperties) {
this.userProperties = MqttChecks.userProperties(userProperties);
return self();
Expand Down Expand Up @@ -186,7 +193,7 @@ private static abstract class Base<B extends Base<B>> extends MqttPublishBuilder
public @NotNull MqttPublish build() {
Checks.notNull(topic, "Topic");
return new MqttPublish(topic, payload, qos, retain, messageExpiryInterval, payloadFormatIndicator,
contentType, responseTopic, correlationData, userProperties, null);
contentType, responseTopic, correlationData, userProperties, null, localCorrelationData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,18 @@ public class Mqtt3PublishView implements Mqtt3Publish {
final @NotNull MqttQos qos,
final boolean retain) {

return delegate(topic, payload, qos, retain, null);
}

public static @NotNull MqttPublish delegate(
codepitbull marked this conversation as resolved.
Show resolved Hide resolved
final @NotNull MqttTopicImpl topic,
final @Nullable ByteBuffer payload,
final @NotNull MqttQos qos,
final boolean retain,
final Object localCorrelationData) {

return new MqttPublish(topic, payload, qos, retain, MqttPublish.NO_MESSAGE_EXPIRY, null, null, null, null,
MqttUserPropertiesImpl.NO_USER_PROPERTIES, null);
MqttUserPropertiesImpl.NO_USER_PROPERTIES, null, localCorrelationData);
}

public static @NotNull MqttStatefulPublish statefulDelegate(
Expand Down Expand Up @@ -124,6 +134,11 @@ public boolean isRetain() {
return delegate;
}

@Override
public Object localCorrelationData() {
return delegate.getLocalCorrelationData();
}

@Override
public void acknowledge() {
delegate.acknowledge();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public interface Mqtt3Publish extends Mqtt3Message {
*/
boolean isRetain();

/**
* @return the optional local correlation data of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
Object localCorrelationData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API needs to be discussed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the usage of object (yep, could replace it with a generic but that would change general signature of the class) or something else?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like a generic here. But maybe there would be a case for creating a more sophisticated structure like for user properties. Having just plain Object there is not very expandable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YannickWeber any objections to merging this and then punting the improvement (that may or may not be needed) for a future date? Object isn't "slick" but it serves the purpose for now without complexity. Plus this isn't a customer requested feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have objections. This is API, API can only change in major versions, therefore we need to be very cautious what we are adding and need to double check for expandability and clarity to make it future-proof.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to double check for expandability and clarity to make it future-proof

One addition, I agree on this point completely.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with moving cautiously, just to provide some context:
The newly added API is strictly for local correlation when inside a reactive pipeline.
I used Object because it was the least invasive and most flexible in that case.
With some fighting we might turn this into a generic.
We could also generate some id for these local interactions which I think would be a little much since this must never be transferred over the wire.

Copy link
Contributor

@YannickWeber YannickWeber Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we can add API in minor versions, but we can only remove methods in major versions. Therefore, I want us to be very careful about API design, as we can't easily correct potential API issues.

In the client world, things aren't as strict since package updates are manual.

I would argue that API stability and caution is of high importance independent of where it is used.

My suggestion would be to not return an Object but rather add an Interface LocalCorrelationData. So that we can expand the return value easier in the future. But that also does not mitigate the problem that you always have to unsafely cast when using the correlation data object :(

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An unsafe cast can be avoided with an instanceof, which got a lot nicer with recent Java-versions.

From a type-perspective it would be best to introduce a generic, like Ractor-Kafka is doing it (https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/sender/SenderRecord.java#L30).

Everything without a generic will require an unsafe cast at some point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A final decision would be ideal. Let me know if there are any other thoughts.

codepitbull marked this conversation as resolved.
Show resolved Hide resolved

/**
* Acknowledges this Publish message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public interface Mqtt5Publish extends Mqtt5Message {
*/
@NotNull Mqtt5UserProperties getUserProperties();

/**
* @return the optional local correlation data of this Publish message. This data is never propagated and kept
* locally for correlation.
*/
Object getLocalCorrelationData();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API needs to be discussed.

codepitbull marked this conversation as resolved.
Show resolved Hide resolved

/**
* Acknowledges this Publish message.
*
Expand Down