diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java index 05ef569e56..61e45b7057 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java @@ -452,40 +452,27 @@ <T> void assertThatThreadLocalsPresentDirectCoreSubscribe( <T> void assertThatThreadLocalsPresentDirectCoreSubscribe( CorePublisher<? extends T> source, Runnable asyncAction) { assertThatNoException().isThrownBy(() -> { - AtomicReference<String> valueInOnNext = new AtomicReference<>(); - AtomicReference<String> valueInOnComplete = new AtomicReference<>(); - AtomicReference<String> valueInOnError = new AtomicReference<>(); - AtomicReference<Throwable> error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - AtomicBoolean hadNext = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(valueInOnNext, - valueInOnComplete, - valueInOnError, - error, - latch, - hadNext, - complete); + CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(); source.subscribe(subscriberWithContext); executorService.submit(asyncAction) .get(100, TimeUnit.MILLISECONDS); - if (!latch.await(500, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - if (hadNext.get()) { - assertThat(valueInOnNext.get()).isEqualTo("present"); + if (subscriberWithContext.hadNext.get()) { + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo( + "present"); } - if (error.get() == null) { - assertThat(valueInOnComplete.get()).isEqualTo("present"); - assertThat(complete).isTrue(); + if (subscriberWithContext.error.get() == null) { + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present"); + assertThat(subscriberWithContext.complete).isTrue(); } else { - assertThat(valueInOnError.get()).isEqualTo("present"); + assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present"); } }); } @@ -502,40 +489,26 @@ <T> void assertThatThreadLocalsPresentDirectRawSubscribe( <T> void assertThatThreadLocalsPresentDirectRawSubscribe( Publisher<? extends T> source, Runnable asyncAction) { assertThatNoException().isThrownBy(() -> { - AtomicReference<String> valueInOnNext = new AtomicReference<>(); - AtomicReference<String> valueInOnComplete = new AtomicReference<>(); - AtomicReference<String> valueInOnError = new AtomicReference<>(); - AtomicReference<Throwable> error = new AtomicReference<>(); - AtomicBoolean hadNext = new AtomicBoolean(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(valueInOnNext, - valueInOnComplete, - valueInOnError, - error, - latch, - hadNext, - complete); + CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(); source.subscribe(subscriberWithContext); executorService.submit(asyncAction) .get(100, TimeUnit.MILLISECONDS); - if (!latch.await(500, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - if (hadNext.get()) { - assertThat(valueInOnNext.get()).isEqualTo("present"); + if (subscriberWithContext.hadNext.get()) { + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("present"); } - if (error.get() == null) { - assertThat(valueInOnComplete.get()).isEqualTo("present"); - assertThat(complete).isTrue(); + if (subscriberWithContext.error.get() == null) { + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present"); + assertThat(subscriberWithContext.complete).isTrue(); } else { - assertThat(valueInOnError.get()).isEqualTo("present"); + assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present"); } }); } @@ -563,35 +536,24 @@ void internalFluxSubscribeNoFusion() { @Test void directFluxSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException { - AtomicReference<String> valueInOnNext = new AtomicReference<>(); - AtomicReference<String> valueInOnComplete = new AtomicReference<>(); - AtomicReference<String> valueInOnError = new AtomicReference<>(); - AtomicReference<Throwable> error = new AtomicReference<>(); - AtomicBoolean hadNext = new AtomicBoolean(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - Flux<String> flux = threadSwitchingFlux(); - CoreSubscriberWithContext<String> subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + CoreSubscriberWithContext<String> subscriberWithContext = new CoreSubscriberWithContext<>(); flux.subscribe(subscriberWithContext); - if (!latch.await(100, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); + assertThat(subscriberWithContext.error.get()).isNull(); + assertThat(subscriberWithContext.complete.get()).isTrue(); // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(valueInOnNext.get()).isEqualTo("ref_init"); - assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init"); } // Fundamental tests for Mono @@ -618,35 +580,25 @@ void internalMonoFlatMapSubscribeNoFusion() { @Test void directMonoSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException { - AtomicReference<String> valueInOnNext = new AtomicReference<>(); - AtomicReference<String> valueInOnComplete = new AtomicReference<>(); - AtomicReference<String> valueInOnError = new AtomicReference<>(); - AtomicReference<Throwable> error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - AtomicBoolean hadNext = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - Mono<String> mono = new ThreadSwitchingMono<>("Hello", executorService); CoreSubscriberWithContext<String> subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + new CoreSubscriberWithContext<>(); mono.subscribe(subscriberWithContext); - if (!latch.await(100, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); + assertThat(subscriberWithContext.error.get()).isNull(); + assertThat(subscriberWithContext.complete.get()).isTrue(); // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(valueInOnNext.get()).isEqualTo("ref_init"); - assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init"); } // Flux tests @@ -1925,29 +1877,22 @@ void printInterestingClasses() throws Exception { private class CoreSubscriberWithContext<T> implements CoreSubscriber<T> { - private final AtomicReference<String> valueInOnNext; - private final AtomicReference<String> valueInOnComplete; - private final AtomicReference<String> valueInOnError; - private final AtomicReference<Throwable> error; - private final CountDownLatch latch; - private final AtomicBoolean complete; - private final AtomicBoolean hadNext; - - public CoreSubscriberWithContext( - AtomicReference<String> valueInOnNext, - AtomicReference<String> valueInOnComplete, - AtomicReference<String> valueInOnError, - AtomicReference<Throwable> error, - CountDownLatch latch, - AtomicBoolean hadNext, - AtomicBoolean complete) { - this.valueInOnNext = valueInOnNext; - this.valueInOnComplete = valueInOnComplete; - this.valueInOnError = valueInOnError; - this.error = error; - this.latch = latch; - this.hadNext = hadNext; - this.complete = complete; + final AtomicReference<String> valueInOnNext; + final AtomicReference<String> valueInOnComplete; + final AtomicReference<String> valueInOnError; + final AtomicReference<Throwable> error; + final CountDownLatch latch; + final AtomicBoolean complete; + final AtomicBoolean hadNext; + + public CoreSubscriberWithContext() { + this.valueInOnNext = new AtomicReference<>(); + this.valueInOnComplete = new AtomicReference<>(); + this.valueInOnError = new AtomicReference<>(); + this.error = new AtomicReference<>(); + this.complete = new AtomicBoolean(); + this.hadNext = new AtomicBoolean(); + this.latch = new CountDownLatch(1); } @Override