From 845b86984a3b58787f5228041a55493f3e291d4e Mon Sep 17 00:00:00 2001 From: Sven Carrillo Castillo Date: Thu, 26 Oct 2023 23:21:56 +0200 Subject: [PATCH] feature/message-bus --- .../api/messagebus/MessageBusController.java | 14 ++++--- .../api/test/EmitMessagesController.java | 15 +++++++- .../message/MessageBusLongPollRequestDto.java | 3 +- .../dto/message/MessageBusResponseDto.java | 2 +- .../com/recom/dto/message/MessageDto.java | 4 +- .../service/messagebus/MessageBusService.java | 38 ++++++++++++------- 6 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/recom/api/messagebus/MessageBusController.java b/src/main/java/com/recom/api/messagebus/MessageBusController.java index 672af95f..ce7fc7a7 100644 --- a/src/main/java/com/recom/api/messagebus/MessageBusController.java +++ b/src/main/java/com/recom/api/messagebus/MessageBusController.java @@ -58,7 +58,7 @@ public ResponseEntity getMessagesForm( @RequestParam(required = true) @NonNull final Map payload ) { - log.debug("Requested POST/api/v1/message-bus/form (FORM)"); + log.debug("Requested POST /api/v1/message-bus/form (FORM)"); return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusLongPollRequestDto.class)); } @@ -77,16 +77,16 @@ public ResponseEntity getMessagesJSON( @RequestBody(required = true) @NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto ) { - if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L) { - log.debug("Requested POST /api/v1/message-bus since {} (JSON)", messageBusLongPollRequestDto.getTimestampEpochMilliseconds()); + if (isNumeric(messageBusLongPollRequestDto)) { + log.debug("Requested POST /api/v1/message-bus since {} (JSON)", messageBusLongPollRequestDto.getSinceEpochMilliseconds()); } else { log.debug("Requested POST /api/v1/message-bus (JSON)"); } assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName()); ResponseBodyEmitter emitter; - final Lazy messagesSinceLazy = Lazy.of(() -> messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), messageBusLongPollRequestDto.getTimestampEpochMilliseconds())); - if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L && !messagesSinceLazy.get().getMessages().isEmpty()) { + final Lazy messagesSinceLazy = Lazy.of(() -> messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), Long.valueOf(messageBusLongPollRequestDto.getSinceEpochMilliseconds()))); + if (isNumeric(messageBusLongPollRequestDto) && !messagesSinceLazy.get().getMessages().isEmpty()) { emitter = new ResponseBodyEmitter(RECOM_CURL_TIMEOUT.toMillis()); try { emitter.send(messagesSinceLazy.get(), MediaType.APPLICATION_JSON); @@ -109,4 +109,8 @@ public ResponseEntity getMessagesJSON( .body(emitter); } + private static boolean isNumeric(@NonNull final MessageBusLongPollRequestDto messageBusLongPollRequestDto) { + return messageBusLongPollRequestDto.getSinceEpochMilliseconds() != null && !messageBusLongPollRequestDto.getSinceEpochMilliseconds().isEmpty(); + } + } \ No newline at end of file diff --git a/src/main/java/com/recom/api/test/EmitMessagesController.java b/src/main/java/com/recom/api/test/EmitMessagesController.java index a3b137dd..8db6488b 100644 --- a/src/main/java/com/recom/api/test/EmitMessagesController.java +++ b/src/main/java/com/recom/api/test/EmitMessagesController.java @@ -2,6 +2,7 @@ import com.recom.api.commons.HttpCommons; import com.recom.configuration.AsyncConfiguration; +import com.recom.dto.map.Point2DDto; import com.recom.dto.message.MessageBusLongPollRequestDto; import com.recom.model.message.MessageContainer; import com.recom.model.message.MessageType; @@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.math.BigDecimal; import java.util.List; @Slf4j @@ -67,10 +69,19 @@ public ResponseEntity emitMessagesJSON( .messages(List.of( OneMessage.builder() .messageType(MessageType.FETCH_MAP_RENDER_DATA) - .payload(null) + .payload(List.of( + Point2DDto.builder() + .x(BigDecimal.valueOf(5.0)) + .y(BigDecimal.valueOf(7.8)) + .build(), + Point2DDto.builder() + .x(BigDecimal.valueOf(5.0)) + .y(BigDecimal.valueOf(7.8)) + .build()) + ) .build(), OneMessage.builder() - .messageType(MessageType.FETCH_MAP_RENDER_DATA) + .messageType(MessageType.TEST) .payload(null) .build() )) diff --git a/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java b/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java index be866d33..2f44eb5a 100644 --- a/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java +++ b/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotBlank; -import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -29,7 +28,7 @@ public class MessageBusLongPollRequestDto implements Serializable { @Nullable @Schema(description = "Unix timestamp in milliseconds", example = "1691941419964") @JsonProperty() - private Long timestampEpochMilliseconds; + private String sinceEpochMilliseconds; } diff --git a/src/main/java/com/recom/dto/message/MessageBusResponseDto.java b/src/main/java/com/recom/dto/message/MessageBusResponseDto.java index eb1bd67c..7cee9b25 100644 --- a/src/main/java/com/recom/dto/message/MessageBusResponseDto.java +++ b/src/main/java/com/recom/dto/message/MessageBusResponseDto.java @@ -25,7 +25,7 @@ public class MessageBusResponseDto implements Serializable { @Schema @JsonProperty() - private Long epochMillisecondsLastMessage; + private String epochMillisecondsLastMessage; @Schema @JsonProperty() diff --git a/src/main/java/com/recom/dto/message/MessageDto.java b/src/main/java/com/recom/dto/message/MessageDto.java index 4ec8b970..12c0e277 100644 --- a/src/main/java/com/recom/dto/message/MessageDto.java +++ b/src/main/java/com/recom/dto/message/MessageDto.java @@ -1,5 +1,6 @@ package com.recom.dto.message; +import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.recom.model.message.MessageType; @@ -25,11 +26,12 @@ public class MessageDto { @Schema @JsonProperty() + @JsonFormat(shape = JsonFormat.Shape.STRING) private MessageType messageType; @Schema(description = "Unix timestamp in milliseconds", example = "1691941419964") @JsonProperty() - private Long timestampEpochMilliseconds; + private String timestampEpochMilliseconds; @Schema @JsonProperty() diff --git a/src/main/java/com/recom/service/messagebus/MessageBusService.java b/src/main/java/com/recom/service/messagebus/MessageBusService.java index adc4dbd0..3884a9fb 100644 --- a/src/main/java/com/recom/service/messagebus/MessageBusService.java +++ b/src/main/java/com/recom/service/messagebus/MessageBusService.java @@ -48,7 +48,13 @@ private void setLatestMessage( @NonNull final MessageBusResponseDto response, @NonNull final List persistedMessages ) { - response.setEpochMillisecondsLastMessage(persistedMessages.stream().mapToLong(message -> message.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).max().orElse(0L)); + response.setEpochMillisecondsLastMessage(persistedMessages.stream() + .mapToLong(message -> message.getTimestamp().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) + .boxed() + .max(Long::compareTo) + .map(String::valueOf) + .orElse(null) + ); } private @NonNull List persistNotification(@NonNull final MessageBusResponseDto messageBusResponse) { @@ -60,7 +66,7 @@ private void setLatestMessage( .mapName(messageBusResponse.getMapName()) .messageType(message.getMessageType()) .payload(StaticObjectMapperProvider.provide().writeValueAsString(message.getPayload())) - .timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getTimestampEpochMilliseconds()), ZoneId.systemDefault())) + .timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(message.getTimestampEpochMilliseconds())), ZoneId.systemDefault())) .build(); } catch (final JsonProcessingException jpe) { log.error("JsonProcessingException: ", jpe); @@ -82,21 +88,23 @@ private void setLatestMessage( @NonNull private MessageBusResponseDto prepareNotification(@NonNull final MessageContainer container) { final Instant now = Instant.now(); - long epochMilli = now.toEpochMilli(); + final long epochMilli = now.toEpochMilli(); final List messages = container.getMessages().stream() - .map(message -> { - return MessageDto.builder() - .uuid(UUID.randomUUID()) - .messageType(message.getMessageType()) - .payload(message.getPayload()) - .timestampEpochMilliseconds(epochMilli) - .build(); - }) + .map(message -> MessageDto.builder() + .uuid(UUID.randomUUID()) + .messageType(message.getMessageType()) + .payload(message.getPayload()) + .timestampEpochMilliseconds(String.valueOf(epochMilli)) + .build()) .toList(); return MessageBusResponseDto.builder() .mapName(container.getMapName()) - .epochMillisecondsLastMessage(messages.stream().mapToLong(MessageDto::getTimestampEpochMilliseconds).max().orElse(0L)) + .epochMillisecondsLastMessage(messages.stream() + .map(MessageDto::getTimestampEpochMilliseconds) + .max(String::compareTo) + .orElse(null) + ) .messages(messages) .build(); } @@ -110,7 +118,11 @@ public MessageBusResponseDto listMessagesSince( final List messages = messagePersistenceLayer.findAllMapSpecificMessagesSince(mapName, Optional.ofNullable(sinceTimestampEpochMilliseconds).orElse(0L)); return MessageBusResponseDto.builder() .mapName(mapName) - .epochMillisecondsLastMessage(messages.stream().mapToLong(MessageDto::getTimestampEpochMilliseconds).max().orElse(0L)) + .epochMillisecondsLastMessage(messages.stream() + .map(MessageDto::getTimestampEpochMilliseconds) + .max(String::compareTo) + .orElse(null) + ) .messages(messages) .build(); }