diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Message.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Message.kt index 5f2c340..2934220 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Message.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Message.kt @@ -60,8 +60,15 @@ const val JSON_NAME_PROPERTY = "kind" @JsonTypeInfo(use = NAME, property = JSON_NAME_PROPERTY) interface Attribute +@Deprecated("Message attributes are intended for internal keiko use only; handlers should " + + "limit attempts or runtime through other means.") /** * An attribute representing the maximum number of retries for a message. + * + * @deprecated This attribute originally conflated the number of times a message + * could be retried after not being acked within [AckTimeoutSeconds] with the + * number of times a properly acked, retryable message could be requeued, which + * is normal behavior for many Orca message types. */ @JsonTypeName("maxAttempts") data class MaxAttemptsAttribute(val maxAttempts: Int = -1) : Attribute @@ -71,3 +78,10 @@ data class MaxAttemptsAttribute(val maxAttempts: Int = -1) : Attribute */ @JsonTypeName("attempts") data class AttemptsAttribute(var attempts: Int = 0) : Attribute + +/** + * An attribute representing the number of times a message has been retried + * due to ack timeouts. + */ +@JsonTypeName("ackAttempts") +data class AckAttemptsAttribute(var ackAttempts: Int = 0) : Attribute diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt index bbd3256..d85b7dc 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt @@ -46,6 +46,11 @@ interface Queue { */ fun poll(callback: QueueCallback): Unit + /** + * Polls the queue for ready messages, processing up to [maxMessages]. + */ + fun poll(maxMessages: Int, callback: QueueCallback): Unit + /** * Push [message] for immediate delivery. */ @@ -93,6 +98,11 @@ interface Queue { */ val deadMessageHandlers: List<DeadMessageCallback> + /** + * Denotes a queue implementation capable of processing multiple messages per poll. + */ + val canPollMany: Boolean + companion object { /** * The maximum number of times an un-acknowledged message will be retried diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt index f8e309c..4ed0e4f 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt @@ -52,13 +52,17 @@ class QueueProcessor( * Polls the [Queue] once (or more if [fillExecutorEachCycle] is true) so * long as [executor] has capacity. */ - @Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:10}") + @Scheduled(fixedDelayString = "\${queue.poll.frequency.ms:50}") fun poll() = ifEnabled { if (executor.hasCapacity()) { if (fillExecutorEachCycle) { - executor.availableCapacity().downTo(1).forEach { - pollOnce() + if (queue.canPollMany) { + queue.poll(executor.availableCapacity(), callback) + } else { + executor.availableCapacity().downTo(1).forEach { + pollOnce() + } } } else { pollOnce() @@ -72,44 +76,46 @@ class QueueProcessor( * Polls the [Queue] once to attempt to read a single message.
*/ private fun pollOnce() { - queue.poll { message, ack -> - log.info("Received message $message") - val handler = handlerFor(message) - if (handler != null) { - try { - executor.execute { - try { - handler.invoke(message) - ack.invoke() - } catch (e: Throwable) { - // Something very bad is happening - log.error("Unhandled throwable from $message", e) - publisher.publishEvent(HandlerThrewError(message)) - } - } - } catch (e: RejectedExecutionException) { - var requeueDelaySeconds = requeueDelay.seconds - if (requeueMaxJitter.seconds > 0) { - requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt()) + queue.poll(callback) + } + + val callback: QueueCallback = { message, ack -> + log.info("Received message $message") + val handler = handlerFor(message) + if (handler != null) { + try { + executor.execute { + try { + handler.invoke(message) + ack.invoke() + } catch (e: Throwable) { + // Something very bad is happening + log.error("Unhandled throwable from $message", e) + publisher.publishEvent(HandlerThrewError(message)) } + } + } catch (e: RejectedExecutionException) { + var requeueDelaySeconds = requeueDelay.seconds + if (requeueMaxJitter.seconds > 0) { + requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt()) + } - val requeueDelay = Duration.ofSeconds(requeueDelaySeconds) - val numberOfAttempts = message.getAttribute() + val requeueDelay = Duration.ofSeconds(requeueDelaySeconds) + val numberOfAttempts = message.getAttribute() - log.warn( - "Executor at capacity, re-queuing message {} (delay: {}, attempts: {})", - message, - requeueDelay, - numberOfAttempts, - e - ) - queue.push(message, requeueDelay) - } - } else { - log.error("Unsupported message type ${message.javaClass.simpleName}: $message") - deadMessageHandler.invoke(queue, message) - publisher.publishEvent(MessageDead) + log.warn( + "Executor at capacity, re-queuing message {} (delay: {}, attempts: {})", + message, + requeueDelay, + numberOfAttempts, + e + ) + queue.push(message, requeueDelay) } + } else { + log.error("Unsupported message type ${message.javaClass.simpleName}: $message") + deadMessageHandler.invoke(queue, message) + publisher.publishEvent(MessageDead) } } diff --git a/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt b/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt index f31be84..92bc1dd 100644 --- a/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt +++ b/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt @@ -19,6 +19,7 @@ package com.netflix.spinnaker.q.memory import com.netflix.spinnaker.q.DeadMessageCallback import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.QueueCallback import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.MessageAcknowledged import com.netflix.spinnaker.q.metrics.MessageDead @@ -51,6 +52,7 @@ class InMemoryQueue( private val clock: Clock, override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), override val deadMessageHandlers: List, + override val canPollMany: Boolean = false, override val publisher: EventPublisher ) : MonitorableQueue { @@ -78,6 +80,10 @@ class InMemoryQueue( } } + override fun poll(maxMessages: Int, callback: QueueCallback) { + poll(callback) + } + override fun push(message: Message, delay: TemporalAmount) { val existed = queue.removeIf { it.payload == message } queue.put(Envelope(message, clock.instant().plus(delay), clock)) diff --git 
a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt index 821b12b..e37c487 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt @@ -46,6 +46,10 @@ import java.util.Optional @Configuration @EnableConfigurationProperties(RedisQueueProperties::class) +@ConditionalOnProperty( + value = ["keiko.queue.redis.enabled"], + havingValue = "true", + matchIfMissing = true) class RedisQueueConfiguration { @Bean diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt index 7715564..09cea80 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt @@ -26,6 +26,7 @@ abstract class AbstractRedisQueue( private val serializationMigrator: Optional, override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), override val deadMessageHandlers: List, + override val canPollMany: Boolean = false, override val publisher: EventPublisher ) : MonitorableQueue { diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt index f5b3d57..067cf36 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt @@ -8,6 +8,7 @@ import com.netflix.spinnaker.q.DeadMessageCallback import com.netflix.spinnaker.q.MaxAttemptsAttribute import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.QueueCallback import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.LockFailed import com.netflix.spinnaker.q.metrics.MessageAcknowledged @@ -50,6 +51,7 @@ class RedisClusterQueue( private val serializationMigrator: Optional, override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), override val deadMessageHandlers: List, + override val canPollMany: Boolean = false, override val publisher: EventPublisher ) : AbstractRedisQueue( clock, @@ -58,6 +60,7 @@ class RedisClusterQueue( serializationMigrator, ackTimeout, deadMessageHandlers, + canPollMany, publisher ) { @@ -73,7 +76,7 @@ class RedisClusterQueue( init { cacheScript() - log.info("Configured queue: $queueName") + log.info("Configured $javaClass queue: $queueName") } final override fun cacheScript() { @@ -104,6 +107,10 @@ class RedisClusterQueue( fire(QueuePolled) } + override fun poll(maxMessages: Int, callback: QueueCallback) { + poll(callback) + } + override fun push(message: Message, delay: TemporalAmount) { jedisCluster.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint -> if (fingerprint != null) { diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt index d444e33..6d3c347 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt @@ -24,6 +24,7 @@ import com.netflix.spinnaker.q.DeadMessageCallback import 
com.netflix.spinnaker.q.MaxAttemptsAttribute import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.QueueCallback import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.LockFailed import com.netflix.spinnaker.q.metrics.MessageAcknowledged @@ -66,6 +67,7 @@ class RedisQueue( private val serializationMigrator: Optional, override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), override val deadMessageHandlers: List, + override val canPollMany: Boolean = false, override val publisher: EventPublisher ) : AbstractRedisQueue( clock, @@ -74,6 +76,7 @@ class RedisQueue( serializationMigrator, ackTimeout, deadMessageHandlers, + canPollMany, publisher ) { @@ -89,7 +92,7 @@ class RedisQueue( init { cacheScript() - log.info("Configured queue: $queueName") + log.info("Configured $javaClass queue: $queueName") } final override fun cacheScript() { @@ -124,6 +127,10 @@ class RedisQueue( } } + override fun poll(maxMessages: Int, callback: QueueCallback) { + poll(callback) + } + override fun push(message: Message, delay: TemporalAmount) { pool.resource.use { redis -> redis.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint -> diff --git a/keiko-sql/build.gradle b/keiko-sql/build.gradle new file mode 100644 index 0000000..9a20799 --- /dev/null +++ b/keiko-sql/build.gradle @@ -0,0 +1,22 @@ +apply from: "$rootDir/gradle/spek.gradle" + +dependencies { + api project(":keiko-core") + api "com.fasterxml.jackson.core:jackson-databind" + api "com.fasterxml.jackson.module:jackson-module-kotlin" + api "org.funktionale:funktionale-partials" + api "com.github.ben-manes.caffeine:guava" + + implementation "com.netflix.spinnaker.kork:kork-core" + implementation "com.netflix.spinnaker.kork:kork-exceptions" + implementation "com.netflix.spinnaker.kork:kork-sql" + implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid" + implementation "io.github.resilience4j:resilience4j-retry" + implementation "javax.validation:validation-api" + implementation "org.jooq:jooq" + + testImplementation project(":keiko-tck") + testImplementation "com.netflix.spinnaker.kork:kork-sql-test" + testImplementation "org.testcontainers:mysql" + testImplementation "mysql:mysql-connector-java" +} diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt new file mode 100644 index 0000000..3e8cdb0 --- /dev/null +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueConfiguration.kt @@ -0,0 +1,68 @@ +package com.netflix.spinnaker.config + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.migration.SerializationMigrator +import com.netflix.spinnaker.q.sql.SqlDeadMessageHandler +import com.netflix.spinnaker.q.sql.SqlQueue +import org.jooq.DSLContext +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock +import java.util.Optional + +@Configuration +@EnableConfigurationProperties(SqlQueueProperties::class) +@ConditionalOnProperty( + value = ["keiko.queue.sql.enabled"], + havingValue = "true", + matchIfMissing = 
false) class SqlQueueConfiguration { companion object { const val SCHEMA_VERSION = 1 } @Bean @ConditionalOnMissingBean(name = ["queue"]) fun queue( jooq: DSLContext, clock: Clock, mapper: ObjectMapper, deadMessageHandler: SqlDeadMessageHandler, publisher: EventPublisher, serializationMigrator: Optional<SerializationMigrator>, properties: SqlQueueProperties ) = SqlQueue( queueName = properties.queueName, schemaVersion = SCHEMA_VERSION, jooq = jooq, clock = clock, lockTtlSeconds = properties.lockTtlSeconds, mapper = mapper, serializationMigrator = serializationMigrator, ackTimeout = properties.ackTimeout, deadMessageHandlers = listOf(deadMessageHandler), publisher = publisher, sqlRetryProperties = properties.retries ) @Bean @ConditionalOnMissingBean(name = ["sqlDeadMessageHandler"]) fun sqlDeadMessageHandler( jooq: DSLContext, clock: Clock, properties: SqlQueueProperties ) = SqlDeadMessageHandler( deadLetterQueueName = properties.deadLetterQueueName, schemaVersion = SCHEMA_VERSION, jooq = jooq, clock = clock, sqlRetryProperties = properties.retries ) } diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt new file mode 100644 index 0000000..2043317 --- /dev/null +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueProperties.kt @@ -0,0 +1,58 @@ +package com.netflix.spinnaker.config + +import com.netflix.spinnaker.kork.sql.config.RetryProperties +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated +import java.time.Duration +import javax.validation.constraints.Pattern +import javax.validation.constraints.Positive + +@ConfigurationProperties("keiko.queue.sql") +@Validated +class SqlQueueProperties { + /** + * Enables the SqlQueue implementation. RedisQueue is not disabled automatically; set + * keiko.queue.redis.enabled=false as well. + */ + var enabled: Boolean = false + + /** + * [queueName] namespaces the database tables so that multiple keiko queues can be collocated + * on the same database. When using a globally writeable data store with queue processors in + * multiple regions, it may be desirable to use the region as the [queueName]. Must match + * the regexp `\w+`. + */ + @Pattern(regexp = "^\\w+$") + var queueName: String = "default" + + /** + * [deadLetterQueueName] defaults to [queueName] but can be set independently so that multiple + * queues on the same database can potentially share a DLQ. Must match the regexp `\w+`. + */ + @Pattern(regexp = "^\\w+$") + var deadLetterQueueName: String = queueName + + /** + * The length of time that a handler has to complete processing a message and acknowledge + * that processing has completed. Messages that are not acked within [ackTimeout] are + * returned to the queue, up to [Queue.maxRetries] times. + */ + var ackTimeout: Duration = Duration.ofMinutes(2) + + /** + * The length of time, in seconds, that a message with a lock set on the queue table has to + * be moved to the unacked table, signifying that it is actively being processed. Messages + * that are not moved from queued to unacked within [lockTtlSeconds] will have the lock released. + */ + @Positive(message = "lockTtlSeconds must be a positive integer") + var lockTtlSeconds: Int = 20 + + /** + * [SqlRetryProperties] determines how read and write database queries are retried.
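+ * Writes retry according to [SqlRetryProperties.transactions]; reads retry according to + * [SqlRetryProperties.reads].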
+ * See: https://github.com/spinnaker/kork/blob/master/kork-sql/src/main/kotlin/com/netflix/spinnaker/kork/sql/config/SqlRetryProperties.kt + */ + var retries: SqlRetryProperties = SqlRetryProperties( + transactions = RetryProperties(maxRetries = 10, backoffMs = 100), + reads = RetryProperties() + ) +} diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt new file mode 100644 index 0000000..4b7d932 --- /dev/null +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt @@ -0,0 +1,110 @@ +package com.netflix.spinnaker.q.sql + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.google.common.hash.Hashing +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import de.huxhorn.sulky.ulid.ULID +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import io.vavr.control.Try +import org.jooq.DSLContext +import org.jooq.exception.SQLDialectNotSupportedException +import org.jooq.impl.DSL +import org.jooq.util.mysql.MySQLDSL +import org.slf4j.LoggerFactory +import java.lang.Exception +import java.nio.charset.StandardCharsets +import java.time.Clock +import java.time.Duration + +class SqlDeadMessageHandler( + deadLetterQueueName: String, + schemaVersion: Int, + private val jooq: DSLContext, + private val clock: Clock, + private val sqlRetryProperties: SqlRetryProperties, + private val ULID: ULID = ULID() +) : DeadMessageCallback { + + companion object { + @Suppress("UnstableApiUsage") + private val hashObjectMapper = ObjectMapper().copy().apply { + enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + } + + private val nameSanitization = """[^A-Za-z0-9_]""".toRegex() + + private val log = LoggerFactory.getLogger(SqlDeadMessageHandler::class.java) + } + + private val dlqBase = "keiko_v${schemaVersion}_dlq" + private val dlqTableName = "${dlqBase}_${deadLetterQueueName.replace(nameSanitization, "_")}" + + private val dlqTable = DSL.table(dlqTableName) + private val idField = DSL.field("id") + private val fingerprintField = DSL.field("fingerprint") + private val updatedAtField = DSL.field("updated_at") + private val bodyField = DSL.field("body") + + init { + initTables() + } + + override fun invoke(queue: Queue, message: Message) { + var fingerprint: String? = null + var json: String? 
= null + + try { + /* Storing the fingerprint may be useful for correlating Queue log messages to DLQ rows */ + fingerprint = message.fingerprint() + json = hashObjectMapper.writeValueAsString(message) + val ulid = ULID.nextValue().toString() + + withRetry { + jooq.insertInto(dlqTable) + .set(idField, ulid) + .set(fingerprintField, fingerprint) + .set(updatedAtField, clock.millis()) + .set(bodyField, json) + .onDuplicateKeyUpdate() + .set(updatedAtField, MySQLDSL.values(updatedAtField) as Any) + .set(bodyField, MySQLDSL.values(bodyField) as Any) + } + } catch (e: Exception) { + log.error("Failed to deadLetter message, fingerprint: $fingerprint, message: $json", e) + } + } + + private fun initTables() { + jooq.execute("CREATE TABLE IF NOT EXISTS $dlqTableName LIKE ${dlqBase}_template") + } + + @Suppress("UnstableApiUsage") + fun Message.fingerprint() = + hashObjectMapper.convertValue(this, MutableMap::class.java) + .apply { remove("attributes") } + .let { + Hashing + .murmur3_128() + .hashString( + "v2:${hashObjectMapper.writeValueAsString(it)}", StandardCharsets.UTF_8) + .toString() + } + + private fun withRetry(action: () -> T): T { + val retry = Retry.of( + "sqlWrite", + RetryConfig.custom() + .maxAttempts(sqlRetryProperties.transactions.maxRetries) + .waitDuration(Duration.ofMillis(sqlRetryProperties.transactions.backoffMs)) + .ignoreExceptions(SQLDialectNotSupportedException::class.java) + .build() + ) + + return Try.ofSupplier(Retry.decorateSupplier(retry, action)).get() + } +} diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt new file mode 100644 index 0000000..9f9fd7a --- /dev/null +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -0,0 +1,744 @@ +package com.netflix.spinnaker.q.sql + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.module.kotlin.readValue +import com.google.common.hash.Hashing +import com.netflix.spinnaker.KotlinOpen +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import com.netflix.spinnaker.q.AckAttemptsAttribute +import com.netflix.spinnaker.q.AttemptsAttribute +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.MaxAttemptsAttribute +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.metrics.MessageAcknowledged +import com.netflix.spinnaker.q.metrics.MessageDead +import com.netflix.spinnaker.q.metrics.MessageNotFound +import com.netflix.spinnaker.q.metrics.MessageProcessing +import com.netflix.spinnaker.q.metrics.MessagePushed +import com.netflix.spinnaker.q.metrics.MessageRescheduled +import com.netflix.spinnaker.q.metrics.MessageRetried +import com.netflix.spinnaker.q.metrics.MonitorableQueue +import com.netflix.spinnaker.q.metrics.QueuePolled +import com.netflix.spinnaker.q.metrics.QueueState +import com.netflix.spinnaker.q.metrics.RetryPolled +import com.netflix.spinnaker.q.metrics.fire +import com.netflix.spinnaker.q.migration.SerializationMigrator +import de.huxhorn.sulky.ulid.ULID +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import io.vavr.control.Try +import org.funktionale.partials.partially1 +import org.jooq.DSLContext +import org.jooq.exception.SQLDialectNotSupportedException +import org.jooq.impl.DSL +import org.jooq.impl.DSL.count 
+import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.select +import org.jooq.impl.DSL.sql +import org.jooq.impl.DSL.table +import org.jooq.impl.DSL.update +import org.jooq.util.mysql.MySQLDSL +import org.slf4j.LoggerFactory +import org.springframework.scheduling.annotation.Scheduled +import java.net.InetAddress +import java.nio.charset.StandardCharsets +import java.sql.ResultSet +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.temporal.TemporalAmount +import java.util.Optional +import java.util.concurrent.TimeUnit +import kotlin.Exception +import kotlin.math.max +import kotlin.math.min + +@KotlinOpen +class SqlQueue( + queueName: String, + schemaVersion: Int, + private val jooq: DSLContext, + private val clock: Clock, + private val lockTtlSeconds: Int, + private val mapper: ObjectMapper, + private val serializationMigrator: Optional, + override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), + override val deadMessageHandlers: List, + override val canPollMany: Boolean = true, + override val publisher: EventPublisher, + private val sqlRetryProperties: SqlRetryProperties, + private val ULID: ULID = ULID() +) : MonitorableQueue { + + companion object { + @Suppress("UnstableApiUsage") + /** + * [lockId] is a hash of the hostname, used to claim messages on [queueTable] without + * performing a locking read. + */ + private val lockId = Hashing + .murmur3_128() + .hashString(InetAddress.getLocalHost().hostName, StandardCharsets.UTF_8) + .toString() + + private val hashObjectMapper = ObjectMapper().copy().apply { + enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + } + + private val lockedAtRegex = """^\w+:(\d+)$""".toRegex() + private val nameSanitization = """[^A-Za-z0-9_]""".toRegex() + + private val log = LoggerFactory.getLogger(SqlQueue::class.java) + } + + private val sanitizedName = queueName.replace(nameSanitization, "_") + private val queueBase = "keiko_v${schemaVersion}_queue" + private val unackedBase = "keiko_v${schemaVersion}_unacked" + private val messagesBase = "keiko_v${schemaVersion}_messages" + + private val queueTableName = "${queueBase}_$sanitizedName" + private val unackedTableName = "${unackedBase}_$sanitizedName" + private val messagesTableName = "${messagesBase}_$sanitizedName" + + private val queueTable = table(queueTableName) + private val unackedTable = table(unackedTableName) + private val messagesTable = table(messagesTableName) + + private val bodyField = field("body") + private val deliveryField = field("delivery") + private val expiryField = field("expiry") + private val fingerprintField = field("fingerprint") + private val idField = field("id") + private val lockedField = field("locked") + + private val lockTtlDuration = Duration.ofSeconds(lockTtlSeconds.toLong()) + + init { + log.info("Configured $javaClass queue: $queueName") + initTables() + } + + override fun readState(): QueueState { + /** + * Reading counts across all tables in a single query for consistency + */ + val rs = jooq.select() + .select( + select(count()) + .from(queueTable) + .asField("depth"), + select(count()) + .from(queueTable) + .where(deliveryField.le(clock.instant().toEpochMilli())) + .asField("ready"), + select(count()) + .from(unackedTable) + .asField("unacked"), + select(count()) + .from(messagesTable) + .asField("messages") + ) + .fetch() + .intoResultSet() + + rs.next() + + val depth = rs.getInt("depth") + val ready = rs.getInt("ready") + val unacked = rs.getInt("unacked") + val messages = rs.getInt("messages") + + return 
QueueState( + depth = depth, + ready = ready, + unacked = unacked, + orphaned = messages - (depth + unacked) + ) + } + + override fun containsMessage(predicate: (Message) -> Boolean): Boolean { + val batchSize = 100 + var found = false + var lastId = "0" + + do { + val rs: ResultSet = withRetry(RetryCategory.READ) { + jooq.select(idField, fingerprintField, bodyField) + .from(messagesTable) + .where(idField.gt(lastId)) + .limit(batchSize) + .fetch() + .intoResultSet() + } + + while (!found && rs.next()) { + try { + found = predicate.invoke(mapper.readValue(rs.getString("body"))) + } catch (e: Exception) { + log.error("Failed reading message with fingerprint: ${rs.getString("fingerprint")} " + + "message: ${rs.getString("body")}", e) + } + lastId = rs.getString("id") + } + } while (!found && rs.row == batchSize) + + return found + } + + override fun poll(callback: (Message, () -> Unit) -> Unit) { + poll(1, callback) + } + + /** + * TODO: Emit metrics: histogram of poll runtime, count of messages grabbed per poll, count of passes + */ + override fun poll(maxMessages: Int, callback: (Message, () -> Unit) -> Unit) { + val now = clock.instant().toEpochMilli() + var changed = 0 + + /** + * Selects the primary key ULIDs of up to ([maxMessages] * 3) ready and unlocked messages, + * sorted by delivery time. + * + * To minimize lock contention, this is a non-locking read. The ids returned may be + * locked or removed by another instance before we can acquire them. We read more ids + * than [maxMessages] and shuffle them to decrease the likelihood that multiple instances + * polling concurrently are all competing for the oldest ready messages when many more + * than [maxMessages] are read. + * + * Candidate rows are locked via an autocommit update query by primary key that will + * only modify unlocked rows. When (candidates > maxMessages), a sliding window is used + * to traverse the shuffled candidates, sized to (maxMessages - changed) with up to 3 + * attempts (and update queries) to grab [maxMessages]. + * + * E.g., if maxMessages == 5 and + * candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4] + * + * - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages + * - pass2: attempts to claim [8, 5], locks 1 message + * - pass3: attempts to claim [2] and succeeds; had it failed, there would be no further attempts + * - proceeds to process the 5 messages locked via 3 update queries. + * + * This makes a trade-off between grabbing the maximum number of ready messages per poll cycle + * vs. minimizing [poll] runtime, which is also critical to throughput. In testing a scenario + * with up to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle] + * enabled and 20 handler threads, instances were able to successfully grab [maxMessages] on + * each poll cycle > 94% of the time and no poll cycles came up empty-handed. + * + * Note: this differs from other [Queue.poll] implementations in that the message + * body is only saved back to the db in order to increment [AttemptsAttribute] if + * [MaxAttemptsAttribute] has been set to a positive integer. Otherwise, + * [AttemptsAttribute] is unused.
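+ * + * Rows are claimed by setting the locked column to "$lockId:$now" (instance hash, colon, epoch + * millis), which is how [expireStaleLocks] can later tell which instance claimed a row and when, + * releasing claims older than [lockTtlSeconds].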
+ */ + val candidates = jooq.select(idField) + .from(queueTable) + .where(deliveryField.le(now), lockedField.eq("0")) + .orderBy(deliveryField.asc()) + .limit(max(10, maxMessages * 3)) + .fetchInto(String::class.java) + + if (candidates == null || candidates.isEmpty()) { + fire(QueuePolled) + return + } + + candidates.shuffle() + + var position = 0 + var passes = 0 + while (changed < maxMessages && position < candidates.size && passes < 3) { + passes++ + val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position) + val ids = candidates.slice(IntRange(position, position + sliceNext)) + when (sliceNext) { + 0 -> position++ + else -> position += sliceNext + } + + changed += jooq.update(queueTable) + .set(lockedField, "$lockId:$now") + .where(idField.`in`(*ids.toTypedArray()), lockedField.eq("0")) + .execute() + } + + if (changed > 0) { + val rs = withRetry(RetryCategory.READ) { + jooq.select(field("q.id").`as`("id"), + field("q.fingerprint").`as`("fingerprint"), + field("q.delivery").`as`("delivery"), + field("m.body").`as`("body")) + .from(queueTable.`as`("q")) + .leftOuterJoin(messagesTable.`as`("m")) + .on(sql("q.fingerprint = m.fingerprint")) + .where(field("q.locked").like("$lockId%")) + .fetch() + .intoResultSet() + } + + val lockedMessages = mutableListOf() + var ulid = ULID.nextValue() + + while (rs.next()) { + val fingerprint = rs.getString("fingerprint") + try { + val message = mapper.readValue(runSerializationMigration(rs.getString("body"))) + .apply { + val currentAttempts = (getAttribute() ?: AttemptsAttribute()) + .run { copy(attempts = attempts + 1) } + + setAttribute(currentAttempts) + } + + val timeoutOverride = message.ackTimeoutMs ?: 0 + + lockedMessages.add( + LockedMessage( + queueId = rs.getString("id"), + fingerprint = fingerprint, + scheduledTime = Instant.ofEpochMilli(rs.getLong("delivery")), + message = message, + expiry = if (timeoutOverride > 0) { + atTime(Duration.ofMillis(timeoutOverride)) + } else { + atTime(ackTimeout) + }, + maxAttempts = message.getAttribute()?.maxAttempts ?: 0, + ackCallback = this::ackMessage.partially1(fingerprint) + ) + ) + } catch (e: Exception) { + log.error("Failed reading message for fingerprint: $fingerprint, " + + "json: ${rs.getString("body")}, removing", e) + deleteAll(fingerprint) + } + } + + val ids = lockedMessages + .map { it.queueId } + .toList() + + val maxAttemptsUpdates = lockedMessages + .filter { it.maxAttempts > 0 } + .map { + update(messagesTable) + .set(bodyField, mapper.writeValueAsString(it.message)) + .where(fingerprintField.eq(it.fingerprint)) + } + .toList() + + withRetry(RetryCategory.WRITE) { + jooq.transaction { config -> + val txn = DSL.using(config) + + txn.insertInto( + unackedTable, + idField, + fingerprintField, + expiryField) + .apply { + lockedMessages.forEach { + values(ulid.toString(), it.fingerprint, it.expiry) + ulid = ULID.nextMonotonicValue(ulid) + } + } + .onDuplicateKeyUpdate() + .set(expiryField, MySQLDSL.values(expiryField) as Any) + .execute() + + if (maxAttemptsUpdates.isNotEmpty()) { + txn.batch(maxAttemptsUpdates).execute() + } + } + } + + /** + * Deleting from queue table outside of the above txn to minimize potential for deadlocks. + * If an instance crashes after committing the above txn but before the following delete, + * [retry] will release the locks after [lockTtlSeconds] and another instance will grab them. 
+ */ + withRetry(RetryCategory.WRITE) { + jooq.deleteFrom(queueTable) + .where(idField.`in`(*ids.toTypedArray())) + .execute() + } + + lockedMessages.forEach { + fire(MessageProcessing(it.message, it.scheduledTime, clock.instant())) + callback(it.message, it.ackCallback) + } + } + + fire(QueuePolled) + } + + override fun push(message: Message, delay: TemporalAmount) { + val fingerprint = message.hashV2() + val ulid = ULID.nextValue() + val deliveryTime = atTime(delay) + + message.setAttribute( + message.getAttribute() ?: AttemptsAttribute() + ) + + withRetry(RetryCategory.WRITE) { + jooq.transaction { config -> + val txn = DSL.using(config) + + txn.insertInto(messagesTable) + .set(idField, ulid.toString()) + .set(fingerprintField, fingerprint) + .set(bodyField, mapper.writeValueAsString(message)) + .onDuplicateKeyUpdate() + .set(bodyField, MySQLDSL.values(bodyField) as Any) + .execute() + + txn.insertInto(queueTable) + .set(idField, ULID.nextMonotonicValue(ulid).toString()) + .set(fingerprintField, fingerprint) + .set(deliveryField, deliveryTime) + .set(lockedField, "0") + .onDuplicateKeyUpdate() + .set(deliveryField, MySQLDSL.values(deliveryField) as Any) + .execute() + } + } + + fire(MessagePushed(message)) + } + + override fun reschedule(message: Message, delay: TemporalAmount) { + val fingerprint = message.hashV2() + + withRetry(RetryCategory.WRITE) { + val rows = jooq.update(queueTable) + .set(deliveryField, atTime(delay)) + .where(fingerprintField.eq(fingerprint)) + .execute() + + if (rows == 1) { + log.debug("Rescheduled message: $message, fingerprint: $fingerprint to deliver in $delay") + fire(MessageRescheduled(message)) + } else { + log.warn("Failed to reschedule message: $message, fingerprint: $fingerprint, not found " + + "on queue") + fire(MessageNotFound(message)) + } + } + } + + override fun ensure(message: Message, delay: TemporalAmount) { + val fingerprint = message.hashV2() + var missing = false + + withRetry(RetryCategory.WRITE) { + jooq.transaction { config -> + val txn = DSL.using(config) + + val queueRows = txn.select(fingerprintField) + .from(queueTable) + .where(fingerprintField.eq(fingerprint)) + .limit(1) + .execute() + + if (queueRows == 0) { + val unackedRows = txn.select(fingerprintField) + .from(unackedTable) + .where(fingerprintField.eq(fingerprint)) + .limit(1) + .execute() + + if (unackedRows == 0) { + missing = true + } + } + } + } + + if (missing) { + log.debug( + "Pushing ensured message onto queue as it does not exist in queue or unacked tables" + ) + push(message, delay) + } + } + + private fun expireStaleLocks() { + val now = clock.instant().toEpochMilli() + val minMs = now.minus(TimeUnit.SECONDS.toMillis(lockTtlSeconds.toLong())) + val minUlid = ULID.nextValue(minMs).toString() + + val rs = withRetry(RetryCategory.READ) { + jooq.select(idField, fingerprintField, deliveryField, lockedField) + .from(queueTable) + .where( + idField.lt(minUlid), + lockedField.ne("0") + ) + .fetch() + .intoResultSet() + } + + var ulid = ULID.nextValue() + + while (rs.next()) { + val id = rs.getString("id") + val lock = rs.getString("locked") + val fingerprint = rs.getString("fingerprint") + val lockMatch = lockedAtRegex.find(lock) + + if (lockMatch != null && lockMatch.groupValues.size > 1) { + if (lockMatch.groupValues[1].toLong() > minMs) { + /* Not time yet */ + continue + } + } else { + log.error("Failed parsing lockedAt time for message id: $id, " + + "fingerprint: $fingerprint, lock: $lock, releasing") + } + + /** + * No retries inside this transaction, transient 
failures here will be cleared up at + * the next retry() interval. + */ + jooq.transaction { config -> + val txn = DSL.using(config) + + val deleted = txn.delete(queueTable) + .where(idField.eq(id), lockedField.eq(lock)) + .execute() + + if (deleted == 1) { + log.info("Releasing stale lock for fingerprint: $fingerprint") + ulid = ULID.nextMonotonicValue(ulid) + + /** + * Re-insert with a fresh ULID for immediate delivery + */ + txn.insertInto(queueTable) + .set(idField, ulid.toString()) + .set(fingerprintField, fingerprint) + .set(deliveryField, now) + .set(lockedField, "0") + .execute() + } + } + } + } + + /** + * Differs from other [Queue.retry] implementations, as unacked messages are requeued for + * delivery at now + [lockTtlSeconds] instead of immediately. + */ + @Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}") + override fun retry() { + expireStaleLocks() + + val unackBaseTime = clock.instant().toEpochMilli() + + val rs = jooq.select(field("u.id").`as`("id"), + field("u.expiry").`as`("expiry"), + field("u.fingerprint").`as`("fingerprint"), + field("m.body").`as`("body")) + .from(unackedTable.`as`("u")) + .leftOuterJoin(messagesTable.`as`("m")) + .on(sql("u.fingerprint = m.fingerprint")) + .where(field("u.expiry").le(unackBaseTime)) + .fetch() + .intoResultSet() + + while (rs.next()) { + val fingerprint = rs.getString("fingerprint") + var rows = 0 + var dlq = false + var message: Message + var acks: Int + + try { + message = mapper.readValue(runSerializationMigration(rs.getString("body"))) + + val ackAttemptsAttribute = (message.getAttribute<AckAttemptsAttribute>() ?: AckAttemptsAttribute()) + .run { copy(ackAttempts = ackAttempts + 1) } + + message.setAttribute(ackAttemptsAttribute) + acks = ackAttemptsAttribute.ackAttempts + + val attempts = message.getAttribute<AttemptsAttribute>()?.attempts + ?: 0 + val maxAttempts = message.getAttribute<MaxAttemptsAttribute>()?.maxAttempts + ?: 0 + + if (ackAttemptsAttribute.ackAttempts >= Queue.maxRetries || + (maxAttempts > 0 && attempts > maxAttempts)) { + log.warn("Message $fingerprint with payload $message exceeded max ack retries") + dlq = true + } + } catch (e: Exception) { + log.error("Failed to deserialize message $fingerprint, cleaning up", e) + deleteAll(fingerprint) + continue + } + + if (dlq) { + deleteAll(fingerprint) + handleDeadMessage(message) + continue + } + + jooq.transaction { config -> + val txn = DSL.using(config) + + rows = txn.delete(unackedTable) + .where(idField.eq(rs.getString("id"))) + .execute() + + if (rows == 1) { + log.warn("Retrying message $fingerprint after $acks ack attempts") + + txn.insertInto(queueTable) + .set(idField, ULID.nextValue().toString()) + .set(fingerprintField, fingerprint) + .set(deliveryField, atTime(lockTtlDuration)) + .set(lockedField, "0") + .onDuplicateKeyUpdate() + .set(deliveryField, MySQLDSL.values(deliveryField) as Any) + .execute() + } + } + + /** + * The message body, with its incremented ackAttempts value, is updated outside of the above + * txn to minimize lock times related to the [queueTable] insert. If this update does not + * complete within [lockTtlSeconds], the ackAttempts increment may be lost, making it best effort.
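+ * A lost increment simply means an extra ack-timeout cycle before a repeatedly failing + * message reaches [Queue.maxRetries] and is dead-lettered.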
+ */ + if (rows == 1) { + jooq.update(messagesTable) + .set(bodyField, mapper.writeValueAsString(message)) + .where(fingerprintField.eq(fingerprint)) + .execute() + + fire(MessageRetried) + } + } + fire(RetryPolled) + } + + private fun ackMessage(fingerprint: String) { + withRetry(RetryCategory.WRITE) { + jooq.transaction { config -> + val txn = DSL.using(config) + + txn.deleteFrom(unackedTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + + val changed = txn.update(queueTable) + .set(lockedField, "0") + .where(fingerprintField.eq(fingerprint)) + .execute() + + if (changed == 0) { + txn.deleteFrom(messagesTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + } + } + } + fire(MessageAcknowledged) + } + + private fun deleteAll(fingerprint: String) { + withRetry(RetryCategory.WRITE) { + jooq.deleteFrom(queueTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + + jooq.deleteFrom(unackedTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + + jooq.deleteFrom(messagesTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + } + } + + private fun initTables() { + withRetry(RetryCategory.WRITE) { + jooq.execute("CREATE TABLE IF NOT EXISTS $queueTableName LIKE ${queueBase}_template") + jooq.execute("CREATE TABLE IF NOT EXISTS $unackedTableName LIKE ${unackedBase}_template") + jooq.execute("CREATE TABLE IF NOT EXISTS $messagesTableName LIKE ${messagesBase}_template") + } + } + + private fun handleDeadMessage(message: Message) { + deadMessageHandlers.forEach { + it.invoke(this, message) + } + + fire(MessageDead) + } + + private fun runSerializationMigration(json: String): String { + if (serializationMigrator.isPresent) { + return serializationMigrator.get().migrate(json) + } + return json + } + + private fun atTime(delay: TemporalAmount = Duration.ZERO) = + clock.instant().plus(delay).toEpochMilli() + + @Suppress("UnstableApiUsage") + fun Message.hashV2() = + hashObjectMapper.convertValue(this, MutableMap::class.java) + .apply { remove("attributes") } + .let { + Hashing + .murmur3_128() + .hashString("v2:${hashObjectMapper.writeValueAsString(it)}", StandardCharsets.UTF_8) + .toString() + } + + private enum class RetryCategory { + WRITE, READ + } + + private fun withRetry(category: RetryCategory, action: () -> T): T { + return if (category == RetryCategory.WRITE) { + val retry = Retry.of( + "sqlWrite", + RetryConfig.custom() + .maxAttempts(sqlRetryProperties.transactions.maxRetries) + .waitDuration(Duration.ofMillis(sqlRetryProperties.transactions.backoffMs)) + .ignoreExceptions(SQLDialectNotSupportedException::class.java) + .build() + ) + + Try.ofSupplier(Retry.decorateSupplier(retry, action)).get() + } else { + val retry = Retry.of( + "sqlRead", + RetryConfig.custom() + .maxAttempts(sqlRetryProperties.reads.maxRetries) + .waitDuration(Duration.ofMillis(sqlRetryProperties.reads.backoffMs)) + .ignoreExceptions(SQLDialectNotSupportedException::class.java) + .build() + ) + + Try.ofSupplier(Retry.decorateSupplier(retry, action)).get() + } + } + + private data class LockedMessage( + val queueId: String, + val fingerprint: String, + val scheduledTime: Instant, + val message: Message, + val expiry: Long, + val maxAttempts: Int, + val ackCallback: () -> Unit + ) +} diff --git a/keiko-sql/src/main/resources/db/changelog-master.yml b/keiko-sql/src/main/resources/db/changelog-master.yml new file mode 100644 index 0000000..9de06cd --- /dev/null +++ b/keiko-sql/src/main/resources/db/changelog-master.yml @@ -0,0 +1,4 @@ +databaseChangeLog: + - include: + file: 
changelog/20190822-initial-schema.yml + relativeToChangelogFile: true diff --git a/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml b/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml new file mode 100644 index 0000000..4230342 --- /dev/null +++ b/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml @@ -0,0 +1,231 @@ +databaseChangeLog: + - changeSet: + id: create-keiko-queue-table-v1 + author: afeldman + changes: + - createTable: + tableName: keiko_v1_queue_template + columns: + - column: + name: id + type: char(26) + constraints: + primaryKey: true + nullable: false + - column: + name: fingerprint + type: char(36) + constraints: + nullable: false + - column: + name: delivery + type: bigint(13) + constraints: + nullable: false + - column: + name: locked + type: char(46) + defaultValue: 0 + constraints: + nullable: false + - modifySql: + dbms: mysql + append: + value: " engine innodb" + rollback: + - dropTable: + tableName: keiko_v1_queue_template + + - changeSet: + id: create-keiko-queue-table-v1-indices + author: afeldman + changes: + - createIndex: + indexName: keiko_queue_fingerprint_idx + tableName: keiko_v1_queue_template + columns: + - column: + name: fingerprint + unique: true + - createIndex: + indexName: keiko_queue_delivery_locked_idx + tableName: keiko_v1_queue_template + columns: + - column: + name: delivery + - column: + name: locked + - createIndex: + indexName: keiko_queue_locked_idx + tableName: keiko_v1_queue_template + columns: + - column: + name: locked + rollback: + - dropIndex: + indexName: keiko_queue_fingerprint_idx + tableName: keiko_v1_queue_template + - dropIndex: + indexName: keiko_queue_delivery_locked_idx + tableName: keiko_v1_queue_template + + - changeSet: + id: create-keiko-queue-unacked-table-v1 + author: afeldman + changes: + - createTable: + tableName: keiko_v1_unacked_template + columns: + - column: + name: id + type: char(26) + constraints: + primaryKey: true + nullable: false + - column: + name: fingerprint + type: char(36) + constraints: + nullable: false + - column: + name: expiry + type: bigint + constraints: + nullable: false + - modifySql: + dbms: mysql + append: + value: " engine innodb" + rollback: + - dropTable: + tableName: keiko_v1_unacked_template + + - changeSet: + id: create-keiko-queue-unacked-table-v1-indices + author: afeldman + changes: + - createIndex: + indexName: keiko_unacked_fingerprint_idx + tableName: keiko_v1_unacked_template + columns: + - column: + name: fingerprint + unique: true + - createIndex: + indexName: keiko_unacked_expiry_idx + tableName: keiko_v1_unacked_template + columns: + - column: + name: expiry + rollback: + - dropIndex: + indexName: keiko_unacked_fingerprint_idx + tableName: keiko_v1_unacked_template + - dropIndex: + indexName: keiko_unacked_expiry_idx + tableName: keiko_v1_unacked_template + + - changeSet: + id: create-keiko-queue-messages-table-v1 + author: afeldman + changes: + - createTable: + tableName: keiko_v1_messages_template + columns: + - column: + name: id + type: char(26) + constraints: + primaryKey: true + nullable: false + - column: + name: fingerprint + type: char(36) + constraints: + nullable: false + - column: + name: body + type: longtext + constraints: + nullable: false + - modifySql: + dbms: mysql + append: + value: " engine innodb" + rollback: + - dropTable: + tableName: keiko_v1_unacked_template + + - changeSet: + id: create-keiko-queue-messages-table-v1-indices + author: afeldman + changes: + - createIndex: + indexName: 
keiko_messages_fingerprint_idx + tableName: keiko_v1_messages_template + columns: + - column: + name: fingerprint + unique: true + rollback: + - dropIndex: + indexName: keiko_messages_fingerprint_idx + tableName: keiko_v1_messages_template + + - changeSet: + id: create-keiko-dlq-table-v1 + author: afeldman + changes: + - createTable: + tableName: keiko_v1_dlq_template + columns: + - column: + name: id + type: char(26) + constraints: + primaryKey: true + nullable: false + - column: + name: fingerprint + type: char(36) + constraints: + nullable: false + - column: + name: updated_at + type: bigint(13) + constraints: + nullable: false + - column: + name: body + type: longtext + constraints: + nullable: false + - modifySql: + dbms: mysql + append: + value: " engine innodb" + rollback: + - dropTable: + tableName: keiko_v1_dlq_template + + - changeSet: + id: create-keiko-dlq-table-v1-indices + author: afeldman + changes: + - createIndex: + indexName: keiko_dlq_fingerprint_idx + tableName: keiko_v1_dlq_template + columns: + - column: + name: fingerprint + unique: true + - createIndex: + indexName: keiko_dlq_updated_at_idx + tableName: keiko_v1_dlq_template + columns: + - column: + name: updated_at + rollback: + - dropIndex: + indexName: keiko_messages_fingerprint_idx + tableName: keiko_v1_dlq_template diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt new file mode 100644 index 0000000..f975273 --- /dev/null +++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt @@ -0,0 +1,70 @@ +package com.netflix.spinnaker.q.sql + +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import com.netflix.spinnaker.kork.sql.config.RetryProperties +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.q.AckAttemptsAttribute +import com.netflix.spinnaker.q.AttemptsAttribute +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.MaxAttemptsAttribute +import com.netflix.spinnaker.q.QueueTest +import com.netflix.spinnaker.q.TestMessage +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.metrics.MonitorableQueueTest +import com.netflix.spinnaker.q.metrics.QueueEvent +import org.funktionale.partials.invoke +import java.time.Clock +import java.time.Duration +import java.util.Optional + +object SqlQueueTest : QueueTest(createQueue(p3 = null), ::cleanupCallback) + +object SqlMonitorableQueueTest : MonitorableQueueTest( + createQueue, + SqlQueue::retry, + ::cleanupCallback +) + +private val testDb = SqlTestUtil.initTcMysqlDatabase() +private val jooq = testDb.context + +private val createQueue = { clock: Clock, + deadLetterCallback: DeadMessageCallback, + publisher: EventPublisher? 
-> + SqlQueue( + queueName = "test", + schemaVersion = 1, + jooq = jooq, + clock = clock, + lockTtlSeconds = 2, + mapper = ObjectMapper().apply { + registerModule(KotlinModule()) + disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + + registerSubtypes(TestMessage::class.java) + registerSubtypes(MaxAttemptsAttribute::class.java, + AttemptsAttribute::class.java, + AckAttemptsAttribute::class.java) + }, + serializationMigrator = Optional.empty(), + ackTimeout = Duration.ofSeconds(60), + deadMessageHandlers = listOf(deadLetterCallback), + publisher = publisher ?: (object : EventPublisher { + override fun publishEvent(event: QueueEvent) {} + }), + sqlRetryProperties = SqlRetryProperties(transactions = retryPolicy, + reads = retryPolicy) + ) +} + +private fun cleanupCallback() { + SqlTestUtil.cleanupDb(jooq) +} + +private val retryPolicy: RetryProperties = RetryProperties( + maxRetries = 1, + backoffMs = 10 // minimum allowed +) diff --git a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/QueueTest.kt b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/QueueTest.kt index 61fa921..db33fb6 100644 --- a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/QueueTest.kt +++ b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/QueueTest.kt @@ -217,6 +217,7 @@ abstract class QueueTest( with(queue!!) { clock.incrementBy(ackTimeout) retry() + clock.incrementBy(ackTimeout) poll(callback) } } @@ -257,6 +258,7 @@ abstract class QueueTest( with(queue!!) { clock.incrementBy(ackTimeoutOverride) retry() + clock.incrementBy(ackTimeout) poll(callback) } } @@ -306,6 +308,7 @@ abstract class QueueTest( poll { _, _ -> } clock.incrementBy(ackTimeout) retry() + clock.incrementBy(ackTimeout) } } } @@ -315,6 +318,7 @@ abstract class QueueTest( on("polling the queue again") { with(queue!!) { + clock.incrementBy(ackTimeout) poll(callback) } } @@ -514,6 +518,7 @@ abstract class QueueTest( with(queue!!) { clock.incrementBy(ackTimeout) retry() + clock.incrementBy(ackTimeout) poll(callback) } } @@ -569,6 +574,7 @@ abstract class QueueTest( with(queue!!) { clock.incrementBy(ackTimeout) retry() + clock.incrementBy(ackTimeout) poll(callback) poll(callback) } diff --git a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt index c28ba54..293b7c4 100644 --- a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt +++ b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt @@ -21,6 +21,7 @@ import com.netflix.spinnaker.q.DeadMessageCallback import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.q.TestMessage import com.netflix.spinnaker.time.MutableClock +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.argumentCaptor import com.nhaarman.mockito_kotlin.atLeastOnce import com.nhaarman.mockito_kotlin.isA @@ -135,7 +136,11 @@ abstract class MonitorableQueueTest( } it("fires an event to report the push") { - verify(publisher).publishEvent(isA()) + + /* This test previously verified publication of a MessageDuplicate event + * but it is now optional for MonitorableQueue implementations. + */ + verify(publisher, times(2)).publishEvent(any()) } it("reports an unchanged queue depth") { @@ -285,6 +290,7 @@ abstract class MonitorableQueueTest( on("checking for unacknowledged messages") { clock.incrementBy(queue!!.ackTimeout) triggerRedeliveryCheck.invoke(queue!!) 
+ clock.incrementBy(queue!!.ackTimeout) } it("fires an event indicating the message is being retried") { @@ -318,12 +324,17 @@ } on("checking for unacknowledged messages") { + resetMocks() clock.incrementBy(queue!!.ackTimeout) triggerRedeliveryCheck.invoke(queue!!) + clock.incrementBy(queue!!.ackTimeout) } it("fires an event indicating the message is a duplicate") { - verify(publisher).publishEvent(isA<MessageDuplicate>()) + /* This should see either a MessageDuplicate or a MessagePushed event, plus + * a RetryPolled event. + */ + verify(publisher, times(2)).publishEvent(any()) } it("reports the depth without the message re-queued") { @@ -351,6 +362,7 @@ queue!!.poll { _, _ -> } clock.incrementBy(queue!!.ackTimeout) triggerRedeliveryCheck.invoke(queue!!) + clock.incrementBy(queue!!.ackTimeout) } } diff --git a/settings.gradle.kts b/settings.gradle.kts index a8d0177..dcea0e3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -6,7 +6,8 @@ include( "keiko-redis-spring", "keiko-spring", "keiko-tck", - "keiko-test-common" + "keiko-test-common", + "keiko-sql" ) rootProject.name = "keiko"
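Reviewer note: a minimal sketch of enabling the new backend, assuming only the bean and property names shown in this diff. Both conditionals must be flipped, since RedisQueueConfiguration matches when its property is missing (matchIfMissing = true) while SqlQueueConfiguration requires keiko.queue.sql.enabled=true. A hypothetical application.yml (queue names must match ^\w+$, hence underscores rather than hyphens):

keiko:
  queue:
    redis:
      enabled: false  # RedisQueueConfiguration would otherwise match when the property is missing
    sql:
      enabled: true   # gates SqlQueueConfiguration (matchIfMissing = false)
      queueName: us_west_2
      ackTimeout: PT2M  # binds to java.time.Duration
      lockTtlSeconds: 20

The host application still needs to provide DSLContext, Clock, and EventPublisher beans; the queue and sqlDeadMessageHandler beans are only registered when no beans with those names already exist (@ConditionalOnMissingBean).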