Skip to content

Commit

Permalink
improve simple messageBus test and compine with MessageBusService; fi…
Browse files Browse the repository at this point in the history
…x issue with observationStoppedThrough -> put execution to AutoClosable close() method!
  • Loading branch information
svencc committed Oct 19, 2023
1 parent 62896fe commit 9c2abbf
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 29 deletions.
22 changes: 12 additions & 10 deletions src/main/java/com/recom/api/messagebus/MessageBusController.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,24 @@ public ResponseEntity<ResponseBodyEmitter> getMessagesJSON(
log.debug("Requested POST /api/v1/map/message-bus (JSON)");
assertionService.assertMapExists(messageBusLongPollRequestDto.getMapName());

final MessageLongPollObserver messageLongPollObserver = MessageLongPollObserver.builder()
try (final MessageLongPollObserver messageLongPollObserver = MessageLongPollObserver.builder()
.timeout(RECOM_CURL_TIMEOUT.toMillis())
.asyncTaskExecutor(asyncConfiguration.provideClusterGeneratorExecutor())
.messagePersistenceLayer(messagePersistenceLayer)
.build();
messageLongPollObserver.observe(messageBusService.getSubject());
.build()
) {
messageLongPollObserver.observe(messageBusService.getSubject());

messageLongPollObserver.scheduleTestResponse(messageBusLongPollRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);
messageLongPollObserver.scheduleTestResponse(messageBusLongPollRequestDto.getMapName(), Duration.ofSeconds(5), messageBusService.getSubject(), asyncConfiguration);

final HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
final HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);

return ResponseEntity.status(HttpStatus.OK)
.headers(httpHeaders)
.cacheControl(CacheControl.noCache())
.body(messageLongPollObserver.provideResponseEmitter());
return ResponseEntity.status(HttpStatus.OK)
.headers(httpHeaders)
.cacheControl(CacheControl.noCache())
.body(messageLongPollObserver.provideResponseEmitter());
}
}

@Operation(
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/recom/observer/ObserverTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import java.util.ArrayList;
import java.util.List;

public abstract class ObserverTemplate<T> implements Observing<T> {
public abstract class ObserverTemplate<T> implements Observing<T>, AutoCloseable {

@NonNull
protected final List<Subjective<T>> subjects = new ArrayList<>();

@Override
public void observe(@NonNull final Subjective<T> subject) {
subject.beObservedBy(this);
subjects.add(subject);
}

Expand All @@ -26,4 +27,9 @@ public void takeDeathNoticeFrom(@NonNull final Subjective<T> subject) {
subjects.remove(subject);
}

@Override
public void close() {
subjects.forEach(subject -> subject.observationStoppedThrough(this));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public void takeNotice(
log.error(e.getMessage(), e);
responseBodyEmitter.completeWithError(e);
} finally {
subject.observationStoppedThrough(this);
responseBodyEmitter.complete();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.recom.service.messagebus;

import com.recom.model.message.MessageContainer;
import com.recom.observer.Subjective;
import com.recom.persistence.message.MessagePersistenceLayer;
import lombok.NonNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.*;

@ExtendWith(MockitoExtension.class)
class MessageBusServiceTest {

@Mock
@NonNull
private MessagePersistenceLayer messagePersistenceLayer;
@InjectMocks
private MessageBusService serviceUnderTest;

@Test
void sendMessage() {
// Arrange
// Act
serviceUnderTest.sendMessage("mapName", new MessageContainer());
// Assert
}

@Test
void listMessagesSince() {
// Arrange
// Act
serviceUnderTest.listMessagesSince("mapName", 0L);
// Assert
}

@Test
void getSubject() {
// Arrange
// Act
final Subjective<MessageContainer> resultToTest = serviceUnderTest.getSubject();
// Assert
assertNotNull(resultToTest);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.recom.service.messagebus;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.recom.configuration.AsyncConfiguration;
import com.recom.model.message.MessageContainer;
import com.recom.model.message.MessageType;
import com.recom.model.message.OneMessage;
import com.recom.observer.Subject;
import com.recom.observer.Subjective;
import com.recom.persistence.message.MessagePersistenceLayer;
import com.recom.property.RECOMAsyncProperties;
import com.recom.service.provider.StaticObjectMapperProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -16,7 +14,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.core.task.AsyncTaskExecutor;

import java.time.Duration;
import java.util.List;

import static org.mockito.Mockito.*;
Expand All @@ -29,7 +26,6 @@ class MessageLongPollObserverTest {
@Mock
private MessagePersistenceLayer messagePersistenceLayer;
private MessageBusService messageBusService;
private AsyncConfiguration asyncConfiguration;
private MessageLongPollObserver observerUnderTest;

@BeforeEach
Expand All @@ -49,20 +45,13 @@ public void setUp() {
final ObjectMapper objectMapper = new ObjectMapper();
final StaticObjectMapperProvider staticObjectMapperProvider = new StaticObjectMapperProvider(objectMapper);
staticObjectMapperProvider.postConstruct();

// Prepare AsyncConfiguration
final RECOMAsyncProperties properties = RECOMAsyncProperties.builder()
.corePoolSize(1)
.maxPoolSize(1)
.build();
asyncConfiguration = new AsyncConfiguration(properties);
}

@Test
public void test() throws InterruptedException {
public void sendMessage_withOneMessage_messageIsPersisted() throws InterruptedException {
// Arrange
observerUnderTest.scheduleTestResponse("TestMap", Duration.ofMillis(5), new Subject<>(), asyncConfiguration);
observerUnderTest.observe(messageBusService.getSubject());
final Subjective<MessageContainer> messageBusSubjectSpy = spy(messageBusService.getSubject());
observerUnderTest.observe(messageBusSubjectSpy);

final OneMessage testMessage = OneMessage.builder()
.messageType(MessageType.TEST)
Expand All @@ -78,10 +67,13 @@ public void test() throws InterruptedException {
.build()
);

// run autocloseable implementation;
observerUnderTest.close();

// Assert
// Sleep for a duration longer than the scheduled task to complete
Thread.sleep(200);
verify(messagePersistenceLayer, times(1)).saveAll(anyList());
verify(messageBusSubjectSpy, times(1)).beObservedBy(observerUnderTest);
verify(messageBusSubjectSpy, times(1)).observationStoppedThrough(any()); // executed by autocloseable implementation
}

}

0 comments on commit 9c2abbf

Please sign in to comment.