Skip to content

Commit

Permalink
Merge #3979 into 3.7.3
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Jan 31, 2025
2 parents 8ec3a99 + dda86e8 commit 93a78d0
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1114,7 +1114,9 @@ public void dispose() {
if (hasFuture(previousState)) {
this.scheduledFuture.cancel(true);
}
this.carrier.interrupt();
if (Thread.currentThread() != this.carrier) {
this.carrier.interrupt();
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,9 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -855,4 +857,71 @@ void allTasksExecuteInParallelWhenUsingDefaultMax(boolean useWorker) throws Exce
scheduler.dispose();
}
}

// see https://github.com/reactor/reactor-core/issues/3948
@Test
void currentTaskDisposalDoesNotInterruptCurrentThread() {
BoundedElasticThreadPerTaskScheduler scheduler = newScheduler(1, 1);
scheduler.init();

AtomicReference<Disposable> disposable = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
AtomicBoolean interrupted = new AtomicBoolean();
Runnable task = () -> {
try {
if (latch.await(1, TimeUnit.SECONDS)) {
disposable.get().dispose();
}
interrupted.set(Thread.interrupted());
} catch (InterruptedException e) {
exception.set(e);
}
};

Disposable futureTask = scheduler.schedule(task);

disposable.set(futureTask);
latch.countDown();

Assertions.assertThat(exception.get()).isNull();
Assertions.assertThat(interrupted.get()).isFalse();
Assertions.assertThat(Thread.interrupted()).isFalse();
}

// see https://github.com/reactor/reactor-core/issues/3948
@Test
void externalTaskDisposalDoesInterruptsExecutingThread() throws InterruptedException {
BoundedElasticThreadPerTaskScheduler scheduler = newScheduler(1, 1);
scheduler.init();

CountDownLatch taskRunning = new CountDownLatch(1);
CountDownLatch taskDone = new CountDownLatch(1);
CountDownLatch taskCanFinish = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
AtomicBoolean interrupted = new AtomicBoolean();

Runnable task = () -> {
try {
taskRunning.countDown();
taskCanFinish.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted.set(Thread.interrupted());
exception.set(e);
taskDone.countDown();
}
};

Disposable futureTask = scheduler.schedule(task);

Assertions.assertThat(taskRunning.await(1, TimeUnit.SECONDS)).isTrue();

futureTask.dispose();

Assertions.assertThat(taskDone.await(1, TimeUnit.SECONDS)).isTrue();

Assertions.assertThat(exception.get()).isInstanceOf(InterruptedException.class);
Assertions.assertThat(interrupted.get()).isFalse();
Assertions.assertThat(Thread.interrupted()).isFalse();
}
}

0 comments on commit 93a78d0

Please sign in to comment.