Skip to content

Commit

Permalink
upgrade examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Matej Pucihar committed Mar 12, 2024
1 parent 7a2a06a commit 1088870
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.8.1</quarkus.platform.version>
<quarkus.platform.version>3.8.2</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.2.5</surefire-plugin.version>
</properties>
Expand Down
91 changes: 77 additions & 14 deletions src/main/java/si/puci/GreetingResource.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package si.puci;

import java.time.OffsetDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.context.api.ManagedExecutorConfig;
import io.smallrye.reactive.messaging.MutinyEmitter;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -22,26 +23,88 @@
public class GreetingResource
{
@Inject
@Channel("words-out")
MutinyEmitter<String> emitter;

@Channel("mutiny-emitter")
MutinyEmitter<String> mutinyEmitter;
@Inject
@ManagedExecutorConfig(cleared = ThreadContext.TRANSACTION)
ManagedExecutor noTxExecutor;
@Channel("emitter")
Emitter<String> emitter;

@GET
@Path("/mutiny-emitter-send-and-await")
@Produces(MediaType.TEXT_PLAIN)
public String hello()
public String mutinyEmitterSendAndAwait()
{
/////////// THIS THROWS (1 in 1000) /////////////////

final var myEntity = new MyEntity();
myEntity.field = OffsetDateTime.now().toString();
myEntity.persistAndFlush();
myEntity.persist();

emitter.sendAndAwait(myEntity.field);
mutinyEmitter.sendAndAwait(myEntity.field);

return "mutiny-emitter-send-and-await";
}

@GET
@Path("/mutiny-emitter-send-and-forget")
@Produces(MediaType.TEXT_PLAIN)
public String mutinyEmitterSendAndForget()
{
/////////// THIS ALWAYS THROWS (expected) /////////////////
final var myEntity = new MyEntity();
myEntity.field = OffsetDateTime.now().toString();
myEntity.persist();

mutinyEmitter.sendAndForget(myEntity.field);

//this cannot work, since propagated TX is still bound to kafka thread in time of commit
return "mutiny-emitter-send-and-forget";
}

//this kinda solves the problem
//noTxExecutor.runAsync(() -> emitter.sendAndAwait(myEntity.field)).join();
@GET
@Path("/emitter-send-block-at-end")
@Produces(MediaType.TEXT_PLAIN)
public String emitterSendBlockAtEnd()
{
/////////// THIS THROWS (1 in 1000) /////////////////
final var myEntity = new MyEntity();
myEntity.field = OffsetDateTime.now().toString();
myEntity.persist();

final var completableFuture = new CompletableFuture<String>();
emitter.send(
Message.of(myEntity.field)
.withAck(() -> {
completableFuture.complete("emitter-send-block-at-end");
return CompletableFuture.completedFuture(null);
})
.withNack(t -> {
completableFuture.completeExceptionally(t);
return CompletableFuture.completedFuture(null);
}));
return completableFuture.join();
}
@GET
@Path("/emitter-send-return-fut")
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<String> emitterSend()
{
/////////// THIS DOES NOT THROW (no idea why)/////////////////
final var myEntity = new MyEntity();
myEntity.field = OffsetDateTime.now().toString();
myEntity.persist();

return "Hello from RESTEasy Reactive";
final var completableFuture = new CompletableFuture<String>();
emitter.send(
Message.of(myEntity.field)
.withAck(() -> {
completableFuture.complete("emitter-send-return-fut");
return CompletableFuture.completedFuture(null);
})
.withNack(t -> {
completableFuture.completeExceptionally(t);
return CompletableFuture.completedFuture(null);
}));
return completableFuture;
}
}
11 changes: 9 additions & 2 deletions src/main/java/si/puci/MyEntity.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package si.puci;

import io.quarkus.hibernate.orm.panache.PanacheEntity;
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
import jakarta.persistence.Entity;

import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

/**
* Example JPA entity defined as a Panache Entity.
Expand All @@ -24,6 +27,10 @@
* }
*/
@Entity
public class MyEntity extends PanacheEntity {
public class MyEntity extends PanacheEntityBase
{
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
Long id;
public String field;
}
5 changes: 3 additions & 2 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mp.messaging.outgoing.words-out.topic=words
mp.messaging.outgoing.words-out.connector=smallrye-kafka
#uncomment trace logger of arjuna and the exception will always fall xD
#quarkus.log.category."com.arjuna".level=trace
#quarkus.log.category."com.arjuna".min-level=trace
95 changes: 92 additions & 3 deletions src/test/java/si/puci/GreetingResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,106 @@
class GreetingResourceTest
{
@Test
void testHelloEndpoint()
void mutinyEmitterSendAndAwait()
{
//this throws 1:1000

AtomicBoolean failure = new AtomicBoolean(false);
AtomicReference<Throwable> ex = new AtomicReference<>(null);

final var subscription = Multi.createFrom().ticks().every(Duration.of(1, ChronoUnit.MILLIS))
.onItem().invoke(i -> given()
.when().get("/hello/mutiny-emitter-send-and-await")
.then()
.statusCode(200)
.body(is("mutiny-emitter-send-and-await")))
.subscribe().with(
Log::info,
err -> {
Log.error(err);
ex.set(err);
failure.set(true);
});
try
{
Thread.sleep(60000);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}

subscription.cancel();
if (ex.get() != null)
{
Log.error(ex.get());
}
Assertions.assertFalse(failure.get());
//no error!
}
@Test
void mutinyEmitterSendAndForget()
{
//this always throws
given()
.when().get("/hello/mutiny-emitter-send-and-forget")
.then()
.statusCode(200)
.body(is("mutiny-emitter-send-and-forget"));
}

@Test
void emitterSendBlockAtEnd()
{
//this throws 1:1000

AtomicBoolean failure = new AtomicBoolean(false);
AtomicReference<Throwable> ex = new AtomicReference<>(null);

final var subscription = Multi.createFrom().ticks().every(Duration.of(1, ChronoUnit.MILLIS))
.onItem().invoke(i -> given()
.when().get("/hello/emitter-send-block-at-end")
.then()
.statusCode(200)
.body(is("emitter-send-block-at-end")))
.subscribe().with(
Log::info,
err -> {
Log.error(err);
ex.set(err);
failure.set(true);
});
try
{
Thread.sleep(60000);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}

subscription.cancel();
if (ex.get() != null)
{
Log.error(ex.get());
}
Assertions.assertFalse(failure.get());
//no error!
}
@Test
void emitterSendReturnFut()
{
//this never throws.

AtomicBoolean failure = new AtomicBoolean(false);
AtomicReference<Throwable> ex = new AtomicReference<>(null);

final var subscription = Multi.createFrom().ticks().every(Duration.of(1, ChronoUnit.MILLIS))
.onItem().invoke(i -> given()
.when().get("/hello")
.when().get("/hello/emitter-send-return-fut")
.then()
.statusCode(200)
.body(is("Hello from RESTEasy Reactive")))
.body(is("emitter-send-return-fut")))
.subscribe().with(
Log::info,
err -> {
Expand Down

0 comments on commit 1088870

Please sign in to comment.