Skip to content

Commit

Permalink
Refectored CoreSubscriberWithContext to encapsulate state
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Oct 30, 2023
1 parent 4586399 commit be01f56
Showing 1 changed file with 45 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,40 +452,27 @@ <T> void assertThatThreadLocalsPresentDirectCoreSubscribe(
<T> void assertThatThreadLocalsPresentDirectCoreSubscribe(
CorePublisher<? extends T> source, Runnable asyncAction) {
assertThatNoException().isThrownBy(() -> {
AtomicReference<String> valueInOnNext = new AtomicReference<>();
AtomicReference<String> valueInOnComplete = new AtomicReference<>();
AtomicReference<String> valueInOnError = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean complete = new AtomicBoolean();
AtomicBoolean hadNext = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);

CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(valueInOnNext,
valueInOnComplete,
valueInOnError,
error,
latch,
hadNext,
complete);
CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>();

source.subscribe(subscriberWithContext);

executorService.submit(asyncAction)
.get(100, TimeUnit.MILLISECONDS);

if (!latch.await(500, TimeUnit.MILLISECONDS)) {
if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("timed out");
}

if (hadNext.get()) {
assertThat(valueInOnNext.get()).isEqualTo("present");
if (subscriberWithContext.hadNext.get()) {
assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo(
"present");
}
if (error.get() == null) {
assertThat(valueInOnComplete.get()).isEqualTo("present");
assertThat(complete).isTrue();
if (subscriberWithContext.error.get() == null) {
assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present");
assertThat(subscriberWithContext.complete).isTrue();
}
else {
assertThat(valueInOnError.get()).isEqualTo("present");
assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present");
}
});
}
Expand All @@ -502,40 +489,26 @@ <T> void assertThatThreadLocalsPresentDirectRawSubscribe(
<T> void assertThatThreadLocalsPresentDirectRawSubscribe(
Publisher<? extends T> source, Runnable asyncAction) {
assertThatNoException().isThrownBy(() -> {
AtomicReference<String> valueInOnNext = new AtomicReference<>();
AtomicReference<String> valueInOnComplete = new AtomicReference<>();
AtomicReference<String> valueInOnError = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean hadNext = new AtomicBoolean();
AtomicBoolean complete = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);

CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>(valueInOnNext,
valueInOnComplete,
valueInOnError,
error,
latch,
hadNext,
complete);
CoreSubscriberWithContext<T> subscriberWithContext = new CoreSubscriberWithContext<>();

source.subscribe(subscriberWithContext);

executorService.submit(asyncAction)
.get(100, TimeUnit.MILLISECONDS);

if (!latch.await(500, TimeUnit.MILLISECONDS)) {
if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("timed out");
}

if (hadNext.get()) {
assertThat(valueInOnNext.get()).isEqualTo("present");
if (subscriberWithContext.hadNext.get()) {
assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("present");
}
if (error.get() == null) {
assertThat(valueInOnComplete.get()).isEqualTo("present");
assertThat(complete).isTrue();
if (subscriberWithContext.error.get() == null) {
assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present");
assertThat(subscriberWithContext.complete).isTrue();
}
else {
assertThat(valueInOnError.get()).isEqualTo("present");
assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present");
}
});
}
Expand Down Expand Up @@ -563,35 +536,24 @@ void internalFluxSubscribeNoFusion() {

@Test
void directFluxSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException {
AtomicReference<String> valueInOnNext = new AtomicReference<>();
AtomicReference<String> valueInOnComplete = new AtomicReference<>();
AtomicReference<String> valueInOnError = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean hadNext = new AtomicBoolean();
AtomicBoolean complete = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);

Flux<String> flux = threadSwitchingFlux();

CoreSubscriberWithContext<String> subscriberWithContext =
new CoreSubscriberWithContext<>(
valueInOnNext, valueInOnComplete, valueInOnError,
error, latch, hadNext, complete);
CoreSubscriberWithContext<String> subscriberWithContext = new CoreSubscriberWithContext<>();

flux.subscribe(subscriberWithContext);

if (!latch.await(100, TimeUnit.MILLISECONDS)) {
if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("timed out");
}

assertThat(error.get()).isNull();
assertThat(complete.get()).isTrue();
assertThat(subscriberWithContext.error.get()).isNull();
assertThat(subscriberWithContext.complete.get()).isTrue();

// We can't do anything here. subscribe(CoreSubscriber) is abstract in
// CoreSubscriber interface and we have no means to intercept the calls to
// restore ThreadLocals.
assertThat(valueInOnNext.get()).isEqualTo("ref_init");
assertThat(valueInOnComplete.get()).isEqualTo("ref_init");
assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init");
assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init");
}

// Fundamental tests for Mono
Expand All @@ -618,35 +580,25 @@ void internalMonoFlatMapSubscribeNoFusion() {

@Test
void directMonoSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException {
AtomicReference<String> valueInOnNext = new AtomicReference<>();
AtomicReference<String> valueInOnComplete = new AtomicReference<>();
AtomicReference<String> valueInOnError = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean complete = new AtomicBoolean();
AtomicBoolean hadNext = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);

Mono<String> mono = new ThreadSwitchingMono<>("Hello", executorService);

CoreSubscriberWithContext<String> subscriberWithContext =
new CoreSubscriberWithContext<>(
valueInOnNext, valueInOnComplete, valueInOnError,
error, latch, hadNext, complete);
new CoreSubscriberWithContext<>();

mono.subscribe(subscriberWithContext);

if (!latch.await(100, TimeUnit.MILLISECONDS)) {
if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("timed out");
}

assertThat(error.get()).isNull();
assertThat(complete.get()).isTrue();
assertThat(subscriberWithContext.error.get()).isNull();
assertThat(subscriberWithContext.complete.get()).isTrue();

// We can't do anything here. subscribe(CoreSubscriber) is abstract in
// CoreSubscriber interface and we have no means to intercept the calls to
// restore ThreadLocals.
assertThat(valueInOnNext.get()).isEqualTo("ref_init");
assertThat(valueInOnComplete.get()).isEqualTo("ref_init");
assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init");
assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init");
}

// Flux tests
Expand Down Expand Up @@ -1925,29 +1877,22 @@ void printInterestingClasses() throws Exception {

private class CoreSubscriberWithContext<T> implements CoreSubscriber<T> {

private final AtomicReference<String> valueInOnNext;
private final AtomicReference<String> valueInOnComplete;
private final AtomicReference<String> valueInOnError;
private final AtomicReference<Throwable> error;
private final CountDownLatch latch;
private final AtomicBoolean complete;
private final AtomicBoolean hadNext;

public CoreSubscriberWithContext(
AtomicReference<String> valueInOnNext,
AtomicReference<String> valueInOnComplete,
AtomicReference<String> valueInOnError,
AtomicReference<Throwable> error,
CountDownLatch latch,
AtomicBoolean hadNext,
AtomicBoolean complete) {
this.valueInOnNext = valueInOnNext;
this.valueInOnComplete = valueInOnComplete;
this.valueInOnError = valueInOnError;
this.error = error;
this.latch = latch;
this.hadNext = hadNext;
this.complete = complete;
final AtomicReference<String> valueInOnNext;
final AtomicReference<String> valueInOnComplete;
final AtomicReference<String> valueInOnError;
final AtomicReference<Throwable> error;
final CountDownLatch latch;
final AtomicBoolean complete;
final AtomicBoolean hadNext;

public CoreSubscriberWithContext() {
this.valueInOnNext = new AtomicReference<>();
this.valueInOnComplete = new AtomicReference<>();
this.valueInOnError = new AtomicReference<>();
this.error = new AtomicReference<>();
this.complete = new AtomicBoolean();
this.hadNext = new AtomicBoolean();
this.latch = new CountDownLatch(1);
}

@Override
Expand Down

0 comments on commit be01f56

Please sign in to comment.