Thread interruption flag leaks from BoundedElasticThreadPerTaskScheduler thread into user code #3948

Open
petkov opened this issue Dec 4, 2024 · 2 comments

petkov commented Dec 4, 2024

When virtual threads are enabled via the reactor.schedulers.defaultBoundedElasticOnVirtualThreads system property, a BoundedElasticThreadPerTaskScheduler thread leaks its interrupted flag into user code. The following test demonstrates this:

    @Test
    void shouldOnErrorResumeLambdaBeExecutedInsideUninterruptedThread() {
        System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
        Mono.fromCallable(() -> {
                throw new RuntimeException("$ERROR$");
            })
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorResume(e -> {
                Assertions.assertFalse(Thread.interrupted()); // The test fails here
                return Mono.empty();
            })
            .block();
    }

Expected Behavior

The lambda passed to onErrorResume should run on a thread whose interrupted flag is not set.

Actual Behavior

The lambda passed to onErrorResume runs on a thread whose interrupted flag is set.

Reactor Core version: 3.6.6

Notes

The test below passes if the exception is thrown inside a map function instead of inside fromCallable:

    @Test
    void shouldOnErrorResumeLambdaBeExecutedInsideUninterruptedThread() {
        System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
        Mono.fromCallable(() -> {
                return "NOT_EMPTY";
            })
            .map(s -> {
                throw new RuntimeException("$ERROR$");
            })
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorResume(e -> {
                Assertions.assertFalse(Thread.interrupted()); // The assertion passes in this variant
                return Mono.empty();
            })
            .block();
    }

chemicL added the type/bug (A general bug) label Dec 10, 2024
chemicL added this to the 3.6.14 milestone Dec 10, 2024

chemicL (Member) commented Dec 10, 2024

Thank you for the report. It does indeed look like a bug; I'll try to review the implementation and figure out what to do.

kwondh5217 commented Jan 22, 2025

Hi @chemicL!

I might be missing something here, but it seems that the interrupted flag persists because the thread is not in a blocking state when the interrupt arrives, so nothing throws InterruptedException and resets the flag.

In the first test, the interruption occurs before the error handler (onErrorResume) is executed, so the interrupted flag leaks into user code. However, in the second test, the error handler (onErrorResume) executes before the interrupt is triggered, which allows the test case to pass as the interrupted flag hasn't been set yet.
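
To make the flag behaviour described above concrete, here is a minimal plain-JDK sketch. It does not use Reactor at all; the class and method names are hypothetical and exist only for illustration. It shows that an interrupt delivered to a running thread just sets a flag that stays set until something reads it with Thread.interrupted() or a blocking call throws InterruptedException.

```java
final class InterruptFlagDemo {

    // Interrupt the current thread while it is running, then read the flag twice.
    static boolean[] interruptWhileRunning() {
        Thread.currentThread().interrupt();    // thread is not blocked, so the flag is only set
        boolean first = Thread.interrupted();  // true: reads the flag and clears it
        boolean second = Thread.interrupted(); // false: the previous call cleared it
        return new boolean[] {first, second};
    }

    // Interrupt the current thread, then enter a blocking call.
    static boolean[] interruptThenBlock() {
        Thread.currentThread().interrupt();
        boolean threw = false;
        try {
            Thread.sleep(10);                  // throws at once because the flag is already set
        } catch (InterruptedException e) {
            threw = true;                      // throwing InterruptedException clears the flag
        }
        boolean flagAfter = Thread.currentThread().isInterrupted();
        return new boolean[] {threw, flagAfter};
    }

    public static void main(String[] args) {
        boolean[] running = interruptWhileRunning();
        System.out.println("running: first read=" + running[0] + ", second read=" + running[1]);
        boolean[] blocked = interruptThenBlock();
        System.out.println("blocking: threw=" + blocked[0] + ", flag afterwards=" + blocked[1]);
    }
}
```

In the first method nothing consumes the flag until user code asks for it, which matches the situation where a callback later runs on the same thread and finds it set.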

I was wondering if this change to SchedulerTask.dispose() in BoundedElasticThreadPerTaskScheduler might help:

    if (isRunning(previousState)) {
        if (hasFuture(previousState)) {
            this.scheduledFuture.cancel(true);
        }
        if (this.carrier.getState() == Thread.State.WAITING ||
                this.carrier.getState() == Thread.State.TIMED_WAITING ||
                this.carrier.getState() == Thread.State.BLOCKED) {
            this.carrier.interrupt();
        }
        return;
    }

After this change, the test cases pass as expected. It ensures that the interrupt is only triggered when the thread is in a waiting or blocked state, potentially avoiding unnecessary leaks into user code.

What do you think about this adjustment? I'm open to feedback if I misunderstood anything! 🙇
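
For readers who want to try the state-gated interrupt idea outside Reactor, the following is a rough sketch using plain platform threads. The helper interruptIfWaiting and the two demo methods are hypothetical names, not part of Reactor, and the sketch says nothing about how the scheduler itself should be fixed. One caveat that applies to any check of this shape: Thread.getState() returns a snapshot, and the JDK documents it as intended for monitoring rather than synchronization control, so the target thread can change state between the check and the interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class StateGatedInterrupt {

    // Hypothetical helper: interrupt the target only if it currently looks parked.
    static boolean interruptIfWaiting(Thread target) {
        Thread.State state = target.getState(); // one snapshot; it may be stale by the next line
        if (state == Thread.State.WAITING
                || state == Thread.State.TIMED_WAITING
                || state == Thread.State.BLOCKED) {
            target.interrupt();
            return true;
        }
        return false;
    }

    // A sleeping thread is TIMED_WAITING, so the helper interrupts it.
    static boolean sleepingThreadIsInterrupted() {
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        sleeper.start();
        while (sleeper.getState() != Thread.State.TIMED_WAITING) {
            Thread.onSpinWait(); // wait until the sleeper is actually inside sleep()
        }
        boolean sent = interruptIfWaiting(sleeper);
        join(sleeper);
        return sent && sawInterrupt.get();
    }

    // A busy thread is RUNNABLE, so the helper leaves its flag untouched.
    static boolean busyThreadIsLeftAlone() {
        AtomicBoolean started = new AtomicBoolean();
        AtomicBoolean stop = new AtomicBoolean();
        AtomicBoolean flagSeen = new AtomicBoolean();
        Thread spinner = new Thread(() -> {
            started.set(true);
            while (!stop.get()) {
                Thread.onSpinWait();
            }
            flagSeen.set(Thread.currentThread().isInterrupted());
        });
        spinner.start();
        while (!started.get()) {
            Thread.onSpinWait(); // wait until the spinner is inside its loop
        }
        boolean sent = interruptIfWaiting(spinner);
        stop.set(true);
        join(spinner);
        return !sent && !flagSeen.get();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sleeping thread interrupted: " + sleepingThreadIsInterrupted());
        System.out.println("busy thread left alone: " + busyThreadIsLeftAlone());
    }
}
```

The two demos are deterministic only because each one waits until the target thread has settled into a known state before calling the helper; a real scheduler does not have that luxury.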
