Skip to content

Commit

Permalink
feature/message-bus
Browse files Browse the repository at this point in the history
  • Loading branch information
svencc committed Oct 26, 2023
1 parent 112cfe6 commit 845b869
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 24 deletions.
14 changes: 9 additions & 5 deletions src/main/java/com/recom/api/messagebus/MessageBusController.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
@RequestParam(required = true)
@NonNull final Map<String, String> payload
) {
log.debug("Requested POST/api/v1/message-bus/form (FORM)");
log.debug("Requested POST /api/v1/message-bus/form (FORM)");

return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusLongPollRequestDto.class));
}
Expand All @@ -77,16 +77,16 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto
) {
if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L) {
log.debug("Requested POST /api/v1/message-bus since {} (JSON)", messageBusLongPollRequestDto.getTimestampEpochMilliseconds());
if (isNumeric(messageBusLongPollRequestDto)) {
log.debug("Requested POST /api/v1/message-bus since {} (JSON)", messageBusLongPollRequestDto.getSinceEpochMilliseconds());
} else {
log.debug("Requested POST /api/v1/message-bus (JSON)");
}
assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName());

ResponseBodyEmitter emitter;
final Lazy<MessageBusResponseDto> messagesSinceLazy = Lazy.of(() -> messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), messageBusLongPollRequestDto.getTimestampEpochMilliseconds()));
if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L && !messagesSinceLazy.get().getMessages().isEmpty()) {
final Lazy<MessageBusResponseDto> messagesSinceLazy = Lazy.of(() -> messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), Long.valueOf(messageBusLongPollRequestDto.getSinceEpochMilliseconds())));
if (isNumeric(messageBusLongPollRequestDto) && !messagesSinceLazy.get().getMessages().isEmpty()) {
emitter = new ResponseBodyEmitter(RECOM_CURL_TIMEOUT.toMillis());
try {
emitter.send(messagesSinceLazy.get(), MediaType.APPLICATION_JSON);
Expand All @@ -109,4 +109,8 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
.body(emitter);
}

private static boolean isNumeric(@NonNull final MessageBusLongPollRequestDto messageBusLongPollRequestDto) {
return messageBusLongPollRequestDto.getSinceEpochMilliseconds() != null && !messageBusLongPollRequestDto.getSinceEpochMilliseconds().isEmpty();
}

}
15 changes: 13 additions & 2 deletions src/main/java/com/recom/api/test/EmitMessagesController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.recom.api.commons.HttpCommons;
import com.recom.configuration.AsyncConfiguration;
import com.recom.dto.map.Point2DDto;
import com.recom.dto.message.MessageBusLongPollRequestDto;
import com.recom.model.message.MessageContainer;
import com.recom.model.message.MessageType;
Expand All @@ -28,6 +29,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.util.List;

