Skip to content

Commit

Permalink
improve message-bus; and persisting and retrieving messages
Browse files Browse the repository at this point in the history
  • Loading branch information
svencc committed Oct 18, 2023
1 parent e1729c7 commit bb4bd00
Show file tree
Hide file tree
Showing 23 changed files with 253 additions and 94 deletions.
43 changes: 33 additions & 10 deletions src/main/java/com/recom/api/messagebus/MessageBusController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.recom.api.commons.HttpCommons;
import com.recom.configuration.AsyncConfiguration;
import com.recom.dto.message.MessageBusRequestDto;
import com.recom.dto.message.MessageBusLongPollRequestDto;
import com.recom.dto.message.MessageBusResponseDto;
import com.recom.dto.message.MessageBusSinceRequestDto;
import com.recom.persistence.message.MessagePersistenceLayer;
import com.recom.service.AssertionService;
import com.recom.service.ReforgerPayloadParserService;
Expand All @@ -17,7 +19,6 @@
import jakarta.validation.Valid;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -49,8 +50,8 @@ public class MessageBusController {
private final AsyncConfiguration asyncConfiguration;

@Operation(
summary = "Get a list of messages",
description = "Gets all map specific, latest message of a type.",
summary = "Long-poll latest messages",
description = "Polls for all map specific, latest message.",
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
Expand All @@ -64,12 +65,12 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
) {
log.debug("Requested POST /api/v1/map/renderer/form (FORM)");

return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusRequestDto.class));
return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusLongPollRequestDto.class));
}

@Operation(
summary = "Get a list of messages",
description = "Gets all map specific, latest message of a type.",
summary = "Long-poll latest messages",
description = "Polls for all map specific, latest message.",
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
Expand All @@ -79,10 +80,10 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
@PostMapping(path = "", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusRequestDto mapRendererRequestDto
@NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto
) {
log.debug("Requested POST /api/v1/map/message-bus (JSON)");
assertionService.assertMapExists(mapRendererRequestDto.getMapName());
assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName());

final MessageLongPollObserver messageLongPollObserver = MessageLongPollObserver.builder()
.timeout(RECOM_CURL_TIMEOUT.toMillis())
Expand All @@ -91,7 +92,7 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
.build();
messageLongPollObserver.observe(messageBusService.getSubject());

messageLongPollObserver.scheduleTestResponse(mapRendererRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);
messageLongPollObserver.scheduleTestResponse(messageBusLongPollRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);

final HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
Expand All @@ -102,4 +103,26 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
.body(messageLongPollObserver.provideResponseEmitter());
}

