Skip to content

Commit

Permalink
updated mockito version
Browse files Browse the repository at this point in the history
removed bom for mockito test dependency

sorted imports

removed colors and added logger instead of system out println

formatted

added more logs

added more logs

added more logs, updated bytebuddy

formatted

adding details to test log
  • Loading branch information
a29340 committed Jan 10, 2024
1 parent 747ce71 commit d971e26
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 45 deletions.
13 changes: 2 additions & 11 deletions independent-projects/resteasy-reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<!-- Versions -->
<jakarta.enterprise.cdi-api.version>4.0.1</jakarta.enterprise.cdi-api.version>
<jandex.version>3.1.6</jandex.version>
<bytebuddy.version>1.12.12</bytebuddy.version>
<bytebuddy.version>1.14.7</bytebuddy.version>
<junit5.version>5.10.1</junit5.version>
<maven.version>3.9.6</maven.version>
<assertj.version>3.24.2</assertj.version>
Expand All @@ -72,7 +72,7 @@
<awaitility.version>4.2.0</awaitility.version>
<smallrye-mutiny-vertx-core.version>3.7.2</smallrye-mutiny-vertx-core.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<mockito.version>5.4.0</mockito.version>
<mockito.version>5.8.0</mockito.version>
<mutiny-zero.version>1.0.0</mutiny-zero.version>

<!-- Forbidden API checks -->
Expand All @@ -98,15 +98,6 @@
<type>pom</type>
</dependency>

