Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
feat(core): Add config flag for disabling QueueProcessor (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed May 23, 2018
1 parent 39872bf commit 016601a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package com.netflix.spinnaker.q

import com.netflix.spinnaker.KotlinOpen
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MessageDead
import com.netflix.spinnaker.q.metrics.HandlerThrewError
import com.netflix.spinnaker.q.metrics.MessageDead
import com.netflix.spinnaker.q.metrics.NoHandlerCapacity
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.util.Random
import java.util.*
import java.util.concurrent.RejectedExecutionException
import javax.annotation.PostConstruct

Expand All @@ -43,7 +43,8 @@ class QueueProcessor(
private val deadMessageHandler: DeadMessageCallback,
private val fillExecutorEachCycle: Boolean = false,
private val requeueDelay : Duration = Duration.ofSeconds(0),
private val requeueMaxJitter : Duration = Duration.ofSeconds(0)
private val requeueMaxJitter : Duration = Duration.ofSeconds(0),
private val enabled: Boolean = true
) {
private val log: Logger = getLogger(javaClass)
private val random: Random = Random()
Expand All @@ -54,7 +55,7 @@ class QueueProcessor(
*/
@Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:10}")
fun poll() =
activator.ifEnabled {
ifEnabled {
if (executor.hasCapacity()) {
if (fillExecutorEachCycle) {
val availableCapacity = executor.availableCapacity()
Expand Down Expand Up @@ -114,6 +115,12 @@ class QueueProcessor(
}
}

private fun ifEnabled(fn: () -> Unit) {
if (enabled) {
activator.ifEnabled(fn)
}
}

private val handlerCache = mutableMapOf<Class<out Message>, MessageHandler<*>>()

private fun handlerFor(message: Message) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class QueueConfiguration {
deadMessageHandler,
queueProperties.fillExecutorEachCycle,
Duration.ofSeconds(queueProperties.requeueDelaySeconds),
Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds)
Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds),
queueProperties.enabled
)

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties("keiko.queue")
class QueueProperties {
var enabled: Boolean = true
var handlerThreadNamePrefix: String = "handlers-"
var handlerCorePoolSize: Int = 20
var handlerMaxPoolSize: Int = 20
Expand Down

0 comments on commit 016601a

Please sign in to comment.