Skip to content

Commit

Permalink
message-bus clean Implementation; Test Missing; add database message …
Browse files Browse the repository at this point in the history
…logging
  • Loading branch information
svencc committed Oct 17, 2023
1 parent 0d7e997 commit 95980b9
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 25 deletions.
29 changes: 13 additions & 16 deletions src/main/java/com/recom/api/messagebus/MessageBusController.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import com.recom.api.commons.HttpCommons;
import com.recom.configuration.AsyncConfiguration;
import com.recom.dto.message.MessageBusRequestDto;
import com.recom.dto.message.MessageBusResponseDto;
import com.recom.dto.message.MessageDto;
import com.recom.observer.Notification;
import com.recom.persistence.message.MessagePersistenceLayer;
import com.recom.service.AssertionService;
import com.recom.service.ReforgerPayloadParserService;
Expand All @@ -22,20 +19,13 @@
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.*;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;
import org.springframework.web.servlet.function.ServerRequest;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Validated
Expand Down Expand Up @@ -68,7 +58,7 @@ public class MessageBusController {
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "/form", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
public ResponseBodyEmitter getMessagesForm(
public ResponseEntity<ResponseBodyEmitter> getMessagesForm(
@RequestParam(required = true)
@NonNull final Map<String, String> payload
) {
Expand All @@ -90,7 +80,7 @@ public ResponseBodyEmitter getMessagesForm(
@ApiResponse(responseCode = HttpCommons.UNAUTHORIZED_CODE, description = HttpCommons.UNAUTHORIZED, content = @Content())
})
@PostMapping(path = "", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseBodyEmitter getMessagesJSON(
public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
@RequestBody(required = true)
@NonNull @Valid final MessageBusRequestDto mapRendererRequestDto
) {
Expand All @@ -100,12 +90,19 @@ public ResponseBodyEmitter getMessagesJSON(
final MessageLongPollObserver messageLongPollObserver = MessageLongPollObserver.builder()
.timeout(RECOM_CURL_TIMEOUT.toMillis())
.asyncTaskExecutor(asyncConfiguration.provideClusterGeneratorExecutor())
.messagePersistenceLayer(messagePersistenceLayer)
.build();
messageLongPollObserver.observe(messageBusService.getSubject());

messageLongPollObserver.schedlueTestResponse(Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);

return messageLongPollObserver.provideResponseEmitter();
messageLongPollObserver.scheduleTestResponse(mapRendererRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);

final HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);

return ResponseEntity.status(HttpStatus.OK)
.headers(httpHeaders)
.cacheControl(CacheControl.noCache())
.body(messageLongPollObserver.provideResponseEmitter());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessageBusResponseDto implements Serializable {

@Schema
@JsonProperty()
private String mapName;

@Schema
@JsonProperty()
private List<MessageDto> messages;
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/recom/entity/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ public class Message implements Persistable<UUID>, Serializable {
@Column(insertable = true, updatable = true, nullable = true, columnDefinition = "LONGTEXT")
private String payload;

@Column(insertable = true, updatable = false, nullable = false, columnDefinition = "DATETIME(6) DEFAULT NOW(6)")
@Column(insertable = true, updatable = false, nullable = true, columnDefinition = "DATETIME(6) DEFAULT NOW(6)")
private LocalDateTime timestamp;

@Column(insertable = true, updatable = false, nullable = false, columnDefinition = "DATETIME(6) DEFAULT NOW(6)")
private LocalDateTime timestampConfirmation;


@Override
public int hashCode() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/recom/model/message/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public enum MessageType {

FETCH_MAP_RENDER_DATA
TEST, FETCH_MAP_RENDER_DATA

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.recom.service.messagebus;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.recom.configuration.AsyncConfiguration;
import com.recom.dto.message.MessageBusResponseDto;
import com.recom.dto.message.MessageDto;
import com.recom.entity.Message;
import com.recom.exception.HttpTimeoutException;
import com.recom.model.message.MessageType;
import com.recom.observer.Notification;
import com.recom.observer.ObserverTemplate;
import com.recom.observer.Subjective;
import com.recom.persistence.message.MessagePersistenceLayer;
import com.recom.service.provider.StaticObjectMapperProvider;
import lombok.Builder;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -16,7 +21,9 @@
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

Expand All @@ -29,25 +36,28 @@ public class MessageLongPollObserver extends ObserverTemplate<MessageBusResponse
@NonNull
private final AsyncTaskExecutor asyncTaskExecutor;
@NonNull
private final MessagePersistenceLayer messagePersistenceLayer;
@NonNull
private final ResponseBodyEmitter responseBodyEmitter;

@Builder()
public MessageLongPollObserver(
@NonNull final Long timeout,
@NonNull final AsyncTaskExecutor asyncTaskExecutor
@NonNull final AsyncTaskExecutor asyncTaskExecutor,
@NonNull final MessagePersistenceLayer messagePersistenceLayer
) {
this.timeout = timeout;
this.asyncTaskExecutor = asyncTaskExecutor;
this.responseBodyEmitter = new ResponseBodyEmitter(timeout);
this.messagePersistenceLayer = messagePersistenceLayer;

this.responseBodyEmitter = new ResponseBodyEmitter(timeout);
this.responseBodyEmitter.onTimeout(() -> {
log.debug("MessageLongPollObserver.onTimeout");
super.subjects.forEach(subject -> subject.observationStoppedThrough(this));
this.responseBodyEmitter.completeWithError(new HttpTimeoutException("Timeout"));
});
}


@Override
public void takeNotice(
@NonNull final Subjective<MessageBusResponseDto> subject,
Expand All @@ -56,6 +66,8 @@ public void takeNotice(
log.debug("MessageLongPollObserver.takeNotice");
try {
responseBodyEmitter.send(notification.getPayload(), MediaType.APPLICATION_JSON);

persistNotification(notification);
} catch (Exception e) {
log.error(e.getMessage(), e);
responseBodyEmitter.completeWithError(e);
Expand All @@ -65,12 +77,35 @@ public void takeNotice(
}
}

private void persistNotification(@NonNull final Notification<MessageBusResponseDto> notification) {
final List<Message> messagesToSave = notification.getPayload().getMessages().stream()
.map(messageDto -> {
try {
return Message.builder()
.mapName(notification.getPayload().getMapName())
.messageType(MessageType.TEST)
.payload(StaticObjectMapperProvider.provide().writeValueAsString(messageDto))
.timestamp(LocalDateTime.now())
.build();
} catch (JsonProcessingException jpe) {
log.error("JsonProcessingException: ", jpe);
}

return null;
})
.filter(Objects::nonNull)
.toList();

messagePersistenceLayer.saveAll(messagesToSave);
}

@NonNull
public ResponseBodyEmitter provideResponseEmitter() {
return responseBodyEmitter;
}

public void schedlueTestResponse(
public void scheduleTestResponse(
@NonNull final String mapName,
@NonNull final Duration duration,
@NonNull final Subjective<MessageBusResponseDto> onBehalfOf,
@NonNull final AsyncConfiguration asyncConfiguration
Expand All @@ -86,6 +121,7 @@ public void schedlueTestResponse(
onBehalfOf,
new Notification<>(
MessageBusResponseDto.builder()
.mapName(mapName)
.messages(List.of(MessageDto.builder().uuid(UUID.randomUUID()).build()))
.build()
)
Expand Down

0 comments on commit 95980b9

Please sign in to comment.