diff --git a/src/main/java/com/recom/api/messagebus/MessageBusController.java b/src/main/java/com/recom/api/messagebus/MessageBusController.java index 86de75f3..a738157b 100644 --- a/src/main/java/com/recom/api/messagebus/MessageBusController.java +++ b/src/main/java/com/recom/api/messagebus/MessageBusController.java @@ -2,7 +2,9 @@ import com.recom.api.commons.HttpCommons; import com.recom.configuration.AsyncConfiguration; -import com.recom.dto.message.MessageBusRequestDto; +import com.recom.dto.message.MessageBusLongPollRequestDto; +import com.recom.dto.message.MessageBusResponseDto; +import com.recom.dto.message.MessageBusSinceRequestDto; import com.recom.persistence.message.MessagePersistenceLayer; import com.recom.service.AssertionService; import com.recom.service.ReforgerPayloadParserService; @@ -17,7 +19,6 @@ import jakarta.validation.Valid; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.http.*; import org.springframework.validation.annotation.Validated; @@ -49,8 +50,8 @@ public class MessageBusController { private final AsyncConfiguration asyncConfiguration; @Operation( - summary = "Get a list of messages", - description = "Gets all map specific, latest message of a type.", + summary = "Long-poll latest messages", + description = "Polls for all map specific, latest message.", security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT) ) @ApiResponses(value = { @@ -64,12 +65,12 @@ public ResponseEntity getMessagesForm( ) { log.debug("Requested POST /api/v1/map/renderer/form (FORM)"); - return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusRequestDto.class)); + return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusLongPollRequestDto.class)); } @Operation( - summary = "Get a list of messages", - description = "Gets all map specific, latest message of a type.", + summary = "Long-poll latest messages", + description = "Polls for all map specific, latest message.", security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT) ) @ApiResponses(value = { @@ -79,10 +80,10 @@ public ResponseEntity getMessagesForm( @PostMapping(path = "", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity getMessagesJSON( @RequestBody(required = true) - @NonNull @Valid final MessageBusRequestDto mapRendererRequestDto + @NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto ) { log.debug("Requested POST /api/v1/map/message-bus (JSON)"); - assertionService.assertMapExists(mapRendererRequestDto.getMapName()); + assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName()); final MessageLongPollObserver messageLongPollObserver = MessageLongPollObserver.builder() .timeout(RECOM_CURL_TIMEOUT.toMillis()) @@ -91,7 +92,7 @@ public ResponseEntity getMessagesJSON( .build(); messageLongPollObserver.observe(messageBusService.getSubject()); - messageLongPollObserver.scheduleTestResponse(mapRendererRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration); + messageLongPollObserver.scheduleTestResponse(messageBusLongPollRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration); final HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); @@ -102,4 +103,26 @@ public ResponseEntity getMessagesJSON( .body(messageLongPollObserver.provideResponseEmitter()); } + @Operation( + summary = "Get a list of messages since given timestamp", + description = "Gets all map specific message since provided epoch-millis.", + security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT) + ) + @ApiResponses(value = { + @ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK), + @ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content()) + }) + @PostMapping(path = "/after", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity getMessagesSinceJSON( + @RequestBody(required = true) + @NonNull @Valid final MessageBusSinceRequestDto messageBusSinceRequestDto + ) { + log.debug("Requested POST /api/v1/map/message-bus/after (JSON)"); + assertionService.assertMapExists(messageBusSinceRequestDto.getMapName()); + + return ResponseEntity.status(HttpStatus.OK) + .cacheControl(CacheControl.noCache()) + .body(messageBusService.listMessagesSince(messageBusSinceRequestDto.getMapName(), messageBusSinceRequestDto.getTimestampEpochMilliseconds())); + } + } diff --git a/src/main/java/com/recom/dto/message/MessageBusRequestDto.java b/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java similarity index 89% rename from src/main/java/com/recom/dto/message/MessageBusRequestDto.java rename to src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java index 1e1755f9..90b4ed75 100644 --- a/src/main/java/com/recom/dto/message/MessageBusRequestDto.java +++ b/src/main/java/com/recom/dto/message/MessageBusLongPollRequestDto.java @@ -17,7 +17,7 @@ @NoArgsConstructor @AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) -public class MessageBusRequestDto implements Serializable { +public class MessageBusLongPollRequestDto implements Serializable { @NotBlank @Schema diff --git a/src/main/java/com/recom/dto/message/MessageBusSinceRequestDto.java b/src/main/java/com/recom/dto/message/MessageBusSinceRequestDto.java new file mode 100644 index 00000000..30564efe --- /dev/null +++ b/src/main/java/com/recom/dto/message/MessageBusSinceRequestDto.java @@ -0,0 +1,34 @@ +package com.recom.dto.message; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@Schema +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class MessageBusSinceRequestDto implements Serializable { + + @NotBlank + @Schema + @JsonProperty() + private String mapName; + + @NotNull + @Schema(description = "Unix timestamp in milliseconds", example = "1691941419964") + @JsonProperty() + private Long timestampEpochMilliseconds; + +} + diff --git a/src/main/java/com/recom/dto/message/MessageDto.java b/src/main/java/com/recom/dto/message/MessageDto.java index d331e370..4ec8b970 100644 --- a/src/main/java/com/recom/dto/message/MessageDto.java +++ b/src/main/java/com/recom/dto/message/MessageDto.java @@ -31,12 +31,8 @@ public class MessageDto { @JsonProperty() private Long timestampEpochMilliseconds; - @Schema(description = "Unix timestamp in milliseconds", example = "1691941419964") - @JsonProperty() - private Long timestampConfirmationEpochMilliseconds; - @Schema @JsonProperty() - private String payload; + private Object payload; } \ No newline at end of file diff --git a/src/main/java/com/recom/entity/Message.java b/src/main/java/com/recom/entity/Message.java index 121d1a35..b5064b76 100644 --- a/src/main/java/com/recom/entity/Message.java +++ b/src/main/java/com/recom/entity/Message.java @@ -20,8 +20,7 @@ @AllArgsConstructor @Table(indexes = { @Index(name = "IDX_mapName", columnList = "mapName", unique = false), - @Index(name = "IDX_mapName_messageType", columnList = "mapName, messageType", unique = false), - @Index(name = "IDX_mapName_messageType_timestamp", columnList = "mapName, messageType, timestamp", unique = true), + @Index(name = "IDX_mapName_messageType_timestamp", columnList = "mapName, messageType, timestamp", unique = false), @Index(name = "IDX_mapName_timestamp", columnList = "mapName, timestamp", unique = false), }) @Cacheable @@ -29,7 +28,7 @@ public class Message implements Persistable, Serializable { @Id - @GeneratedValue(strategy = GenerationType.AUTO) +// @GeneratedValue(strategy = GenerationType.AUTO) @Column(insertable = true, updatable = false, nullable = false) private UUID uuid; diff --git a/src/main/java/com/recom/model/message/MessageContainer.java b/src/main/java/com/recom/model/message/MessageContainer.java new file mode 100644 index 00000000..b5383294 --- /dev/null +++ b/src/main/java/com/recom/model/message/MessageContainer.java @@ -0,0 +1,19 @@ +package com.recom.model.message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MessageContainer { + + private String mapName; + private List messages; + +} diff --git a/src/main/java/com/recom/model/message/OneMessage.java b/src/main/java/com/recom/model/message/OneMessage.java new file mode 100644 index 00000000..7054662b --- /dev/null +++ b/src/main/java/com/recom/model/message/OneMessage.java @@ -0,0 +1,17 @@ +package com.recom.model.message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OneMessage { + + private MessageType messageType; + private Object payload; + +} diff --git a/src/main/java/com/recom/persistence/dbcached/DBCachedPersistenceLayer.java b/src/main/java/com/recom/persistence/dbcached/DBCachedPersistenceLayer.java index 0c6e9756..f004e1fc 100644 --- a/src/main/java/com/recom/persistence/dbcached/DBCachedPersistenceLayer.java +++ b/src/main/java/com/recom/persistence/dbcached/DBCachedPersistenceLayer.java @@ -60,7 +60,7 @@ private DBCachedItem updateExistingCacheItem( try (ByteArrayOutputStream byteArrayOutputStream = serializeObject(value)) { existingCacheItem.setCachedValue(byteArrayOutputStream.toByteArray()); return existingCacheItem; - } catch (Exception e) { + } catch (final Exception e) { log.error("Error serializing cache value with key {}!", existingCacheItem.getCacheKey(), e); return null; } @@ -79,7 +79,7 @@ private DBCachedItem createNewCacheItem( .cacheKey(cacheKey) .cachedValue(byteArrayOutputStream.toByteArray()) .build(); - } catch (Exception e) { + } catch (final Exception e) { log.error("Error serializing cache value with key {}!", cacheKey, e); return null; } @@ -116,7 +116,7 @@ private Optional deserializeCacheValue( ) throws DBCachedDeserializationException { try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(serializedValue))) { return Optional.ofNullable((V) inputStream.readObject()); - } catch (IOException | ClassNotFoundException e) { + } catch (final IOException | ClassNotFoundException e) { log.error("Error deserializing cache value with key {}", cacheKey); throw new DBCachedDeserializationException(String.format("Unable to deserialize cacheKey %s", cacheKey), e); } diff --git a/src/main/java/com/recom/persistence/message/MessagePersistenceLayer.java b/src/main/java/com/recom/persistence/message/MessagePersistenceLayer.java index 72b69ff1..551bbe54 100644 --- a/src/main/java/com/recom/persistence/message/MessagePersistenceLayer.java +++ b/src/main/java/com/recom/persistence/message/MessagePersistenceLayer.java @@ -1,15 +1,25 @@ package com.recom.persistence.message; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.recom.dto.message.MessageDto; import com.recom.entity.Message; import com.recom.mapper.MessageMapper; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.core.convert.ConversionService; import org.springframework.stereotype.Service; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.Objects; +@Slf4j @Service @RequiredArgsConstructor public class MessagePersistenceLayer { @@ -18,12 +28,29 @@ public class MessagePersistenceLayer { private final MessageRepository messageRepository; @NonNull private final ConversionService conversionService; + @NonNull + private final ObjectMapper objectMapper; @NonNull -// @Cacheable(cacheNames = "CommandPersistenceLayer.findAllMapSpecificCommands") // @TODO there is some work to do, to cache the commands ... - public List findAllMapSpecificMessages(@NonNull final String mapName) { - return messageRepository.findAllByMapName(mapName).stream() + public List findAllMapSpecificMessagesSince( + @NonNull final String mapName, + @NonNull final Long sinceTimestampEpochMilliseconds + ) { + final LocalDateTime since = LocalDateTime.ofInstant(Instant.ofEpochMilli(sinceTimestampEpochMilliseconds), ZoneId.systemDefault()); + return messageRepository.findAllByMapNameAndTimestampAfter(mapName, since).stream() .map(MessageMapper.INSTANCE::toDto) + .map(messageDto -> { + try { + final Map map = objectMapper.readValue((String) messageDto.getPayload(), Map.class); + messageDto.setPayload(map); + return messageDto; + } catch (JsonProcessingException e) { + log.error("Error while parsing payload of message: {}", messageDto, e); + } + return null; + }) + .filter(Objects::nonNull) + .sorted(Comparator.comparing(MessageDto::getTimestampEpochMilliseconds)) .toList(); } diff --git a/src/main/java/com/recom/persistence/message/MessageRepository.java b/src/main/java/com/recom/persistence/message/MessageRepository.java index 808a3b2e..7c5eaf7a 100644 --- a/src/main/java/com/recom/persistence/message/MessageRepository.java +++ b/src/main/java/com/recom/persistence/message/MessageRepository.java @@ -5,12 +5,16 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; +import java.time.LocalDateTime; import java.util.List; @Repository interface MessageRepository extends JpaRepository { @NonNull - List findAllByMapName(@NonNull final String mapName); + List findAllByMapNameAndTimestampAfter( + @NonNull final String mapName, + @NonNull final LocalDateTime timestamp + ); } \ No newline at end of file diff --git a/src/main/java/com/recom/security/RECOMJWTAuthenticationFilter.java b/src/main/java/com/recom/security/RECOMJWTAuthenticationFilter.java index f55b53ba..38eeb725 100644 --- a/src/main/java/com/recom/security/RECOMJWTAuthenticationFilter.java +++ b/src/main/java/com/recom/security/RECOMJWTAuthenticationFilter.java @@ -84,7 +84,7 @@ protected void doFilterInternal( repository.saveContext(SecurityContextHolder.getContext(), request, response); filterChain.doFilter(request, response); } - } catch (Throwable e) { + } catch (final Throwable e) { response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); } } diff --git a/src/main/java/com/recom/security/jwt/JwtTokenService.java b/src/main/java/com/recom/security/jwt/JwtTokenService.java index 2ea0c30f..e58aa634 100644 --- a/src/main/java/com/recom/security/jwt/JwtTokenService.java +++ b/src/main/java/com/recom/security/jwt/JwtTokenService.java @@ -46,7 +46,7 @@ public String passThroughIfClaimIsPresent(@Nullable final Object sub) throws Htt public UUID extractAndAssertSubjectIsUUID(@NonNull final String sub) throws HttpUnauthorizedException { try { return conversionService.convert(sub, UUID.class); - } catch (ConversionException e) { + } catch (final ConversionException e) { throw new HttpUnauthorizedException("Invalid token; subject is not an UUID!"); } } diff --git a/src/main/java/com/recom/security/rsa/RSAKeyGenerator.java b/src/main/java/com/recom/security/rsa/RSAKeyGenerator.java index 0f7a5b7a..160b8ece 100644 --- a/src/main/java/com/recom/security/rsa/RSAKeyGenerator.java +++ b/src/main/java/com/recom/security/rsa/RSAKeyGenerator.java @@ -148,7 +148,7 @@ private void persistKeyToFile( try (final OutputStream fos = fileSystemResource.getOutputStream()) { log.info("| +- Save {} key: '{}'", keyType.name().toLowerCase(), filePath.toAbsolutePath()); fos.write(keyBytes); - } catch (IOException e) { + } catch (final IOException e) { throw new IOException(e); } } @@ -165,7 +165,7 @@ private void persistUUIDToFile( try (final OutputStream fos = fileSystemResource.getOutputStream()) { log.info("| +- Save uuid: '{}'", filePath.toAbsolutePath()); fos.write(uuid.getBytes()); - } catch (IOException e) { + } catch (final IOException e) { throw new IOException(e); } } diff --git a/src/main/java/com/recom/service/ReforgerPayloadParserService.java b/src/main/java/com/recom/service/ReforgerPayloadParserService.java index 2e8f8b76..549e53b2 100644 --- a/src/main/java/com/recom/service/ReforgerPayloadParserService.java +++ b/src/main/java/com/recom/service/ReforgerPayloadParserService.java @@ -45,7 +45,7 @@ public T parseValidated( validateInput(parsed); return parsed; - } catch (NoSuchElementException | JsonProcessingException e) { + } catch (final NoSuchElementException | JsonProcessingException e) { log.error("Cannot parse a Reforger JSON DTO {}", String.join(";\n", payload.keySet()), e); throw new HttpBadRequestException(e.getMessage()); } diff --git a/src/main/java/com/recom/service/configuration/ConfigurationRESTManagementService.java b/src/main/java/com/recom/service/configuration/ConfigurationRESTManagementService.java index 1ee76497..dbe4a0d9 100644 --- a/src/main/java/com/recom/service/configuration/ConfigurationRESTManagementService.java +++ b/src/main/java/com/recom/service/configuration/ConfigurationRESTManagementService.java @@ -125,7 +125,7 @@ private Consumer handleOverrideWithCorrespondingDefaultConfigurat if (override.getType() == ConfigurationType.LIST) { try { existingOverride.setValue(StaticObjectMapperProvider.provide().writeValueAsString(override.getMapOverriddenListValue())); - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException(e); } } @@ -143,7 +143,7 @@ private Consumer handleOverrideWithCorrespondingDefaultConfigurat if (override.getType() == ConfigurationType.LIST) { try { configurationBuilder.value(StaticObjectMapperProvider.provide().writeValueAsString(override.getMapOverriddenListValue())); - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException(e); } } diff --git a/src/main/java/com/recom/service/dbcached/AsyncCacheableRequestProcessor.java b/src/main/java/com/recom/service/dbcached/AsyncCacheableRequestProcessor.java index 6c7e4942..77e8e925 100644 --- a/src/main/java/com/recom/service/dbcached/AsyncCacheableRequestProcessor.java +++ b/src/main/java/com/recom/service/dbcached/AsyncCacheableRequestProcessor.java @@ -50,7 +50,7 @@ public ResponseEntity processRequest( .cacheControl(CacheControl.noCache()) .build() ); - } catch (DBCachedDeserializationException dbcde) { + } catch (final DBCachedDeserializationException dbcde) { log.error("Unable to deserialize cached value; Generating new one"); dbCachedManager.delete(cacheName, cacheKey); } @@ -69,7 +69,7 @@ public ResponseEntity processRequest( Optional result = Optional.empty(); try { result = Optional.ofNullable(cacheLoader.get()); - } catch (Exception e) { + } catch (final Exception e) { log.error("Async-Exception", e); } finally { result.ifPresent(value -> dbCachedManager.put(cacheName, cacheKey, value)); diff --git a/src/main/java/com/recom/service/dbcached/DBCachedManager.java b/src/main/java/com/recom/service/dbcached/DBCachedManager.java index 56b00b33..4bbf54bf 100644 --- a/src/main/java/com/recom/service/dbcached/DBCachedManager.java +++ b/src/main/java/com/recom/service/dbcached/DBCachedManager.java @@ -43,7 +43,7 @@ public Optional get( .map(cacheValue -> { try { return (V) cacheValue.get(); - } catch (ClassCastException e) { + } catch (final ClassCastException e) { return null; } }) @@ -68,7 +68,7 @@ public void put( ) { try { cacheManager.getCache(cacheName).put(cacheKey, valueToCache); - } catch (Exception e) { + } catch (final Exception e) { log.debug("Put; cache {} not found", cacheName); } dbCachedPersistenceLayer.put(cacheName, cacheKey, valueToCache); @@ -80,7 +80,7 @@ public void delete( ) { try { cacheManager.getCache(cacheName).evict(cacheKey); - } catch (Exception e) { + } catch (final Exception e) { log.debug("Delete; cache {} not found", cacheName); } dbCachedPersistenceLayer.delete(cacheName, cacheKey); diff --git a/src/main/java/com/recom/service/dbcached/DBCachedService.java b/src/main/java/com/recom/service/dbcached/DBCachedService.java index 237a68ae..dd52ba23 100644 --- a/src/main/java/com/recom/service/dbcached/DBCachedService.java +++ b/src/main/java/com/recom/service/dbcached/DBCachedService.java @@ -26,7 +26,7 @@ public V proxyToDBCacheSafe( ) { try { return proxyToDBCacheUnsafe(cacheName, cacheKey, cacheLoader); - } catch (DBCachedDeserializationException e) { + } catch (final DBCachedDeserializationException e) { log.info("Executing cacheLoader (due to deserialization error) {} - {} ", cacheName, cacheKey); log.info("Delete {} - {} ", cacheName, cacheKey); dbCachedPersistenceLayer.delete(cacheName, cacheKey); diff --git a/src/main/java/com/recom/service/messagebus/MessageBusService.java b/src/main/java/com/recom/service/messagebus/MessageBusService.java index df77045c..4050999e 100644 --- a/src/main/java/com/recom/service/messagebus/MessageBusService.java +++ b/src/main/java/com/recom/service/messagebus/MessageBusService.java @@ -2,8 +2,8 @@ import com.recom.dto.message.MessageBusResponseDto; import com.recom.dto.message.MessageDto; -import com.recom.entity.Message; -import com.recom.model.message.MessageType; +import com.recom.mapper.MessageMapper; +import com.recom.model.message.MessageContainer; import com.recom.observer.*; import com.recom.persistence.message.MessagePersistenceLayer; import lombok.Getter; @@ -12,38 +12,34 @@ import org.springframework.stereotype.Service; import java.util.List; -import java.util.UUID; @Service @RequiredArgsConstructor -public class MessageBusService implements HasSubject { +public class MessageBusService implements HasSubject { @NonNull @Getter() - private final Subjective subject = new Subject<>(); - + private final Subjective subject = new Subject<>(); @NonNull private final MessagePersistenceLayer messagePersistenceLayer; - public void sendMessage( + public void sendMessage( @NonNull final String mapName, - @NonNull final String messageType, - @NonNull final String payload + @NonNull final MessageContainer messageContainer + ) { + subject.notifyObserversWith(new Notification<>(messageContainer)); + } + + @NonNull + public MessageBusResponseDto listMessagesSince( + @NonNull final String mapName, + @NonNull final Long sinceTimestampEpochMilliseconds + ) { - subject.notifyObserversWith(new Notification<>(MessageBusResponseDto.builder().messages( - List.of( - MessageDto.builder() - .uuid(UUID.randomUUID()) - .payload("test") - .build() - ) - ).build())); - -// messagePersistenceLayer.saveAll(List.of(Message.builder() -// .mapName(mapName) -// .messageType(MessageType.valueOf(messageType)) -// .payload(payload) -// .build())); + return MessageBusResponseDto.builder() + .mapName(mapName) + .messages(messagePersistenceLayer.findAllMapSpecificMessagesSince(mapName, sinceTimestampEpochMilliseconds)) + .build(); } } diff --git a/src/main/java/com/recom/service/messagebus/MessageLongPollObserver.java b/src/main/java/com/recom/service/messagebus/MessageLongPollObserver.java index 53d7fd65..aa18f318 100644 --- a/src/main/java/com/recom/service/messagebus/MessageLongPollObserver.java +++ b/src/main/java/com/recom/service/messagebus/MessageLongPollObserver.java @@ -2,11 +2,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.recom.configuration.AsyncConfiguration; +import com.recom.dto.map.Point2DDto; import com.recom.dto.message.MessageBusResponseDto; import com.recom.dto.message.MessageDto; import com.recom.entity.Message; import com.recom.exception.HttpTimeoutException; import com.recom.model.message.MessageType; +import com.recom.model.message.OneMessage; +import com.recom.model.message.MessageContainer; import com.recom.observer.Notification; import com.recom.observer.ObserverTemplate; import com.recom.observer.Subjective; @@ -20,8 +23,11 @@ import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import java.math.BigDecimal; import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -29,7 +35,7 @@ @Slf4j @RequiredArgsConstructor -public class MessageLongPollObserver extends ObserverTemplate { +public class MessageLongPollObserver extends ObserverTemplate { @NonNull private final Long timeout; @@ -60,15 +66,15 @@ public MessageLongPollObserver( @Override public void takeNotice( - @NonNull final Subjective subject, - @NonNull final Notification notification + @NonNull final Subjective subject, + @NonNull final Notification messageBag ) { log.debug("MessageLongPollObserver.takeNotice"); try { - responseBodyEmitter.send(notification.getPayload(), MediaType.APPLICATION_JSON); - - persistNotification(notification); - } catch (Exception e) { + final MessageBusResponseDto response = prepareNotification(messageBag.getPayload()); + responseBodyEmitter.send(response, MediaType.APPLICATION_JSON); + persistNotification(response); + } catch (final Exception e) { log.error(e.getMessage(), e); responseBodyEmitter.completeWithError(e); } finally { @@ -77,22 +83,44 @@ public void takeNotice( } } - private void persistNotification(@NonNull final Notification notification) { - final List messagesToSave = notification.getPayload().getMessages().stream() - .map(messageDto -> { - try { - return Message.builder() - .mapName(notification.getPayload().getMapName()) - .messageType(MessageType.TEST) - .payload(StaticObjectMapperProvider.provide().writeValueAsString(messageDto)) - .timestamp(LocalDateTime.now()) - .build(); - } catch (JsonProcessingException jpe) { - log.error("JsonProcessingException: ", jpe); - } + @NonNull + private MessageBusResponseDto prepareNotification(@NonNull final MessageContainer container) { + final Instant now = Instant.now(); + long epochMilli = now.toEpochMilli(); + return MessageBusResponseDto.builder() + .mapName(container.getMapName()) + .messages(container.getMessages().stream() + .map(message -> { + return MessageDto.builder() + .uuid(UUID.randomUUID()) + .messageType(message.getMessageType()) + .payload(message.getPayload()) + .timestampEpochMilliseconds(epochMilli) + .build(); + }) + .toList() + ) + .build(); + } + + private void persistNotification(@NonNull final MessageBusResponseDto messageBusResponse) { + final List messagesToSave = messageBusResponse.getMessages().stream() + .map(message -> { + try { + return Message.builder() + .uuid(message.getUuid()) + .mapName(messageBusResponse.getMapName()) + .messageType(message.getMessageType()) + .payload(StaticObjectMapperProvider.provide().writeValueAsString(message.getPayload())) + .timestamp(LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getTimestampEpochMilliseconds()), ZoneId.systemDefault())) + .build(); + } catch (final JsonProcessingException jpe) { + log.error("JsonProcessingException: ", jpe); + } + return null; + } - return null; - }) + ) .filter(Objects::nonNull) .toList(); @@ -107,22 +135,38 @@ public ResponseBodyEmitter provideResponseEmitter() { public void scheduleTestResponse( @NonNull final String mapName, @NonNull final Duration duration, - @NonNull final Subjective onBehalfOf, + @NonNull final Subjective onBehalfOf, @NonNull final AsyncConfiguration asyncConfiguration ) { log.debug("MessageLongPollObserver.startObserving"); CompletableFuture.runAsync(() -> { try { Thread.sleep(duration.toMillis()); - } catch (InterruptedException ignored) { + } catch (final InterruptedException ignored) { log.error("Interrupted Exception"); } takeNotice( onBehalfOf, new Notification<>( - MessageBusResponseDto.builder() + MessageContainer.builder() .mapName(mapName) - .messages(List.of(MessageDto.builder().uuid(UUID.randomUUID()).build())) + .messages(List.of( + OneMessage.builder() + .messageType(MessageType.TEST) + .payload(Point2DDto.builder() + .x(BigDecimal.valueOf(1.0)) + .y(BigDecimal.valueOf(2.0)) + .build()) + .build(), + OneMessage.builder() + .messageType(MessageType.TEST) + .payload(Point2DDto.builder() + .x(BigDecimal.valueOf(1.0)) + .y(BigDecimal.valueOf(2.0)) + .build()) + .build() + ) + ) .build() ) ); diff --git a/src/main/java/lib/clipboard/goap/fsm/FSM.java b/src/main/java/lib/clipboard/goap/fsm/FSM.java index 216c6cf3..29d2ddf3 100644 --- a/src/main/java/lib/clipboard/goap/fsm/FSM.java +++ b/src/main/java/lib/clipboard/goap/fsm/FSM.java @@ -36,7 +36,7 @@ public void update(@NonNull final IGoapUnit goapUnit) { dispatchPlanFinishedEvent(); } } - } catch (Exception e) { + } catch (final Exception e) { final FSMStateful state = states.pop(); if (state instanceof PerformActionState) { dispatchPlanFailedEvent(((PerformActionState) state).getCurrentActions()); diff --git a/src/main/java/lib/clipboard/goap/fsm/states/PerformActionState.java b/src/main/java/lib/clipboard/goap/fsm/states/PerformActionState.java index 24d00232..3cdc0efa 100644 --- a/src/main/java/lib/clipboard/goap/fsm/states/PerformActionState.java +++ b/src/main/java/lib/clipboard/goap/fsm/states/PerformActionState.java @@ -68,9 +68,9 @@ public boolean isStateStillPerforming(@NonNull final IGoapUnit goapUnit) throws workingOnQueue = true; } - } catch (UnperformableActionException e) { + } catch (final UnperformableActionException e) { System.out.println(e.getMessage()); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); // throw new Exception(); diff --git a/src/main/java/lib/clipboard/goap/planner/GoapPlannerBase.java b/src/main/java/lib/clipboard/goap/planner/GoapPlannerBase.java index f5da4bcd..12981e6a 100644 --- a/src/main/java/lib/clipboard/goap/planner/GoapPlannerBase.java +++ b/src/main/java/lib/clipboard/goap/planner/GoapPlannerBase.java @@ -79,7 +79,7 @@ protected static boolean addEgdeWithWeigth( graph.setEdgeWeight(graph.getEdge(firstVertex, secondVertex), weight); return true; - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); return false; @@ -186,7 +186,7 @@ public Queue planActions(@NonNull final IGoapUnit goapUnit) { } else { createdPlan = searchGraphForActionQueue(createGraph()); } - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); } @@ -284,7 +284,7 @@ protected HashSet extractPossibleActions() { possibleActions.add(goapAction); } } - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); }