@Slf4j
Expand Down Expand Up @@ -67,10 +69,19 @@ public ResponseEntity<Void> emitMessagesJSON(
.messages(List.of(
OneMessage.builder()
.messageType(MessageType.FETCH_MAP_RENDER_DATA)
.payload(null)
.payload(List.of(
Point2DDto.builder()
.x(BigDecimal.valueOf(5.0))
.y(BigDecimal.valueOf(7.8))
.build(),
Point2DDto.builder()
.x(BigDecimal.valueOf(5.0))
.y(BigDecimal.valueOf(7.8))
.build())
)
.build(),
OneMessage.builder()
.messageType(MessageType.FETCH_MAP_RENDER_DATA)
.messageType(MessageType.TEST)
.payload(null)
.build()
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -29,7 +28,7 @@ public class MessageBusLongPollRequestDto implements Serializable {
@Nullable
@Schema(description = "Unix timestamp in milliseconds", example = "1691941419964")
@JsonProperty()
private Long timestampEpochMilliseconds;
private String sinceEpochMilliseconds;

}

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class MessageBusResponseDto implements Serializable {

@Schema
@JsonProperty()
private Long epochMillisecondsLastMessage;
private String epochMillisecondsLastMessage;

@Schema
@JsonProperty()
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/recom/dto/message/MessageDto.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.recom.dto.message;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.recom.model.message.MessageType;
Expand All @@ -25,11 +26,12 @@ public class MessageDto {

@Schema
@JsonProperty()
@JsonFormat(shape = JsonFormat.Shape.STRING)
private MessageType messageType;

@Schema(description = "Unix timestamp in milliseconds", example = "1691941419964")
@JsonProperty()
private Long timestampEpochMilliseconds;
private String timestampEpochMilliseconds;

@Schema
@JsonProperty()
Expand Down
38 changes: 25 additions & 13 deletions src/main/java/com/recom/service/messagebus/MessageBusService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ private void setLatestMessage(
@NonNull final MessageBusResponseDto response,
@NonNull final List<Message> persistedMessages
) {
response.setEpochMillisecondsLastMessage(persistedMessages.stream().mapToLong(message -> message.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).max().orElse(0L));
response.setEpochMillisecondsLastMessage(persistedMessages.stream()
.mapToLong(message -> message.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
.boxed()
.max(Long::compareTo)
.map(String::valueOf)
.orElse(null)
);
}

private @NonNull List<Message> persistNotification(@NonNull final MessageBusResponseDto messageBusResponse) {
Expand All @@ -60,7 +66,7 @@ private void setLatestMessage(
.mapName(messageBusResponse.getMapName())
.messageType(message.getMessageType())
.payload(StaticObjectMapperProvider.provide().writeValueAsString(message.getPayload()))
.timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getTimestampEpochMilliseconds()), ZoneId.systemDefault()))
.timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(message.getTimestampEpochMilliseconds())), ZoneId.systemDefault()))
.build();
} catch (final JsonProcessingException jpe) {
log.error("JsonProcessingException: ", jpe);
Expand All @@ -82,21 +88,23 @@ private void setLatestMessage(
@NonNull
private MessageBusResponseDto prepareNotification(@NonNull final MessageContainer container) {
final Instant now = Instant.now();
long epochMilli = now.toEpochMilli();
final long epochMilli = now.toEpochMilli();
final List<MessageDto> messages = container.getMessages().stream()
.map(message -> {
return MessageDto.builder()
.uuid(UUID.randomUUID())
.messageType(message.getMessageType())
.payload(message.getPayload())
.timestampEpochMilliseconds(epochMilli)
.build();
})
.map(message -> MessageDto.builder()
.uuid(UUID.randomUUID())
.messageType(message.getMessageType())
.payload(message.getPayload())
.timestampEpochMilliseconds(String.valueOf(epochMilli))
.build())
.toList();

return MessageBusResponseDto.builder()
.mapName(container.getMapName())
.epochMillisecondsLastMessage(messages.stream().mapToLong(MessageDto::getTimestampEpochMilliseconds).max().orElse(0L))
.epochMillisecondsLastMessage(messages.stream()
.map(MessageDto::getTimestampEpochMilliseconds)
.max(String::compareTo)
.orElse(null)
)
.messages(messages)
.build();
}
Expand All @@ -110,7 +118,11 @@ public MessageBusResponseDto listMessagesSince(
final List<MessageDto> messages = messagePersistenceLayer.findAllMapSpecificMessagesSince(mapName, Optional.ofNullable(sinceTimestampEpochMilliseconds).orElse(0L));
return MessageBusResponseDto.builder()
.mapName(mapName)
.epochMillisecondsLastMessage(messages.stream().mapToLong(MessageDto::getTimestampEpochMilliseconds).max().orElse(0L))
.epochMillisecondsLastMessage(messages.stream()
.map(MessageDto::getTimestampEpochMilliseconds)
.max(String::compareTo)
.orElse(null)
)
.messages(messages)
.build();
}
Expand Down

0 comments on commit 845b869

Please sign in to comment.