@Operation(
summary = "Get a list of messages since given timestamp",
description = "Gets all map specific message since provided epoch-millis.",
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK),
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "/after", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<MessageBusResponseDto> getMessagesSinceJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusSinceRequestDto messageBusSinceRequestDto
) {
log.debug("Requested POST /api/v1/map/message-bus/after (JSON)");
assertionService.assertMapExists(messageBusSinceRequestDto.getMapName());

return ResponseEntity.status(HttpStatus.OK)
.cacheControl(CacheControl.noCache())
.body(messageBusService.listMessagesSince(messageBusSinceRequestDto.getMapName(), messageBusSinceRequestDto.getTimestampEpochMilliseconds()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessageBusRequestDto implements Serializable {
public class MessageBusLongPollRequestDto implements Serializable {

@NotBlank
@Schema
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/com/recom/dto/message/MessageBusSinceRequestDto.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.recom.dto.message;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Schema
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessageBusSinceRequestDto implements Serializable {

@NotBlank
@Schema
@JsonProperty()
private String mapName;

@NotNull
@Schema(description = "Unix timestamp in milliseconds", example = "1691941419964")
@JsonProperty()
private Long timestampEpochMilliseconds;

}

6 changes: 1 addition & 5 deletions src/main/java/com/recom/dto/message/MessageDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ public class MessageDto {
@JsonProperty()
private Long timestampEpochMilliseconds;

@Schema(description = "Unix timestamp in milliseconds", example = "1691941419964")
@JsonProperty()
private Long timestampConfirmationEpochMilliseconds;

@Schema
@JsonProperty()
private String payload;
private Object payload;

}
5 changes: 2 additions & 3 deletions src/main/java/com/recom/entity/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
@AllArgsConstructor
@Table(indexes = {
@Index(name = "IDX_mapName", columnList = "mapName", unique = false),
@Index(name = "IDX_mapName_messageType", columnList = "mapName, messageType", unique = false),
@Index(name = "IDX_mapName_messageType_timestamp", columnList = "mapName, messageType, timestamp", unique = true),
@Index(name = "IDX_mapName_messageType_timestamp", columnList = "mapName, messageType, timestamp", unique = false),
@Index(name = "IDX_mapName_timestamp", columnList = "mapName, timestamp", unique = false),
})
@Cacheable
@Cache(usage = CacheConcurrencyStrategy.READ_WRITE)
public class Message implements Persistable<UUID>, Serializable {

@Id
@GeneratedValue(strategy = GenerationType.AUTO)
// @GeneratedValue(strategy = GenerationType.AUTO)
@Column(insertable = true, updatable = false, nullable = false)
private UUID uuid;

Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/recom/model/message/MessageContainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.recom.model.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageContainer {

private String mapName;
private List<OneMessage> messages;

}
17 changes: 17 additions & 0 deletions src/main/java/com/recom/model/message/OneMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.recom.model.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OneMessage {

private MessageType messageType;
private Object payload;

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private <V extends Serializable> DBCachedItem updateExistingCacheItem(
try (ByteArrayOutputStream byteArrayOutputStream = serializeObject(value)) {
existingCacheItem.setCachedValue(byteArrayOutputStream.toByteArray());
return existingCacheItem;
} catch (Exception e) {
} catch (final Exception e) {
log.error("Error serializing cache value with key {}!", existingCacheItem.getCacheKey(), e);
return null;
}
Expand All @@ -79,7 +79,7 @@ private <V extends Serializable> DBCachedItem createNewCacheItem(
.cacheKey(cacheKey)
.cachedValue(byteArrayOutputStream.toByteArray())
.build();
} catch (Exception e) {
} catch (final Exception e) {
log.error("Error serializing cache value with key {}!", cacheKey, e);
return null;
}
Expand Down Expand Up @@ -116,7 +116,7 @@ private <V extends Serializable> Optional<V> deserializeCacheValue(
) throws DBCachedDeserializationException {
try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(serializedValue))) {
return Optional.ofNullable((V) inputStream.readObject());
} catch (IOException | ClassNotFoundException e) {
} catch (final IOException | ClassNotFoundException e) {
log.error("Error deserializing cache value with key {}", cacheKey);
throw new DBCachedDeserializationException(String.format("Unable to deserialize cacheKey %s", cacheKey), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.recom.persistence.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recom.dto.message.MessageDto;
import com.recom.entity.Message;
import com.recom.mapper.MessageMapper;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.convert.ConversionService;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RequiredArgsConstructor
public class MessagePersistenceLayer {
Expand All @@ -18,12 +28,29 @@ public class MessagePersistenceLayer {
private final MessageRepository messageRepository;
@NonNull
private final ConversionService conversionService;
@NonNull
private final ObjectMapper objectMapper;

@NonNull
// @Cacheable(cacheNames = "CommandPersistenceLayer.findAllMapSpecificCommands") // @TODO there is some work to do, to cache the commands ...
public List<MessageDto> findAllMapSpecificMessages(@NonNull final String mapName) {
return messageRepository.findAllByMapName(mapName).stream()
public List<MessageDto> findAllMapSpecificMessagesSince(
@NonNull final String mapName,
@NonNull final Long sinceTimestampEpochMilliseconds
) {
final LocalDateTime since = LocalDateTime.ofInstant(Instant.ofEpochMilli(sinceTimestampEpochMilliseconds), ZoneId.systemDefault());
return messageRepository.findAllByMapNameAndTimestampAfter(mapName, since).stream()
.map(MessageMapper.INSTANCE::toDto)
.map(messageDto -> {
try {
final Map<String, Object> map = objectMapper.readValue((String) messageDto.getPayload(), Map.class);
messageDto.setPayload(map);
return messageDto;
} catch (JsonProcessingException e) {
log.error("Error while parsing payload of message: {}", messageDto, e);
}
return null;
})
.filter(Objects::nonNull)
.sorted(Comparator.comparing(MessageDto::getTimestampEpochMilliseconds))
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.List;

@Repository
interface MessageRepository extends JpaRepository<Message, Long> {

@NonNull
List<Message> findAllByMapName(@NonNull final String mapName);
List<Message> findAllByMapNameAndTimestampAfter(
@NonNull final String mapName,
@NonNull final LocalDateTime timestamp
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void doFilterInternal(
repository.saveContext(SecurityContextHolder.getContext(), request, response);
filterChain.doFilter(request, response);
}
} catch (Throwable e) {
} catch (final Throwable e) {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/recom/security/jwt/JwtTokenService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public String passThroughIfClaimIsPresent(@Nullable final Object sub) throws Htt
public UUID extractAndAssertSubjectIsUUID(@NonNull final String sub) throws HttpUnauthorizedException {
try {
return conversionService.convert(sub, UUID.class);
} catch (ConversionException e) {
} catch (final ConversionException e) {
throw new HttpUnauthorizedException("Invalid token; subject is not an UUID!");
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/recom/security/rsa/RSAKeyGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void persistKeyToFile(
try (final OutputStream fos = fileSystemResource.getOutputStream()) {
log.info("| +- Save {} key: '{}'", keyType.name().toLowerCase(), filePath.toAbsolutePath());
fos.write(keyBytes);
} catch (IOException e) {
} catch (final IOException e) {
throw new IOException(e);
}
}
Expand All @@ -165,7 +165,7 @@ private void persistUUIDToFile(
try (final OutputStream fos = fileSystemResource.getOutputStream()) {
log.info("| +- Save uuid: '{}'", filePath.toAbsolutePath());
fos.write(uuid.getBytes());
} catch (IOException e) {
} catch (final IOException e) {
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public <T> T parseValidated(
validateInput(parsed);

return parsed;
} catch (NoSuchElementException | JsonProcessingException e) {
} catch (final NoSuchElementException | JsonProcessingException e) {
log.error("Cannot parse a Reforger JSON DTO {}", String.join(";\n", payload.keySet()), e);
throw new HttpBadRequestException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private Consumer<Configuration> handleOverrideWithCorrespondingDefaultConfigurat
if (override.getType() == ConfigurationType.LIST) {
try {
existingOverride.setValue(StaticObjectMapperProvider.provide().writeValueAsString(override.getMapOverriddenListValue()));
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -143,7 +143,7 @@ private Consumer<Configuration> handleOverrideWithCorrespondingDefaultConfigurat
if (override.getType() == ConfigurationType.LIST) {
try {
configurationBuilder.value(StaticObjectMapperProvider.provide().writeValueAsString(override.getMapOverriddenListValue()));
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public <T extends Serializable> ResponseEntity<T> processRequest(
.cacheControl(CacheControl.noCache())
.build()
);
} catch (DBCachedDeserializationException dbcde) {
} catch (final DBCachedDeserializationException dbcde) {
log.error("Unable to deserialize cached value; Generating new one");
dbCachedManager.delete(cacheName, cacheKey);
}
Expand All @@ -69,7 +69,7 @@ public <T extends Serializable> ResponseEntity<T> processRequest(
Optional<T> result = Optional.empty();
try {
result = Optional.ofNullable(cacheLoader.get());
} catch (Exception e) {
} catch (final Exception e) {
log.error("Async-Exception", e);
} finally {
result.ifPresent(value -> dbCachedManager.put(cacheName, cacheKey, value));
Expand Down
Loading

0 comments on commit bb4bd00

Please sign in to comment.