diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt index 066ca19..02242c6 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt @@ -18,14 +18,14 @@ package com.netflix.spinnaker.q import com.netflix.spinnaker.KotlinOpen import com.netflix.spinnaker.q.metrics.EventPublisher -import com.netflix.spinnaker.q.metrics.MessageDead import com.netflix.spinnaker.q.metrics.HandlerThrewError +import com.netflix.spinnaker.q.metrics.MessageDead import com.netflix.spinnaker.q.metrics.NoHandlerCapacity import org.slf4j.Logger import org.slf4j.LoggerFactory.getLogger import org.springframework.scheduling.annotation.Scheduled import java.time.Duration -import java.util.Random +import java.util.* import java.util.concurrent.RejectedExecutionException import javax.annotation.PostConstruct @@ -43,7 +43,8 @@ class QueueProcessor( private val deadMessageHandler: DeadMessageCallback, private val fillExecutorEachCycle: Boolean = false, private val requeueDelay : Duration = Duration.ofSeconds(0), - private val requeueMaxJitter : Duration = Duration.ofSeconds(0) + private val requeueMaxJitter : Duration = Duration.ofSeconds(0), + private val enabled: Boolean = true ) { private val log: Logger = getLogger(javaClass) private val random: Random = Random() @@ -54,7 +55,7 @@ class QueueProcessor( */ @Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:10}") fun poll() = - activator.ifEnabled { + ifEnabled { if (executor.hasCapacity()) { if (fillExecutorEachCycle) { val availableCapacity = executor.availableCapacity() @@ -114,6 +115,12 @@ class QueueProcessor( } } + private fun ifEnabled(fn: () -> Unit) { + if (enabled) { + activator.ifEnabled(fn) + } + } + private val handlerCache = mutableMapOf, MessageHandler<*>>() private fun handlerFor(message: Message) = diff --git a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt index b34fab2..a9ce82b 100644 --- a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt +++ b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt @@ -67,7 +67,8 @@ class QueueConfiguration { deadMessageHandler, queueProperties.fillExecutorEachCycle, Duration.ofSeconds(queueProperties.requeueDelaySeconds), - Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds) + Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds), + queueProperties.enabled ) @Bean diff --git a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt index 4e422d4..7a82a8b 100644 --- a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt +++ b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt @@ -20,6 +20,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties("keiko.queue") class QueueProperties { + var enabled: Boolean = true var handlerThreadNamePrefix: String = "handlers-" var handlerCorePoolSize: Int = 20 var handlerMaxPoolSize: Int = 20