Skip to content

Commit

Permalink
provide the possibility to define additional DittoHeaders to be inclu…
Browse files Browse the repository at this point in the history
…ded when sending/replying to a message

* fixed critical bug in PendingMessageImpl which caused a non-visible ClassCastException for message responses using the typed message send() API

Signed-off-by: Thomas Jaeckle <[email protected]>
  • Loading branch information
thjaeckle committed Apr 23, 2020
1 parent d4bb8a6 commit 4b25736
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,37 +107,41 @@ private static void typeCheckAndConsume(final MessageResponseConsumer<?> respons
final BiConsumer uncheckedResponseConsumer = responseConsumer.getResponseConsumer();
final Class<?> responseType = responseConsumer.getResponseType();

// throw ClassCastException if response has incorrect type
final Message<?> responseMessage;
if (response instanceof MessageCommand) {
responseMessage = ((MessageCommand) response).getMessage();
} else if (response instanceof MessageCommandResponse) {
responseMessage = ((MessageCommandResponse) response).getMessage();
} else if (response instanceof ErrorResponse) {
uncheckedResponseConsumer.accept(null, ((ErrorResponse<?>) response).getDittoRuntimeException());
return;
} else {
uncheckedResponseConsumer.accept(null, classCastException(responseType, response));
return;
}
try {
// throw ClassCastException if response has incorrect type
final Message<?> responseMessage;
if (response instanceof MessageCommand) {
responseMessage = ((MessageCommand) response).getMessage();
} else if (response instanceof MessageCommandResponse) {
responseMessage = ((MessageCommandResponse) response).getMessage();
} else if (response instanceof ErrorResponse) {
uncheckedResponseConsumer.accept(null, ((ErrorResponse<?>) response).getDittoRuntimeException());
return;
} else {
uncheckedResponseConsumer.accept(null, classCastException(responseType, response));
return;
}

if (responseConsumer.getResponseType().isAssignableFrom(ByteBuffer.class)) {
uncheckedResponseConsumer.accept(asByteBufferMessage(responseMessage), null);
} else {
final Optional<?> payloadOptional = responseMessage.getPayload();
if (payloadOptional.isPresent()) {
final Object payload = payloadOptional.get();
if (responseConsumer.getResponseType().isInstance(payload)) {
uncheckedResponseConsumer.accept(payload, null);
if (responseConsumer.getResponseType().isAssignableFrom(ByteBuffer.class)) {
uncheckedResponseConsumer.accept(asByteBufferMessage(responseMessage), null);
} else {
final Optional<?> payloadOptional = responseMessage.getPayload();
if (payloadOptional.isPresent()) {
final Object payload = payloadOptional.get();
if (responseConsumer.getResponseType().isInstance(payload)) {
uncheckedResponseConsumer.accept(setMessagePayload(responseMessage, payload), null);
} else {
// response has unexpected type
uncheckedResponseConsumer.accept(setMessagePayload(responseMessage, null),
classCastException(responseType, payload));
}
} else {
// response has unexpected type
uncheckedResponseConsumer.accept(setMessagePayload(responseMessage, null),
classCastException(responseType, payload));
// response has no payload; regard it as any message type
uncheckedResponseConsumer.accept(responseMessage, null);
}
} else {
// response has no payload; regard it as any message type
uncheckedResponseConsumer.accept(responseMessage, null);
}
} catch (final RuntimeException e) {
uncheckedResponseConsumer.accept(null, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Consumer;

import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.things.ThingId;

Expand Down Expand Up @@ -152,6 +153,16 @@ interface SetPayloadOrSend<T> extends MessageSendable<T> {
*/
SetPayloadOrSend<T> statusCode(HttpStatusCode statusCode);

/**
* Sets additional headers to send in the message.
*
* @param additionalHeaders the headers.
* @return fluent api builder that provides the functionality to set <em>optionally</em> fields of the message
* or send the message.
* @since 1.1.0
*/
SetPayloadOrSend<T> headers(DittoHeaders additionalHeaders);

/**
* Sets the payload of the message. NOTE: The maximum payload size is restricted to 10MB.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.eclipse.ditto.client.live.messages.MessageSender;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageBuilder;
import org.eclipse.ditto.model.messages.MessageDirection;
Expand Down Expand Up @@ -56,6 +57,7 @@ public final class ImmutableMessageSender<T> implements MessageSender<T> {
private String messageCorrelationId;
private String messageContentType;
private HttpStatusCode messageStatusCode;
private DittoHeaders messageAdditionalHeaders;
private Consumer<Message<T>> sendConsumer;

private ImmutableMessageSender(final boolean isResponse) {
Expand All @@ -69,6 +71,7 @@ private ImmutableMessageSender(final boolean isResponse) {
messageTimestamp = null;
messageCorrelationId = null;
messageStatusCode = null;
messageAdditionalHeaders = null;
}

/**
Expand Down Expand Up @@ -128,12 +131,19 @@ private void buildAndSendMessage(final T payload) {

private <R> void buildAndSendMessage(final T payload, final MessageResponseConsumer<R> responseConsumer) {
final MessageHeadersBuilder messageHeadersBuilder =
MessageHeaders.newBuilder(messageDirection, messageThingId, messageSubject)
.contentType(messageContentType)
.featureId(messageFeatureId)
.timeout(messageTimeout)
.timestamp(messageTimestamp)
.correlationId(messageCorrelationId);
MessageHeaders.newBuilder(messageDirection, messageThingId, messageSubject);

if (null != messageAdditionalHeaders) {
// put additionalHeaders first, so that custom "contentType", "timeout", etc. still overwrites the values:
messageHeadersBuilder.putHeaders(messageAdditionalHeaders);
}

messageHeadersBuilder
.contentType(messageContentType)
.featureId(messageFeatureId)
.timeout(messageTimeout)
.timestamp(messageTimestamp)
.correlationId(messageCorrelationId);

if (responseConsumer == null) {
messageHeadersBuilder.responseRequired(false);
Expand Down Expand Up @@ -222,6 +232,12 @@ public SetPayloadOrSend<T> statusCode(final HttpStatusCode statusCode) {
return this;
}

@Override
public SetPayloadOrSend<T> headers(final DittoHeaders additionalHeaders) {
messageAdditionalHeaders = additionalHeaders;
return this;
}

@Override
public SetContentType<T> payload(final T payload) {
return new SetContentTypeImpl(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void onBinaryMessage(final WebSocket websocket, final byte[] binary) {
@Override
public void onTextMessage(final WebSocket websocket, final String text) {
callbackExecutor.execute(() -> {
LOGGER.trace("Client <{}>: Received WebSocket string message <{}>", sessionId, text);
LOGGER.debug("Client <{}>: Received WebSocket string message <{}>", sessionId, text);
handleIncomingMessage(text);
});
}
Expand Down

0 comments on commit 4b25736

Please sign in to comment.