Skip to content

Commit

Permalink
Skip automatic context-propagation in Flux.generate (#3848)
Browse files Browse the repository at this point in the history
As Flux.generate uses SynchronousSink, which should not be used
asynchronously, we can eliminate the unnecessary ThreadLocal restoration
from this operator. 

Related to #3840.
  • Loading branch information
chemicL authored Jul 17, 2024
1 parent ebded61 commit eaa889e
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,18 +74,15 @@ final class FluxGenerate<T, S>

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
CoreSubscriber<? super T> wrapped =
Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, actual);

S state;

try {
state = stateSupplier.call();
} catch (Throwable e) {
Operators.error(wrapped, Operators.onOperatorError(e, wrapped.currentContext()));
Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
return;
}
wrapped.onSubscribe(new GenerateSubscription<>(wrapped, state, generator, stateConsumer));
actual.onSubscribe(new GenerateSubscription<>(actual, state, generator, stateConsumer));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,16 +763,6 @@ void fluxConcatIterable() {
// Direct subscription
}

@Test
void fluxGenerate() {
assertThreadLocalsPresentInFlux(() -> Flux.generate(sink -> {
sink.next("Hello");
// the generator is checked if any signal was delivered by the consumer
// so we perform asynchronous completion only
executorService.submit(sink::complete);
}));
}

@Test
void fluxCombineLatest() {
assertThreadLocalsPresentInFlux(() ->
Expand Down

0 comments on commit eaa889e

Please sign in to comment.