diff --git a/pom.xml b/pom.xml index 72c3526a..fb3a719a 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ 5.12.1.Final 2.2.21 1.3.8 + 3.4.19 1.10.0 @@ -93,6 +94,14 @@ rxjava ${version.rxjava1} + + + + io.projectreactor + reactor-core + ${version.reactor-core} + + @@ -103,6 +112,7 @@ application propagators-rxjava1 propagators-rxjava2 + propagators-reactor tests tck api diff --git a/propagators-reactor/pom.xml b/propagators-reactor/pom.xml new file mode 100644 index 00000000..ea70c3d7 --- /dev/null +++ b/propagators-reactor/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + io.smallrye + smallrye-context-propagation-parent + 1.2.3-SNAPSHOT + + smallrye-context-propagation-propagators-reactor + smallrye-context-propagation-propagators-reactor + + + + junit + junit + test + + + + org.eclipse.microprofile.context-propagation + microprofile-context-propagation-api + + + + io.projectreactor + reactor-core + + + + diff --git a/propagators-reactor/src/main/java/io/smallrye/context/propagators/reactor/ReactorContextPropagator.java b/propagators-reactor/src/main/java/io/smallrye/context/propagators/reactor/ReactorContextPropagator.java new file mode 100644 index 00000000..c3c129d8 --- /dev/null +++ b/propagators-reactor/src/main/java/io/smallrye/context/propagators/reactor/ReactorContextPropagator.java @@ -0,0 +1,21 @@ +package io.smallrye.context.propagators.reactor; + +import org.eclipse.microprofile.context.ThreadContext; +import org.eclipse.microprofile.context.spi.ContextManager; +import org.eclipse.microprofile.context.spi.ContextManagerExtension; +import reactor.core.scheduler.Schedulers; + +public class ReactorContextPropagator implements ContextManagerExtension { + + @Override + public void setup(ContextManager manager) { + + ThreadContext threadContext = manager.newThreadContextBuilder().build(); + + Schedulers.onScheduleHook( + ReactorContextPropagator.class.getName(), + threadContext::contextualRunnable); + + } + +} diff --git a/propagators-reactor/src/main/resources/META-INF/services/org.eclipse.microprofile.context.spi.ContextManagerExtension b/propagators-reactor/src/main/resources/META-INF/services/org.eclipse.microprofile.context.spi.ContextManagerExtension new file mode 100644 index 00000000..e5086a3c --- /dev/null +++ b/propagators-reactor/src/main/resources/META-INF/services/org.eclipse.microprofile.context.spi.ContextManagerExtension @@ -0,0 +1 @@ +io.smallrye.context.propagators.reactor.ReactorContextPropagator \ No newline at end of file diff --git a/tests/pom.xml b/tests/pom.xml index 63911e39..5b868241 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -52,6 +52,14 @@ ${project.version} test + + + ${project.groupId} + smallrye-context-propagation-propagators-reactor + ${project.version} + test + + ${project.groupId} smallrye-context-propagation-cdi diff --git a/tests/src/test/java/io/smallrye/context/test/ReactorTest.java b/tests/src/test/java/io/smallrye/context/test/ReactorTest.java new file mode 100644 index 00000000..ba5c3c6b --- /dev/null +++ b/tests/src/test/java/io/smallrye/context/test/ReactorTest.java @@ -0,0 +1,107 @@ +package io.smallrye.context.test; + +import io.smallrye.context.SmallRyeContextManagerProvider; +import io.smallrye.context.test.util.AbstractTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.util.concurrent.CountDownLatch; + +public class ReactorTest extends AbstractTest { + + @BeforeClass + public static void init() { + // initialise + SmallRyeContextManagerProvider.getManager(); + } + + @Before + public void before() { + MyContext.init(); + MyContext.get().set("test"); + } + + @After + public void after() { + MyContext.clear(); + } + + @Test + public void testFlux() throws Throwable { + // check initial state + checkContextCaptured(); + + CountDownLatch latch = new CountDownLatch(1); + + Throwable[] ret = new Throwable[1]; + Flux.create(sink -> { + // check deferred state + try { + checkContextCaptured(); + sink.next("YES"); + sink.complete(); + } catch (Throwable e) { + sink.error(e); + } + + }) + .subscribeOn(Schedulers.newSingle("test")) + .subscribe( + success -> latch.countDown(), + error -> { + ret[0] = error; + latch.countDown(); + }); + + latch.await(); + + if (ret[0] != null) { + throw ret[0]; + } + } + + @Test + public void testMono() throws Throwable { + // check initial state + checkContextCaptured(); + + CountDownLatch latch = new CountDownLatch(1); + + Throwable[] ret = new Throwable[1]; + + Mono.create((sink) -> { + try { + // check deferred state + checkContextCaptured(); + sink.success("YES"); + } catch (Throwable e) { + sink.error(e); + } + }) + .subscribeOn(Schedulers.newSingle("test")) + .subscribe( + success -> latch.countDown(), + e -> { + ret[0] = e; + latch.countDown(); + } + ); + + latch.await(); + + if (ret[0] != null) { + throw ret[0]; + } + } + + private void checkContextCaptured() { + Assert.assertEquals("test", MyContext.get().getReqId()); + } + +}