feat(queue): mysql 5.7 compatible queue implementation (#60)
asher committed Sep 6, 2019
1 parent cc64092 commit bcfb098
Showing 19 changed files with 1,423 additions and 42 deletions.
14 changes: 14 additions & 0 deletions keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Message.kt
@@ -60,8 +60,15 @@ const val JSON_NAME_PROPERTY = "kind"
@JsonTypeInfo(use = NAME, property = JSON_NAME_PROPERTY)
interface Attribute

@Deprecated("Message attributes are intended for internal keiko use only, handlers should " +
"limit attempts or run-time through other means.")
/**
* An attribute representing the maximum number of retries for a message.
*
* @deprecated This attribute originally combined the number of times a message
* could be retried due to not being acked within [AckTimeoutSeconds] as well as
* the number of times a properly acked retryable message could be requeued, which
* is normal behavior for many Orca message types.
*/
@JsonTypeName("maxAttempts")
data class MaxAttemptsAttribute(val maxAttempts: Int = -1) : Attribute
@@ -71,3 +78,10 @@ data class MaxAttemptsAttribute(val maxAttempts: Int = -1) : Attribute
*/
@JsonTypeName("attempts")
data class AttemptsAttribute(var attempts: Int = 0) : Attribute

/**
* An attribute representing the number of times a message has been retried
* due to ack timeouts.
*/
@JsonTypeName("ackAttempts")
data class AckAttemptsAttribute(var ackAttempts: Int = 0) : Attribute
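
The two counters serve different purposes: AttemptsAttribute tracks overall delivery attempts, while the new AckAttemptsAttribute counts only redeliveries caused by ack timeouts. A hypothetical helper (not part of this commit) that reads both attributes from a message might look like this:

```kotlin
import com.netflix.spinnaker.q.AckAttemptsAttribute
import com.netflix.spinnaker.q.AttemptsAttribute
import com.netflix.spinnaker.q.Message

// Hypothetical helper, for illustration only: summarize the delivery history of
// a message using the two counters above. Missing attributes default to zero.
fun describeDeliveryHistory(message: Message): String {
  val attempts = message.getAttribute<AttemptsAttribute>()?.attempts ?: 0
  val ackAttempts = message.getAttribute<AckAttemptsAttribute>()?.ackAttempts ?: 0
  return "delivered $attempts time(s), $ackAttempts of them after an ack timeout"
}
```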
10 changes: 10 additions & 0 deletions keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt
@@ -46,6 +46,11 @@ interface Queue {
*/
fun poll(callback: QueueCallback): Unit

/**
* Polls the queue for ready messages, processing up to [maxMessages].
*/
fun poll(maxMessages: Int, callback: QueueCallback): Unit

/**
* Push [message] for immediate delivery.
*/
@@ -93,6 +98,11 @@ interface Queue {
*/
val deadMessageHandlers: List<DeadMessageCallback>

/**
* Denotes a queue implementation capable of processing multiple messages per poll.
*/
val canPollMany: Boolean

companion object {
/**
* The maximum number of times an un-acknowledged message will be retried
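
These additions only extend the contract: implementations that cannot batch keep canPollMany = false and delegate poll(maxMessages, callback) to the single-message poll, as the in-memory and Redis queues do below. As a rough sketch (not the SqlQueue introduced by this commit), a batching-capable implementation would invoke the callback up to maxMessages times per poll:

```kotlin
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.QueueCallback
import java.util.ArrayDeque

// Illustrative sketch only; a real Queue implementation would also handle
// delays, ack timeouts, dead-lettering and the rest of the interface.
class BatchingQueueSketch(private val ready: ArrayDeque<Message>) {
  val canPollMany: Boolean = true

  fun poll(maxMessages: Int, callback: QueueCallback) {
    repeat(maxMessages) {
      val message = ready.poll() ?: return // stop early once nothing is ready
      callback(message) { /* ack: mark the message as processed */ }
    }
  }
}
```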
@@ -52,13 +52,17 @@ class QueueProcessor(
* Polls the [Queue] once (or more if [fillExecutorEachCycle] is true) so
* long as [executor] has capacity.
*/
@Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:10}")
@Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:50}")
fun poll() =
ifEnabled {
if (executor.hasCapacity()) {
if (fillExecutorEachCycle) {
executor.availableCapacity().downTo(1).forEach {
pollOnce()
if (queue.canPollMany) {
queue.poll(executor.availableCapacity(), callback)
} else {
executor.availableCapacity().downTo(1).forEach {
pollOnce()
}
}
} else {
pollOnce()
@@ -72,44 +76,46 @@
* Polls the [Queue] once to attempt to read a single message.
*/
private fun pollOnce() {
queue.poll { message, ack ->
log.info("Received message $message")
val handler = handlerFor(message)
if (handler != null) {
try {
executor.execute {
try {
handler.invoke(message)
ack.invoke()
} catch (e: Throwable) {
// Something very bad is happening
log.error("Unhandled throwable from $message", e)
publisher.publishEvent(HandlerThrewError(message))
}
}
} catch (e: RejectedExecutionException) {
var requeueDelaySeconds = requeueDelay.seconds
if (requeueMaxJitter.seconds > 0) {
requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt())
queue.poll(callback)
}

val callback: QueueCallback = { message, ack ->
log.info("Received message $message")
val handler = handlerFor(message)
if (handler != null) {
try {
executor.execute {
try {
handler.invoke(message)
ack.invoke()
} catch (e: Throwable) {
// Something very bad is happening
log.error("Unhandled throwable from $message", e)
publisher.publishEvent(HandlerThrewError(message))
}
}
} catch (e: RejectedExecutionException) {
var requeueDelaySeconds = requeueDelay.seconds
if (requeueMaxJitter.seconds > 0) {
requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt())
}

val requeueDelay = Duration.ofSeconds(requeueDelaySeconds)
val numberOfAttempts = message.getAttribute<AttemptsAttribute>()
val requeueDelay = Duration.ofSeconds(requeueDelaySeconds)
val numberOfAttempts = message.getAttribute<AttemptsAttribute>()

log.warn(
"Executor at capacity, re-queuing message {} (delay: {}, attempts: {})",
message,
requeueDelay,
numberOfAttempts,
e
)
queue.push(message, requeueDelay)
}
} else {
log.error("Unsupported message type ${message.javaClass.simpleName}: $message")
deadMessageHandler.invoke(queue, message)
publisher.publishEvent(MessageDead)
log.warn(
"Executor at capacity, re-queuing message {} (delay: {}, attempts: {})",
message,
requeueDelay,
numberOfAttempts,
e
)
queue.push(message, requeueDelay)
}
} else {
log.error("Unsupported message type ${message.javaClass.simpleName}: $message")
deadMessageHandler.invoke(queue, message)
publisher.publishEvent(MessageDead)
}
}

@@ -19,6 +19,7 @@ package com.netflix.spinnaker.q.memory
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.QueueCallback
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MessageAcknowledged
import com.netflix.spinnaker.q.metrics.MessageDead
@@ -51,6 +52,7 @@ class InMemoryQueue(
private val clock: Clock,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = false,
override val publisher: EventPublisher
) : MonitorableQueue {

@@ -78,6 +80,10 @@
}
}

override fun poll(maxMessages: Int, callback: QueueCallback) {
poll(callback)
}

override fun push(message: Message, delay: TemporalAmount) {
val existed = queue.removeIf { it.payload == message }
queue.put(Envelope(message, clock.instant().plus(delay), clock))
@@ -46,6 +46,10 @@ import java.util.Optional

@Configuration
@EnableConfigurationProperties(RedisQueueProperties::class)
@ConditionalOnProperty(
value = ["keiko.queue.redis.enabled"],
havingValue = "true",
matchIfMissing = true)
class RedisQueueConfiguration {

@Bean
@@ -26,6 +26,7 @@ abstract class AbstractRedisQueue(
private val serializationMigrator: Optional<SerializationMigrator>,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = false,
override val publisher: EventPublisher

) : MonitorableQueue {
@@ -8,6 +8,7 @@ import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.MaxAttemptsAttribute
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.QueueCallback
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.LockFailed
import com.netflix.spinnaker.q.metrics.MessageAcknowledged
@@ -50,6 +51,7 @@ class RedisClusterQueue(
private val serializationMigrator: Optional<SerializationMigrator>,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = false,
override val publisher: EventPublisher
) : AbstractRedisQueue(
clock,
@@ -58,6 +60,7 @@
serializationMigrator,
ackTimeout,
deadMessageHandlers,
canPollMany,
publisher
) {

@@ -73,7 +76,7 @@

init {
cacheScript()
log.info("Configured queue: $queueName")
log.info("Configured $javaClass queue: $queueName")
}

final override fun cacheScript() {
@@ -104,6 +107,10 @@
fire(QueuePolled)
}

override fun poll(maxMessages: Int, callback: QueueCallback) {
poll(callback)
}

override fun push(message: Message, delay: TemporalAmount) {
jedisCluster.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint ->
if (fingerprint != null) {
@@ -24,6 +24,7 @@ import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.MaxAttemptsAttribute
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.QueueCallback
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.LockFailed
import com.netflix.spinnaker.q.metrics.MessageAcknowledged
@@ -66,6 +67,7 @@ class RedisQueue(
private val serializationMigrator: Optional<SerializationMigrator>,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = false,
override val publisher: EventPublisher
) : AbstractRedisQueue(
clock,
@@ -74,6 +76,7 @@
serializationMigrator,
ackTimeout,
deadMessageHandlers,
canPollMany,
publisher
) {

@@ -89,7 +92,7 @@

init {
cacheScript()
log.info("Configured queue: $queueName")
log.info("Configured $javaClass queue: $queueName")
}

final override fun cacheScript() {
@@ -124,6 +127,10 @@
}
}

override fun poll(maxMessages: Int, callback: QueueCallback) {
poll(callback)
}

override fun push(message: Message, delay: TemporalAmount) {
pool.resource.use { redis ->
redis.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint ->
22 changes: 22 additions & 0 deletions keiko-sql/build.gradle
@@ -0,0 +1,22 @@
apply from: "$rootDir/gradle/spek.gradle"

dependencies {
api project(":keiko-core")
api "com.fasterxml.jackson.core:jackson-databind"
api "com.fasterxml.jackson.module:jackson-module-kotlin"
api "org.funktionale:funktionale-partials"
api "com.github.ben-manes.caffeine:guava"

implementation "com.netflix.spinnaker.kork:kork-core"
implementation "com.netflix.spinnaker.kork:kork-exceptions"
implementation "com.netflix.spinnaker.kork:kork-sql"
implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid"
implementation "io.github.resilience4j:resilience4j-retry"
implementation "javax.validation:validation-api"
implementation "org.jooq:jooq"

testImplementation project(":keiko-tck")
testImplementation "com.netflix.spinnaker.kork:kork-sql-test"
testImplementation "org.testcontainers:mysql"
testImplementation "mysql:mysql-connector-java"
}
@@ -0,0 +1,68 @@
package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.sql.SqlDeadMessageHandler
import com.netflix.spinnaker.q.sql.SqlQueue
import org.jooq.DSLContext
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Clock
import java.util.Optional

@Configuration
@EnableConfigurationProperties(SqlQueueProperties::class)
@ConditionalOnProperty(
value = ["keiko.queue.sql.enabled"],
havingValue = "true",
matchIfMissing = false)
class SqlQueueConfiguration {

companion object {
const val SCHEMA_VERSION = 1
}

@Bean
@ConditionalOnMissingBean(name = ["queue"])
fun queue(
jooq: DSLContext,
clock: Clock,
mapper: ObjectMapper,
deadMessageHandler: SqlDeadMessageHandler,
publisher: EventPublisher,
serializationMigrator: Optional<SerializationMigrator>,
properties: SqlQueueProperties
) =
SqlQueue(
queueName = properties.queueName,
schemaVersion = SCHEMA_VERSION,
jooq = jooq,
clock = clock,
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
sqlRetryProperties = properties.retries
)

@Bean
@ConditionalOnMissingBean(name = ["sqlDeadMessageHandler"])
fun sqlDeadMessageHandler(
jooq: DSLContext,
clock: Clock,
properties: SqlQueueProperties
) =
SqlDeadMessageHandler(
deadLetterQueueName = properties.deadLetterQueueName,
schemaVersion = SCHEMA_VERSION,
jooq = jooq,
clock = clock,
sqlRetryProperties = properties.retries
)
}
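
With both configurations now gated on properties, the Redis queue remains enabled by default (matchIfMissing = true) and the SQL queue is opt-in (matchIfMissing = false). As a hedged sketch, a service or test switching over to the SQL queue would set both keys, for example:

```kotlin
import org.springframework.boot.test.context.SpringBootTest

// Hypothetical test class, shown only to illustrate the two property switches;
// in a deployment the same keys would live in application.yml or properties.
@SpringBootTest(properties = [
  "keiko.queue.sql.enabled=true",   // opt in to SqlQueueConfiguration
  "keiko.queue.redis.enabled=false" // turn off RedisQueueConfiguration
])
class SqlQueueWiringTest
```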
