Skip to content

Commit

Permalink
GH-9825: DelayerEndpointSpec: Set TaskScheduler to the handler as well
Browse files Browse the repository at this point in the history
Fixes: #9825
Issue link: #9825

The `DelayerEndpointSpec` extends `ConsumerEndpointSpec` which has a `taskScheduler()` option.
However this is set only to the endpoint for this `MessageHandler`.

* Override `taskScheduler()` method on the `DelayerEndpointSpec` to set
the provided `TaskScheduler` to the `DelayHandler` as well

(cherry picked from commit 12fee0a)
  • Loading branch information
artembilan authored and spring-builds committed Feb 13, 2025
1 parent 7958d58 commit cb85fe0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -243,4 +244,16 @@ public DelayerEndpointSpec messageGroupId(String messageGroupId) {
return this;
}

/**
* Set a provided {@link TaskScheduler} into the {@link DelayHandler},
* as well as call {@code super} to set it into an endpoint for this handler (if necessary).
* @param taskScheduler the {@link TaskScheduler} to use.
* @return the spec
*/
@Override
public DelayerEndpointSpec taskScheduler(TaskScheduler taskScheduler) {
this.handler.setTaskScheduler(taskScheduler);
return super.taskScheduler(taskScheduler);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
Expand Down Expand Up @@ -161,6 +162,10 @@ public class IntegrationFlowTests {
@Qualifier("bridgeFlow2Input")
private MessageChannel bridgeFlow2Input;

@Autowired
@Qualifier("delayer.handler")
DelayHandler delayHandler;

@Autowired
@Qualifier("bridgeFlow2Output")
private PollableChannel bridgeFlow2Output;
Expand Down Expand Up @@ -259,6 +264,8 @@ public void testBridge() {
assertThat(reply).isNotNull();
assertThat(reply.getPayload()).isEqualTo("test");
assertThat(this.delayedAdvice.getInvoked()).isTrue();

assertThat(TestUtils.getPropertyValue(this.delayHandler, "taskScheduler")).isSameAs(this.customScheduler);
}

@Test
Expand Down Expand Up @@ -812,15 +819,17 @@ public IntegrationFlow bridgeFlow() {
}

@Bean
public IntegrationFlow bridgeFlow2() {
public IntegrationFlow bridgeFlow2(TaskScheduler customScheduler) {
return IntegrationFlow.from("bridgeFlow2Input")
.bridge(c -> c.autoStartup(false).id("bridge"))
.fixedSubscriberChannel()
.delay(d -> d
.messageGroupId("delayer")
.delayExpression("200")
.advice(this.delayedAdvice)
.messageStore(this.messageStore()))
.messageStore(messageStore())
.taskScheduler(customScheduler)
.id("delayer"))
.channel(MessageChannels.queue("bridgeFlow2Output"))
.get();
}
Expand All @@ -833,8 +842,8 @@ public SimpleMessageStore messageStore() {
@Bean
public IntegrationFlow claimCheckFlow() {
return IntegrationFlow.from("claimCheckInput")
.claimCheckIn(this.messageStore())
.claimCheckOut(this.messageStore())
.claimCheckIn(messageStore())
.claimCheckOut(messageStore())
.get();
}

Expand Down

0 comments on commit cb85fe0

Please sign in to comment.