<!-- Mockito BOM -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-bom</artifactId>
<version>${mockito.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.quarkus.resteasy.reactive</groupId>
<artifactId>resteasy-reactive</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
Expand All @@ -20,76 +18,91 @@
import jakarta.ws.rs.sse.SseBroadcaster;
import jakarta.ws.rs.sse.SseEventSink;

import org.jboss.logging.Logger;

@Path("sse")
@ApplicationScoped
public class SseServerResource {

public static final Logger logger = Logger.getLogger(SseServerResource.class.getName());
private static SseBroadcaster sseBroadcaster;
private static OutboundSseEvent.Builder eventBuilder;

private static OutboundSseEvent.Builder eventBuilder;
private static CountDownLatch closeLatch;
private static CountDownLatch errorLatch;

private static final Logger logger = Logger.getLogger(SseServerResource.class);

@Inject
public SseServerResource(@Context Sse sse) {
logger.info("Initialized SseServerResource " + this.hashCode());
if (Objects.isNull(eventBuilder)) {
eventBuilder = sse.newEventBuilder();
}
if (Objects.isNull(sseBroadcaster)) {
sseBroadcaster = sse.newBroadcaster();
sseBroadcaster.onClose(this::onClose);
sseBroadcaster.onError(this::onError);
logger.info("Initializing broadcaster " + sseBroadcaster.hashCode());
sseBroadcaster.onClose(sseEventSink -> {
CountDownLatch latch = SseServerResource.getCloseLatch();
logger.info(String.format("Called on close, counting down latch %s", latch.hashCode()));
latch.countDown();
});
sseBroadcaster.onError((sseEventSink, throwable) -> {
CountDownLatch latch = SseServerResource.getErrorLatch();
logger.info(String.format("There was an error, counting down latch %s", latch.hashCode()));
latch.countDown();
});
}
}

private synchronized void onError(SseEventSink sseEventSink, Throwable throwable) {
logger.severe(String.format("There was an error for sseEventSink %s: %s",
sseEventSink.hashCode(), throwable.getMessage()));
errorLatch.countDown();
}

private synchronized void onClose(SseEventSink sseEventSink) {
logger.info(String.format("Called on close for %s", sseEventSink.hashCode()));
closeLatch.countDown();
}

@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink sseEventSink) {
sseBroadcaster.register(sseEventSink);
closeLatch = new CountDownLatch(1);
errorLatch = new CountDownLatch(1);
logger.info(this.hashCode() + " /subscribe");
setLatches();
getSseBroadcaster().register(sseEventSink);
sseEventSink.send(eventBuilder.data(sseEventSink.hashCode()).build());
}

@POST
@Path("broadcast")
public Response broadcast() {
sseBroadcaster.broadcast(eventBuilder.data(Instant.now()).build());
logger.info(this.hashCode() + " /broadcast");
getSseBroadcaster().broadcast(eventBuilder.data(Instant.now()).build());
return Response.ok().build();
}

@GET
@Path("onclose-callback")
public Response callback() throws InterruptedException {
boolean onCloseWasCalled = awaitClosedCallback();
logger.info(this.hashCode() + " /onclose-callback, waiting for latch " + closeLatch.hashCode());
boolean onCloseWasCalled = closeLatch.await(10, TimeUnit.SECONDS);
return Response.ok(onCloseWasCalled).build();
}

@GET
@Path("onerror-callback")
public Response errorCallback() throws InterruptedException {
boolean onErrorWasCalled = awaitErrorCallback();
logger.info(this.hashCode() + " /onerror-callback, waiting for latch " + errorLatch.hashCode());
boolean onErrorWasCalled = errorLatch.await(2, TimeUnit.SECONDS);
return Response.ok(onErrorWasCalled).build();
}

private synchronized boolean awaitClosedCallback() throws InterruptedException {
return closeLatch.await(10, TimeUnit.SECONDS);
private static SseBroadcaster getSseBroadcaster() {
logger.info("using broadcaster " + sseBroadcaster.hashCode());
return sseBroadcaster;
}

public static void setLatches() {
closeLatch = new CountDownLatch(1);
errorLatch = new CountDownLatch(1);
logger.info(String.format("Setting latches: \n closeLatch: %s\n errorLatch: %s",
closeLatch.hashCode(), errorLatch.hashCode()));
}

public static CountDownLatch getCloseLatch() {
return closeLatch;
}

private synchronized boolean awaitErrorCallback() throws InterruptedException {
return errorLatch.await(2, TimeUnit.SECONDS);
public static CountDownLatch getErrorLatch() {
return errorLatch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
Expand All @@ -23,29 +22,29 @@

public class SseServerTestCase {

final private static Logger logger = Logger.getLogger(SseServerTestCase.class.getName());

@RegisterExtension
static final ResteasyReactiveUnitTest config = new ResteasyReactiveUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(SseServerResource.class));

@Test
public void shouldCallOnCloseOnServer() throws InterruptedException {
System.out.println("####### shouldCallOnCloseOnServer");
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(PortProviderUtil.createURI("/sse/subscribe"));
try (SseEventSource sse = SseEventSource.target(target).build()) {
CountDownLatch openingLatch = new CountDownLatch(1);
List<String> results = new CopyOnWriteArrayList<>();
sse.register(event -> {
logger.info("received data: " + event.readData());
System.out.println("received data: " + event.readData());
results.add(event.readData());
openingLatch.countDown();
});
sse.open();
Assertions.assertTrue(openingLatch.await(3, TimeUnit.SECONDS));
Assertions.assertEquals(1, results.size());
sse.close();
System.out.println("called sse.close() from client");
RestAssured.get("/sse/onclose-callback")
.then()
.statusCode(200)
Expand All @@ -55,13 +54,14 @@ public void shouldCallOnCloseOnServer() throws InterruptedException {

@Test
public void shouldNotTryToSendToClosedSink() throws InterruptedException {
System.out.println("####### shouldNotTryToSendToClosedSink");
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(PortProviderUtil.createURI("/sse/subscribe"));
try (SseEventSource sse = SseEventSource.target(target).build()) {
CountDownLatch openingLatch = new CountDownLatch(1);
List<String> results = new ArrayList<>();
sse.register(event -> {
logger.info("received data: " + event.readData());
System.out.println("received data: " + event.readData());
results.add(event.readData());
openingLatch.countDown();
});
Expand Down

0 comments on commit d971e26

Please sign in to comment.