Skip to content

Commit

Permalink
message-bus
Browse files Browse the repository at this point in the history
  • Loading branch information
svencc committed Oct 25, 2023
1 parent 22e6a4d commit 112cfe6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 30 deletions.
41 changes: 12 additions & 29 deletions src/main/java/com/recom/api/messagebus/MessageBusController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.recom.service.messagebus.MessageLongPollObserver;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
Expand All @@ -34,7 +35,7 @@
@RequestMapping("/api/v1/message-bus")
public class MessageBusController {

private final static Duration RECOM_CURL_TIMEOUT = Duration.ofSeconds(12); // 12 seconds seems to be the maximum on REFORGER side!
private final static Duration RECOM_CURL_TIMEOUT = Duration.ofSeconds(15); // 12 seconds seems to be the maximum on REFORGER side!

@NonNull
private final AssertionService assertionService;
Expand All @@ -49,15 +50,15 @@ public class MessageBusController {
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK),
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK, content = @Content(mediaType = "application/json", schema = @Schema(implementation = MessageBusResponseDto.class))),
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "/form", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
@RequestParam(required = true)
@NonNull final Map<String, String> payload
) {
log.debug("Requested POST /api/v1/map/renderer/form (FORM)");
log.debug("Requested POST/api/v1/message-bus/form (FORM)");

return getMessagesJSON(payloadParser.parseValidated(payload, MessageBusLongPollRequestDto.class));
}
Expand All @@ -68,20 +69,24 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK),
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK, content = @Content(mediaType = "application/json", schema = @Schema(implementation = MessageBusResponseDto.class))),
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto
) {
log.debug("Requested POST /api/v1/map/message-bus (JSON)");
if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L) {
log.debug("Requested POST /api/v1/message-bus since {} (JSON)", messageBusLongPollRequestDto.getTimestampEpochMilliseconds());
} else {
log.debug("Requested POST /api/v1/message-bus (JSON)");
}
assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName());

ResponseBodyEmitter emitter;
final Lazy<MessageBusResponseDto> messagesSinceLazy = Lazy.of(() -> messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), messageBusLongPollRequestDto.getTimestampEpochMilliseconds()));
if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && !messagesSinceLazy.get().getMessages().isEmpty()) {
if (messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != null && messageBusLongPollRequestDto.getTimestampEpochMilliseconds() != 0L && !messagesSinceLazy.get().getMessages().isEmpty()) {
emitter = new ResponseBodyEmitter(RECOM_CURL_TIMEOUT.toMillis());
try {
emitter.send(messagesSinceLazy.get(), MediaType.APPLICATION_JSON);
Expand All @@ -104,26 +109,4 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
.body(emitter);
}

@Operation(
summary = "Get a list of messages since given timestamp",
description = "Gets all map specific message since provided epoch-millis.",
security = @SecurityRequirement(name = HttpCommons.BEARER_AUTHENTICATION_REQUIREMENT)
)
@ApiResponses(value = {
@ApiResponse(responseCode = HttpCommons.OK_CODE, description = HttpCommons.OK),
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "/after", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<MessageBusResponseDto> getMessagesSinceJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusLongPollRequestDto messageBusLongPollRequestDto
) {
log.debug("Requested POST /api/v1/map/message-bus/after (JSON)");
assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName());

return ResponseEntity.status(HttpStatus.OK)
.cacheControl(CacheControl.noCache())
.body(messageBusService.listMessagesSince(messageBusLongPollRequestDto.getMapName(), messageBusLongPollRequestDto.getTimestampEpochMilliseconds()));
}

}
}
4 changes: 4 additions & 0 deletions src/main/java/com/recom/api/test/EmitMessagesController.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public ResponseEntity<Void> emitMessagesJSON(
messageBusService.sendMessage(MessageContainer.builder()
.mapName(messageBusLongPollRequestDto.getMapName())
.messages(List.of(
OneMessage.builder()
.messageType(MessageType.FETCH_MAP_RENDER_DATA)
.payload(null)
.build(),
OneMessage.builder()
.messageType(MessageType.FETCH_MAP_RENDER_DATA)
.payload(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void takeNotice(
) {
log.debug("MessageLongPollObserver.takeNotice");
try {
responseBodyEmitter.send(response, MediaType.APPLICATION_JSON);
responseBodyEmitter.send(response.getPayload(), MediaType.APPLICATION_JSON);
} catch (final Exception e) {
log.error("MessageLongPollObserver.takeNotice", e);
responseBodyEmitter.completeWithError(e);
Expand Down

0 comments on commit 112cfe6

Please sign in to comment.