From fd8156b2c41c178cd7106a592d5272117c5919ea Mon Sep 17 00:00:00 2001 From: Matej Pucihar Date: Fri, 1 Mar 2024 09:12:37 +0100 Subject: [PATCH] reproduce exception, README.md --- README.md | 233 ++++++++++++------ src/main/java/si/puci/GreetingResource.java | 35 ++- .../puci/MyReactiveMessagingApplication.java | 43 ---- src/main/resources/application.properties | 3 +- src/main/resources/import.sql | 6 - src/test/java/si/puci/GreetingResourceIT.java | 8 - .../java/si/puci/GreetingResourceTest.java | 53 +++- .../MyReactiveMessagingApplicationTest.java | 25 -- 8 files changed, 242 insertions(+), 164 deletions(-) delete mode 100644 src/main/java/si/puci/MyReactiveMessagingApplication.java delete mode 100644 src/main/resources/import.sql delete mode 100644 src/test/java/si/puci/GreetingResourceIT.java delete mode 100644 src/test/java/si/puci/MyReactiveMessagingApplicationTest.java diff --git a/README.md b/README.md index 4852f8a..734c52e 100644 --- a/README.md +++ b/README.md @@ -1,79 +1,172 @@ -# kafka-db-tx +# Reproduce wierd error with blocking db TX when sending kafka message in it. -This project uses Quarkus, the Supersonic Subatomic Java Framework. - -If you want to learn more about Quarkus, please visit its website: https://quarkus.io/ . - -## Running the application in dev mode - -You can run your application in dev mode that enables live coding using: -```shell script -./mvnw compile quarkus:dev -``` - -> **_NOTE:_** Quarkus now ships with a Dev UI, which is available in dev mode only at http://localhost:8080/q/dev/. - -## Packaging and running the application - -The application can be packaged using: ```shell script -./mvnw package +./mvnw test ``` -It produces the `quarkus-run.jar` file in the `target/quarkus-app/` directory. -Be aware that it’s not an _über-jar_ as the dependencies are copied into the `target/quarkus-app/lib/` directory. -The application is now runnable using `java -jar target/quarkus-app/quarkus-run.jar`. +Test runs for 1 minute, it performs kind of stress test, during which the following exception occurs. +The test just fires REST request every 1ms, the request inserts something into db and writes message to kafka. -If you want to build an _über-jar_, execute the following command: -```shell script -./mvnw package -Dquarkus.package.type=uber-jar -``` +In my case exception occurs in ~~90% of cases, if it doesn't happen retry. It happens randomly. -The application, packaged as an _über-jar_, is now runnable using `java -jar target/*-runner.jar`. +I investigated a bit further, placing breakpoint at +`/Users/matejpucihar/.m2/repository/org/jboss/narayana/jta/narayana-jta/7.0.0.Final/narayana-jta-7.0.0.Final.jar!/com/arjuna/ats/arjuna/logging/arjunaI18NLogger_$logger.class:693` +and +`com.arjuna.ats.arjuna.coordinator.BasicAction.checkChildren` +where I found the 2 threads that are associated with TX. `worker thread` and `kafka-producer-thread`. -## Creating a native executable - -You can create a native executable using: -```shell script -./mvnw package -Dnative -``` +The issue can kind of be solved with usage of executor with no propagated transaction context. -Or, if you don't have GraalVM installed, you can run the native executable build in a container using: -```shell script -./mvnw package -Dnative -Dquarkus.native.container-build=true ``` - -You can then execute your native executable with: `./target/kafka-db-tx-1.0.0-SNAPSHOT-runner` - -If you want to learn more about building native executables, please consult https://quarkus.io/guides/maven-tooling. - -## Related Guides - -- Hibernate Validator ([guide](https://quarkus.io/guides/validation)): Validate object properties (field, getter) and method parameters for your beans (REST, CDI, Jakarta Persistence) -- SmallRye Reactive Messaging - Kafka Connector ([guide](https://quarkus.io/guides/kafka-reactive-getting-started)): Connect to Kafka with Reactive Messaging -- Hibernate ORM with Panache ([guide](https://quarkus.io/guides/hibernate-orm-panache)): Simplify your persistence code for Hibernate ORM via the active record or the repository pattern -- JDBC Driver - PostgreSQL ([guide](https://quarkus.io/guides/datasource)): Connect to the PostgreSQL database via JDBC - -## Provided Code - -### Hibernate ORM - -Create your first JPA entity - -[Related guide section...](https://quarkus.io/guides/hibernate-orm) - -[Related Hibernate with Panache section...](https://quarkus.io/guides/hibernate-orm-panache) - - -### Reactive Messaging codestart - -Use SmallRye Reactive Messaging - -[Related Apache Kafka guide section...](https://quarkus.io/guides/kafka-reactive-getting-started) - - -### RESTEasy Reactive - -Easily start your Reactive RESTful Web Services - -[Related guide section...](https://quarkus.io/guides/getting-started-reactive#reactive-jax-rs-resources) +2024-03-01 08:32:55,147 INFO [io.sma.mut.sub.Subscribers$CallbackBasedSubscriber] (executor-thread-2) 8949 +2024-03-01 08:32:55,151 INFO [io.sma.mut.sub.Subscribers$CallbackBasedSubscriber] (executor-thread-1) 8950 +2024-03-01 08:32:55,157 INFO [io.sma.mut.sub.Subscribers$CallbackBasedSubscriber] (executor-thread-2) 8951 +2024-03-01 08:32:55,162 WARN [com.arj.ats.arjuna] (executor-thread-2) ARJUNA012094: Commit of action id 0:ffff0a010a48:e441:65e18477:d1d0 invoked while multiple threads active within it. +2024-03-01 08:32:55,163 WARN [com.arj.ats.arjuna] (executor-thread-2) ARJUNA012107: CheckedAction::check - atomic action 0:ffff0a010a48:e441:65e18477:d1d0 commiting with 2 threads active! +2024-03-01 08:32:55,163 WARN [com.arj.ats.jta] (executor-thread-2) ARJUNA016039: onePhaseCommit on < formatId=131077, gtrid_length=35, bqual_length=36, tx_uid=0:ffff0a010a48:e441:65e18477:d1d0, node_name=quarkus, branch_uid=0:ffff0a010a48:e441:65e18477:d1d3, subordinatenodename=null, eis_name=0 > (io.agroal.narayana.LocalXAResource@2ea67dc) failed with exception XAException.XA_RBROLLBACK: javax.transaction.xa.XAException: Error trying to transactionCommit local transaction: Enlisted connection used without active transaction + at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:140) + at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:134) + at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:72) + at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:678) + at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2457) + at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1520) + at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:74) + at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:138) + at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1271) + at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:104) + at io.quarkus.narayana.jta.runtime.NotifyingTransactionManager.commit(NotifyingTransactionManager.java:70) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.endTransaction(TransactionalInterceptorBase.java:406) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:171) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:107) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:38) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:61) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:32) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source) + at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42) + at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30) + at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27) + at si.puci.GreetingResource_Subclass.hello(Unknown Source) + at si.puci.GreetingResource_ClientProxy.hello(Unknown Source) + at si.puci.GreetingResource$quarkusrestinvoker$hello_e747664148511e1e5212d3e0f4b40d45c56ab8a1.invoke(Unknown Source) + at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29) + at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141) + at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147) + at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:582) + at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513) + at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538) + at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29) + at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29) + at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) + at java.base/java.lang.Thread.run(Thread.java:1583) +Caused by: java.sql.SQLException: Enlisted connection used without active transaction + at io.agroal.pool.ConnectionHandler.verifyEnlistment(ConnectionHandler.java:398) + at io.agroal.pool.ConnectionHandler.transactionCommit(ConnectionHandler.java:355) + at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:69) + ... 31 more + +2024-03-01 08:32:55,169 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-2) HTTP Request to /hello failed, error id: 2795b427-72e0-40e2-84da-2e4b8d692231-1: io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method + at si.puci.GreetingResource_Subclass.hello(Unknown Source) + at si.puci.GreetingResource_ClientProxy.hello(Unknown Source) + at si.puci.GreetingResource$quarkusrestinvoker$hello_e747664148511e1e5212d3e0f4b40d45c56ab8a1.invoke(Unknown Source) + at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29) + at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141) + at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147) + at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:582) + at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513) + at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538) + at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29) + at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29) + at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) + at java.base/java.lang.Thread.run(Thread.java:1583) +Caused by: jakarta.transaction.RollbackException: ARJUNA016053: Could not commit transaction. + at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1283) + at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:104) + at io.quarkus.narayana.jta.runtime.NotifyingTransactionManager.commit(NotifyingTransactionManager.java:70) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.endTransaction(TransactionalInterceptorBase.java:406) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:171) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:107) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:38) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:61) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:32) + at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source) + at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42) + at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30) + at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27) + ... 13 more + Suppressed: javax.transaction.xa.XAException: Error trying to transactionCommit local transaction: Enlisted connection used without active transaction + at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:140) + at io.agroal.narayana.LocalXAResource.xaException(LocalXAResource.java:134) + at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:72) + at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:678) + at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2457) + at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1520) + at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:74) + at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:138) + at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1271) + ... 25 more + Caused by: java.sql.SQLException: Enlisted connection used without active transaction + at io.agroal.pool.ConnectionHandler.verifyEnlistment(ConnectionHandler.java:398) + at io.agroal.pool.ConnectionHandler.transactionCommit(ConnectionHandler.java:355) + at io.agroal.narayana.LocalXAResource.commit(LocalXAResource.java:69) + ... 31 more + +2024-03-01 08:32:55,179 ERROR [si.puc.GreetingResourceTest] (executor-thread-1) java.lang.AssertionError: 1 expectation failed. +Expected status code <200> but was <500>. + +2024-03-01 08:32:55,180 ERROR [io.qua.mut.run.MutinyInfrastructure] (executor-thread-1) Mutiny had to drop the following exception: io.smallrye.mutiny.CompositeException: Multiple exceptions caught: + [Exception 0] java.lang.AssertionError: 1 expectation failed. +Expected status code <200> but was <500>. + + [Exception 1] org.opentest4j.AssertionFailedError + at io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber.onFailure(Subscribers.java:95) + at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onFailure(MultiOperatorProcessor.java:88) + at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.failAndCancel(MultiOperatorProcessor.java:42) + at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:38) + at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:85) + at org.jboss.threads.EnhancedQueueExecutor$FixedRateRunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2974) + at org.jboss.threads.EnhancedQueueExecutor$FixedRateRunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2960) + at org.jboss.threads.EnhancedQueueExecutor$AbstractScheduledFuture.run(EnhancedQueueExecutor.java:2742) + at org.jboss.threads.EnhancedQueueExecutor$RepeatingScheduledFuture.run(EnhancedQueueExecutor.java:2933) + at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:587) + at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513) + at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538) + at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29) + at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29) + at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) + at java.base/java.lang.Thread.run(Thread.java:1583) + Suppressed: org.opentest4j.AssertionFailedError + at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:34) + at org.junit.jupiter.api.Assertions.fail(Assertions.java:119) + at si.puci.GreetingResourceTest.lambda$testHelloEndpoint$1(GreetingResourceTest.java:36) + at io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber.onFailure(Subscribers.java:93) + ... 15 more +Caused by: java.lang.AssertionError: 1 expectation failed. +Expected status code <200> but was <500>. + + at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) + at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) + at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486) + at org.codehaus.groovy.reflection.CachedConstructor.invoke(CachedConstructor.java:73) + at org.codehaus.groovy.runtime.callsite.ConstructorSite$ConstructorSiteNoUnwrapNoCoerce.callConstructor(ConstructorSite.java:108) + at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCallConstructor(CallSiteArray.java:57) + at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callConstructor(AbstractCallSite.java:263) + at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callConstructor(AbstractCallSite.java:277) + at io.restassured.internal.ResponseSpecificationImpl$HamcrestAssertionClosure.validate(ResponseSpecificationImpl.groovy:512) + at io.restassured.internal.ResponseSpecificationImpl$HamcrestAssertionClosure$validate$1.call(Unknown Source) + at io.restassured.internal.ResponseSpecificationImpl.validateResponseIfRequired(ResponseSpecificationImpl.groovy:696) + at io.restassured.internal.ResponseSpecificationImpl.this$2$validateResponseIfRequired(ResponseSpecificationImpl.groovy) + at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) + at java.base/java.lang.reflect.Method.invoke(Method.java:580) + at org.codehaus.groovy.runtime.callsite.PlainObjectMetaMethodSite.doInvoke(PlainObjectMetaMethodSite.java:43) + at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite$PogoCachedMethodSiteNoUnwrapNoCoerce.invoke(PogoMetaMethodSite.java:198) + at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite.callCurrent(PogoMetaMethodSite.java:62) + at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:185) + at io.restassured.internal.ResponseSpecificationImpl.statusCode(ResponseSpecificationImpl.groovy:135) + at io.restassured.specification.ResponseSpecification$statusCode$0.callCurrent(Unknown Source) + at io.restassured.internal.ResponseSpecificationImpl.statusCode(ResponseSpecificationImpl.groovy:143) + at io.restassured.internal.ValidatableResponseOptionsImpl.statusCode(ValidatableResponseOptionsImpl.java:89) + at si.puci.GreetingResourceTest.lambda$testHelloEndpoint$0(GreetingResourceTest.java:29) + at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21) + at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:36) + ... 12 more +``` \ No newline at end of file diff --git a/src/main/java/si/puci/GreetingResource.java b/src/main/java/si/puci/GreetingResource.java index cbfdc3b..b27e2c6 100644 --- a/src/main/java/si/puci/GreetingResource.java +++ b/src/main/java/si/puci/GreetingResource.java @@ -1,16 +1,47 @@ package si.puci; +import java.time.OffsetDateTime; + +import org.eclipse.microprofile.context.ManagedExecutor; +import org.eclipse.microprofile.context.ThreadContext; +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.context.api.ManagedExecutorConfig; +import io.smallrye.reactive.messaging.MutinyEmitter; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; @Path("/hello") -public class GreetingResource { +@Transactional +@ApplicationScoped +public class GreetingResource +{ + @Inject + @Channel("words-out") + MutinyEmitter emitter; + + @Inject + @ManagedExecutorConfig(cleared = ThreadContext.TRANSACTION) + ManagedExecutor noTxExecutor; @GET @Produces(MediaType.TEXT_PLAIN) - public String hello() { + public String hello() + { + final var myEntity = new MyEntity(); + myEntity.field = OffsetDateTime.now().toString(); + myEntity.persistAndFlush(); + + emitter.sendAndAwait(myEntity.field); + + //this kinda solves the problem + //noTxExecutor.runAsync(() -> emitter.sendAndAwait(myEntity.field)).join(); + return "Hello from RESTEasy Reactive"; } } diff --git a/src/main/java/si/puci/MyReactiveMessagingApplication.java b/src/main/java/si/puci/MyReactiveMessagingApplication.java deleted file mode 100644 index 094cb56..0000000 --- a/src/main/java/si/puci/MyReactiveMessagingApplication.java +++ /dev/null @@ -1,43 +0,0 @@ -package si.puci; - -import io.quarkus.runtime.StartupEvent; -import org.eclipse.microprofile.reactive.messaging.*; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; -import java.util.stream.Stream; - -@ApplicationScoped -public class MyReactiveMessagingApplication { - - @Inject - @Channel("words-out") - Emitter emitter; - - /** - * Sends message to the "words-out" channel, can be used from a JAX-RS resource or any bean of your application. - * Messages are sent to the broker. - **/ - void onStart(@Observes StartupEvent ev) { - Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string)); - } - - /** - * Consume the message from the "words-in" channel, uppercase it and send it to the uppercase channel. - * Messages come from the broker. - **/ - @Incoming("words-in") - @Outgoing("uppercase") - public Message toUpperCase(Message message) { - return message.withPayload(message.getPayload().toUpperCase()); - } - - /** - * Consume the uppercase channel (in-memory) and print the messages. - **/ - @Incoming("uppercase") - public void sink(String word) { - System.out.println(">> " + word); - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index d8f24b7..a598ad8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,2 @@ -mp.messaging.incoming.words-in.topic=words mp.messaging.outgoing.words-out.topic=words -mp.messaging.incoming.words-in.auto.offset.reset=earliest +mp.messaging.outgoing.words-out.connector=smallrye-kafka diff --git a/src/main/resources/import.sql b/src/main/resources/import.sql deleted file mode 100644 index 16aa523..0000000 --- a/src/main/resources/import.sql +++ /dev/null @@ -1,6 +0,0 @@ --- This file allow to write SQL commands that will be emitted in test and dev. --- The commands are commented as their support depends of the database --- insert into myentity (id, field) values(1, 'field-1'); --- insert into myentity (id, field) values(2, 'field-2'); --- insert into myentity (id, field) values(3, 'field-3'); --- alter sequence myentity_seq restart with 4; \ No newline at end of file diff --git a/src/test/java/si/puci/GreetingResourceIT.java b/src/test/java/si/puci/GreetingResourceIT.java deleted file mode 100644 index d64e9bd..0000000 --- a/src/test/java/si/puci/GreetingResourceIT.java +++ /dev/null @@ -1,8 +0,0 @@ -package si.puci; - -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -class GreetingResourceIT extends GreetingResourceTest { - // Execute the same tests but in packaged mode. -} diff --git a/src/test/java/si/puci/GreetingResourceTest.java b/src/test/java/si/puci/GreetingResourceTest.java index 84a38b6..d52b082 100644 --- a/src/test/java/si/puci/GreetingResourceTest.java +++ b/src/test/java/si/puci/GreetingResourceTest.java @@ -1,20 +1,57 @@ package si.puci; +import io.quarkus.logging.Log; import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.mutiny.Multi; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; import static org.hamcrest.CoreMatchers.is; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + @QuarkusTest -class GreetingResourceTest { +class GreetingResourceTest +{ @Test - void testHelloEndpoint() { - given() - .when().get("/hello") - .then() - .statusCode(200) - .body(is("Hello from RESTEasy Reactive")); - } + void testHelloEndpoint() + { + AtomicBoolean failure = new AtomicBoolean(false); + AtomicReference ex = new AtomicReference<>(null); + final var subscription = Multi.createFrom().ticks().every(Duration.of(1, ChronoUnit.MILLIS)) + .onItem().invoke(i -> given() + .when().get("/hello") + .then() + .statusCode(200) + .body(is("Hello from RESTEasy Reactive"))) + .subscribe().with( + Log::info, + err -> { + Log.error(err); + ex.set(err); + failure.set(true); + }); + try + { + Thread.sleep(60000); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + subscription.cancel(); + if (ex.get() != null) + { + Log.error(ex.get()); + } + Assertions.assertFalse(failure.get()); + //no error! + } } \ No newline at end of file diff --git a/src/test/java/si/puci/MyReactiveMessagingApplicationTest.java b/src/test/java/si/puci/MyReactiveMessagingApplicationTest.java deleted file mode 100644 index 3c2c952..0000000 --- a/src/test/java/si/puci/MyReactiveMessagingApplicationTest.java +++ /dev/null @@ -1,25 +0,0 @@ -package si.puci; - -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import io.quarkus.test.junit.QuarkusTest; - -import org.eclipse.microprofile.reactive.messaging.Message; -import org.junit.jupiter.api.Test; - -import jakarta.inject.Inject; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -@QuarkusTest -class MyReactiveMessagingApplicationTest { - - @Inject - MyReactiveMessagingApplication application; - - @Test - void test() { - assertEquals("HELLO", application.toUpperCase(Message.of("Hello")).getPayload()); - assertEquals("BONJOUR", application.toUpperCase(Message.of("bonjour")).getPayload()); - } -}