diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml index f312f811d8141..3493fd12c4b32 100644 --- a/independent-projects/resteasy-reactive/pom.xml +++ b/independent-projects/resteasy-reactive/pom.xml @@ -46,7 +46,7 @@ 4.0.1 3.1.6 - 1.12.12 + 1.14.7 5.10.1 3.9.6 3.24.2 @@ -72,7 +72,7 @@ 4.2.0 3.7.2 1.0.4 - 5.4.0 + 5.8.0 1.0.0 @@ -98,15 +98,6 @@ pom - - - org.mockito - mockito-bom - ${mockito.version} - pom - import - - io.quarkus.resteasy.reactive resteasy-reactive diff --git a/independent-projects/resteasy-reactive/server/runtime/pom.xml b/independent-projects/resteasy-reactive/server/runtime/pom.xml index b12e26482bb46..10a71b2f8e375 100644 --- a/independent-projects/resteasy-reactive/server/runtime/pom.xml +++ b/independent-projects/resteasy-reactive/server/runtime/pom.xml @@ -45,6 +45,7 @@ org.mockito mockito-core + ${mockito.version} test diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerResource.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerResource.java index b922621a8126c..650abb0b21cc1 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerResource.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerResource.java @@ -4,9 +4,7 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; @@ -20,76 +18,91 @@ import jakarta.ws.rs.sse.SseBroadcaster; import jakarta.ws.rs.sse.SseEventSink; +import org.jboss.logging.Logger; + @Path("sse") -@ApplicationScoped public class SseServerResource { - - public static final Logger logger = Logger.getLogger(SseServerResource.class.getName()); private static SseBroadcaster sseBroadcaster; - private static OutboundSseEvent.Builder eventBuilder; + private static OutboundSseEvent.Builder eventBuilder; private static CountDownLatch closeLatch; private static CountDownLatch errorLatch; + private static final Logger logger = Logger.getLogger(SseServerResource.class); + @Inject public SseServerResource(@Context Sse sse) { + logger.info("Initialized SseServerResource " + this.hashCode()); if (Objects.isNull(eventBuilder)) { eventBuilder = sse.newEventBuilder(); } if (Objects.isNull(sseBroadcaster)) { sseBroadcaster = sse.newBroadcaster(); - sseBroadcaster.onClose(this::onClose); - sseBroadcaster.onError(this::onError); + logger.info("Initializing broadcaster " + sseBroadcaster.hashCode()); + sseBroadcaster.onClose(sseEventSink -> { + CountDownLatch latch = SseServerResource.getCloseLatch(); + logger.info(String.format("Called on close, counting down latch %s", latch.hashCode())); + latch.countDown(); + }); + sseBroadcaster.onError((sseEventSink, throwable) -> { + CountDownLatch latch = SseServerResource.getErrorLatch(); + logger.info(String.format("There was an error, counting down latch %s", latch.hashCode())); + latch.countDown(); + }); } } - private synchronized void onError(SseEventSink sseEventSink, Throwable throwable) { - logger.severe(String.format("There was an error for sseEventSink %s: %s", - sseEventSink.hashCode(), throwable.getMessage())); - errorLatch.countDown(); - } - - private synchronized void onClose(SseEventSink sseEventSink) { - logger.info(String.format("Called on close for %s", sseEventSink.hashCode())); - closeLatch.countDown(); - } - @GET @Path("subscribe") @Produces(MediaType.SERVER_SENT_EVENTS) public void subscribe(@Context SseEventSink sseEventSink) { - sseBroadcaster.register(sseEventSink); - closeLatch = new CountDownLatch(1); - errorLatch = new CountDownLatch(1); + logger.info(this.hashCode() + " /subscribe"); + setLatches(); + getSseBroadcaster().register(sseEventSink); sseEventSink.send(eventBuilder.data(sseEventSink.hashCode()).build()); } @POST @Path("broadcast") public Response broadcast() { - sseBroadcaster.broadcast(eventBuilder.data(Instant.now()).build()); + logger.info(this.hashCode() + " /broadcast"); + getSseBroadcaster().broadcast(eventBuilder.data(Instant.now()).build()); return Response.ok().build(); } @GET @Path("onclose-callback") public Response callback() throws InterruptedException { - boolean onCloseWasCalled = awaitClosedCallback(); + logger.info(this.hashCode() + " /onclose-callback, waiting for latch " + closeLatch.hashCode()); + boolean onCloseWasCalled = closeLatch.await(10, TimeUnit.SECONDS); return Response.ok(onCloseWasCalled).build(); } @GET @Path("onerror-callback") public Response errorCallback() throws InterruptedException { - boolean onErrorWasCalled = awaitErrorCallback(); + logger.info(this.hashCode() + " /onerror-callback, waiting for latch " + errorLatch.hashCode()); + boolean onErrorWasCalled = errorLatch.await(2, TimeUnit.SECONDS); return Response.ok(onErrorWasCalled).build(); } - private synchronized boolean awaitClosedCallback() throws InterruptedException { - return closeLatch.await(10, TimeUnit.SECONDS); + private static SseBroadcaster getSseBroadcaster() { + logger.info("using broadcaster " + sseBroadcaster.hashCode()); + return sseBroadcaster; + } + + public static void setLatches() { + closeLatch = new CountDownLatch(1); + errorLatch = new CountDownLatch(1); + logger.info(String.format("Setting latches: \n closeLatch: %s\n errorLatch: %s", + closeLatch.hashCode(), errorLatch.hashCode())); + } + + public static CountDownLatch getCloseLatch() { + return closeLatch; } - private synchronized boolean awaitErrorCallback() throws InterruptedException { - return errorLatch.await(2, TimeUnit.SECONDS); + public static CountDownLatch getErrorLatch() { + return errorLatch; } } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerTestCase.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerTestCase.java index 75d4eb17ae753..fe9d00c42a5d8 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerTestCase.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/sse/SseServerTestCase.java @@ -5,7 +5,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; @@ -23,8 +22,6 @@ public class SseServerTestCase { - final private static Logger logger = Logger.getLogger(SseServerTestCase.class.getName()); - @RegisterExtension static final ResteasyReactiveUnitTest config = new ResteasyReactiveUnitTest() .withApplicationRoot((jar) -> jar @@ -32,13 +29,14 @@ public class SseServerTestCase { @Test public void shouldCallOnCloseOnServer() throws InterruptedException { + System.out.println("####### shouldCallOnCloseOnServer"); Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(PortProviderUtil.createURI("/sse/subscribe")); try (SseEventSource sse = SseEventSource.target(target).build()) { CountDownLatch openingLatch = new CountDownLatch(1); List results = new CopyOnWriteArrayList<>(); sse.register(event -> { - logger.info("received data: " + event.readData()); + System.out.println("received data: " + event.readData()); results.add(event.readData()); openingLatch.countDown(); }); @@ -46,6 +44,7 @@ public void shouldCallOnCloseOnServer() throws InterruptedException { Assertions.assertTrue(openingLatch.await(3, TimeUnit.SECONDS)); Assertions.assertEquals(1, results.size()); sse.close(); + System.out.println("called sse.close() from client"); RestAssured.get("/sse/onclose-callback") .then() .statusCode(200) @@ -55,13 +54,14 @@ public void shouldCallOnCloseOnServer() throws InterruptedException { @Test public void shouldNotTryToSendToClosedSink() throws InterruptedException { + System.out.println("####### shouldNotTryToSendToClosedSink"); Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(PortProviderUtil.createURI("/sse/subscribe")); try (SseEventSource sse = SseEventSource.target(target).build()) { CountDownLatch openingLatch = new CountDownLatch(1); List results = new ArrayList<>(); sse.register(event -> { - logger.info("received data: " + event.readData()); + System.out.println("received data: " + event.readData()); results.add(event.readData()); openingLatch.countDown(); });