Skip to content

Commit

Permalink
added module propagators-reactor
Browse files Browse the repository at this point in the history
the module `propagators-reactor` enables context propagation inside project reactor
  • Loading branch information
cbianco committed Jun 17, 2022
1 parent b577ac4 commit 8076080
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<version.narayana>5.12.1.Final</version.narayana>
<version.rxjava2>2.2.21</version.rxjava2>
<version.rxjava1>1.3.8</version.rxjava1>
<version.reactor-core>3.4.19</version.reactor-core>

<!-- Testing versions -->
<version.smallrye.common>1.10.0</version.smallrye.common>
Expand Down Expand Up @@ -93,6 +94,14 @@
<artifactId>rxjava</artifactId>
<version>${version.rxjava1}</version>
</dependency>


<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${version.reactor-core}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand All @@ -103,6 +112,7 @@
<module>application</module>
<module>propagators-rxjava1</module>
<module>propagators-rxjava2</module>
<module>propagators-reactor</module>
<module>tests</module>
<module>tck</module>
<module>api</module>
Expand Down
30 changes: 30 additions & 0 deletions propagators-reactor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-context-propagation-parent</artifactId>
<version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>smallrye-context-propagation-propagators-reactor</artifactId>
<name>smallrye-context-propagation-propagators-reactor</name>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.microprofile.context-propagation</groupId>
<artifactId>microprofile-context-propagation-api</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.smallrye.context.propagators.reactor;

import org.eclipse.microprofile.context.ThreadContext;
import org.eclipse.microprofile.context.spi.ContextManager;
import org.eclipse.microprofile.context.spi.ContextManagerExtension;
import reactor.core.scheduler.Schedulers;

public class ReactorContextPropagator implements ContextManagerExtension {

@Override
public void setup(ContextManager manager) {

ThreadContext threadContext = manager.newThreadContextBuilder().build();

Schedulers.onScheduleHook(
ReactorContextPropagator.class.getName(),
threadContext::contextualRunnable);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.smallrye.context.propagators.reactor.ReactorContextPropagator
8 changes: 8 additions & 0 deletions tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>smallrye-context-propagation-propagators-reactor</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>smallrye-context-propagation-cdi</artifactId>
Expand Down
107 changes: 107 additions & 0 deletions tests/src/test/java/io/smallrye/context/test/ReactorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.smallrye.context.test;

import io.smallrye.context.SmallRyeContextManagerProvider;
import io.smallrye.context.test.util.AbstractTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;

public class ReactorTest extends AbstractTest {

@BeforeClass
public static void init() {
// initialise
SmallRyeContextManagerProvider.getManager();
}

@Before
public void before() {
MyContext.init();
MyContext.get().set("test");
}

@After
public void after() {
MyContext.clear();
}

@Test
public void testFlux() throws Throwable {
// check initial state
checkContextCaptured();

CountDownLatch latch = new CountDownLatch(1);

Throwable[] ret = new Throwable[1];
Flux.create(sink -> {
// check deferred state
try {
checkContextCaptured();
sink.next("YES");
sink.complete();
} catch (Throwable e) {
sink.error(e);
}

})
.subscribeOn(Schedulers.newSingle("test"))
.subscribe(
success -> latch.countDown(),
error -> {
ret[0] = error;
latch.countDown();
});

latch.await();

if (ret[0] != null) {
throw ret[0];
}
}

@Test
public void testMono() throws Throwable {
// check initial state
checkContextCaptured();

CountDownLatch latch = new CountDownLatch(1);

Throwable[] ret = new Throwable[1];

Mono.create((sink) -> {
try {
// check deferred state
checkContextCaptured();
sink.success("YES");
} catch (Throwable e) {
sink.error(e);
}
})
.subscribeOn(Schedulers.newSingle("test"))
.subscribe(
success -> latch.countDown(),
e -> {
ret[0] = e;
latch.countDown();
}
);

latch.await();

if (ret[0] != null) {
throw ret[0];
}
}

private void checkContextCaptured() {
Assert.assertEquals("test", MyContext.get().getReqId());
}

}

0 comments on commit 8076080

Please sign in to comment.