Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(core): Introduce jitter when re-queueing message if executor is f…
Browse files Browse the repository at this point in the history
…ull (#35)

- include more meaningful log output (message, attempts, delay, etc.)
  • Loading branch information
ajordens authored May 8, 2018
1 parent 9bd389e commit 10f01b4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import com.netflix.spinnaker.q.metrics.NoHandlerCapacity
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.util.Random
import java.util.concurrent.RejectedExecutionException
import javax.annotation.PostConstruct

Expand All @@ -37,9 +39,12 @@ class QueueProcessor(
private val activator: Activator,
private val publisher: EventPublisher,
private val deadMessageHandler: DeadMessageCallback,
private val fillExecutorEachCycle: Boolean = false
private val fillExecutorEachCycle: Boolean = false,
private val requeueDelay : Duration = Duration.ofSeconds(0),
private val requeueMaxJitter : Duration = Duration.ofSeconds(0)
) {
private val log: Logger = getLogger(javaClass)
private val random: Random = Random()

/**
* Polls the [Queue] once (or more if [fillExecutorEachCycle] is true) so
Expand Down Expand Up @@ -82,8 +87,22 @@ class QueueProcessor(
}
}
} catch (e: RejectedExecutionException) {
log.warn("Executor at capacity, immediately re-queuing message", e)
queue.push(message)
var requeueDelaySeconds = requeueDelay.seconds
if (requeueMaxJitter.seconds > 0) {
requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt())
}

val requeueDelay = Duration.ofSeconds(requeueDelaySeconds)
val numberOfAttempts = message.getAttribute<AttemptsAttribute>()

log.warn(
"Executor at capacity, re-queuing message {} (delay: {}, attempts: {})",
message,
requeueDelay,
numberOfAttempts,
e
)
queue.push(message, requeueDelay)
}
} else {
log.error("Unsupported message type ${message.javaClass.simpleName}: $message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.time.Clock
import java.time.Duration

@Configuration
@EnableConfigurationProperties(QueueProperties::class)
Expand Down Expand Up @@ -57,7 +58,17 @@ class QueueConfiguration {
publisher: EventPublisher,
queueProperties: QueueProperties,
deadMessageHandler: DeadMessageCallback
) = QueueProcessor(queue, executor, handlers, activator, publisher, deadMessageHandler, queueProperties.fillExecutorEachCycle)
) = QueueProcessor(
queue,
executor,
handlers,
activator,
publisher,
deadMessageHandler,
queueProperties.fillExecutorEachCycle,
Duration.ofSeconds(queueProperties.requeueDelaySeconds),
Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds)
)

@Bean
fun queueEventPublisher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ class QueueProperties {
var handlerCorePoolSize: Int = 20
var handlerMaxPoolSize: Int = 20
var fillExecutorEachCycle: Boolean = false
var requeueDelaySeconds : Long = 0
var requeueMaxJitterSeconds : Long = 0
}

0 comments on commit 10f01b4

Please sign in to comment.