diff --git a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt index 1bf62a5..821b12b 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt @@ -21,16 +21,21 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.KotlinModule import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.migration.SerializationMigrator +import com.netflix.spinnaker.q.redis.RedisClusterDeadMessageHandler +import com.netflix.spinnaker.q.redis.RedisClusterQueue import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler import com.netflix.spinnaker.q.redis.RedisQueue import org.apache.commons.pool2.impl.GenericObjectPoolConfig import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import redis.clients.jedis.HostAndPort import redis.clients.jedis.Jedis +import redis.clients.jedis.JedisCluster import redis.clients.jedis.JedisPool import redis.clients.jedis.Protocol import redis.clients.util.Pool @@ -43,11 +48,19 @@ import java.util.Optional @EnableConfigurationProperties(RedisQueueProperties::class) class RedisQueueConfiguration { - @Bean @ConditionalOnMissingBean(GenericObjectPoolConfig::class) - fun redisPoolConfig() = GenericObjectPoolConfig() + @Bean + @ConditionalOnMissingBean(GenericObjectPoolConfig::class) + fun redisPoolConfig() = 
GenericObjectPoolConfig().apply { + blockWhenExhausted = false + maxWaitMillis = 2000 + } @Bean @ConditionalOnMissingBean(name = ["queueRedisPool"]) + @ConditionalOnProperty( + value = ["redis.cluster-enabled"], + havingValue = "false", + matchIfMissing = true) fun queueRedisPool( @Value("\${redis.connection:redis://localhost:6379}") connection: String, @Value("\${redis.timeout:2000}") timeout: Int, @@ -66,6 +79,10 @@ class RedisQueueConfiguration { @Bean @ConditionalOnMissingBean(name = ["queue"]) + @ConditionalOnProperty( + value = ["redis.cluster-enabled"], + havingValue = "false", + matchIfMissing = true) fun queue( @Qualifier("queueRedisPool") redisPool: Pool, redisQueueProperties: RedisQueueProperties, @@ -88,6 +105,10 @@ class RedisQueueConfiguration { @Bean @ConditionalOnMissingBean(name = ["redisDeadMessageHandler"]) + @ConditionalOnProperty( + value = ["redis.cluster-enabled"], + havingValue = "false", + matchIfMissing = true) fun redisDeadMessageHandler( @Qualifier("queueRedisPool") redisPool: Pool, redisQueueProperties: RedisQueueProperties, @@ -99,6 +120,65 @@ class RedisQueueConfiguration { clock = clock ) + @Bean + @ConditionalOnMissingBean(name = ["queueRedisCluster"]) + @ConditionalOnProperty(value = ["redis.cluster-enabled"]) + fun queueRedisCluster( + @Value("\${redis.connection:redis://localhost:6379}") connection: String, + @Value("\${redis.timeout:2000}") timeout: Int, + @Value("\${redis.maxattempts:4}") maxAttempts: Int, + redisPoolConfig: GenericObjectPoolConfig + ): JedisCluster { + URI.create(connection).let { cx -> + val port = if (cx.port == -1) Protocol.DEFAULT_PORT else cx.port + val password = cx.userInfo?.substringAfter(":") + return JedisCluster( + HostAndPort(cx.host, port), + timeout, + timeout, + maxAttempts, + password, + redisPoolConfig + ) + } + } + + @Bean + @ConditionalOnMissingBean(name = ["queue", "clusterQueue"]) + @ConditionalOnProperty(value = ["redis.cluster-enabled"]) + fun clusterQueue( + 
@Qualifier("queueRedisCluster") cluster: JedisCluster, + redisQueueProperties: RedisQueueProperties, + clock: Clock, + deadMessageHandler: RedisClusterDeadMessageHandler, + publisher: EventPublisher, + redisQueueObjectMapper: ObjectMapper, + serializationMigrator: Optional + ) = + RedisClusterQueue( + queueName = redisQueueProperties.queueName, + jedisCluster = cluster, + clock = clock, + mapper = redisQueueObjectMapper, + deadMessageHandlers = listOf(deadMessageHandler), + publisher = publisher, + ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong()), + serializationMigrator = serializationMigrator + ) + + @Bean + @ConditionalOnMissingBean(name = ["redisClusterDeadMessageHandler"]) + @ConditionalOnProperty(value = ["redis.cluster-enabled"]) + fun redisClusterDeadMessageHandler( + @Qualifier("queueRedisCluster") cluster: JedisCluster, + redisQueueProperties: RedisQueueProperties, + clock: Clock + ) = RedisClusterDeadMessageHandler( + deadLetterQueueName = redisQueueProperties.deadLetterQueueName, + jedisCluster = cluster, + clock = clock + ) + @Bean @ConditionalOnMissingBean fun redisQueueObjectMapper(properties: Optional): ObjectMapper = diff --git a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt index 98f0f73..da96b30 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt @@ -23,4 +23,5 @@ class RedisQueueProperties { var queueName: String = "keiko.queue" var deadLetterQueueName: String = "keiko.queue.deadLetters" var ackTimeoutSeconds: Int = 60 + var shards: Int = 1 } diff --git a/keiko-redis/build.gradle b/keiko-redis/build.gradle index a21db99..35bb89d 100644 --- a/keiko-redis/build.gradle +++ b/keiko-redis/build.gradle @@ -2,7 +2,9 @@ apply from: 
"$rootDir/gradle/spek.gradle" dependencies { api project(":keiko-core") - api "redis.clients:jedis" + api("redis.clients:jedis:2.10.2") { + force = true + } api "com.fasterxml.jackson.core:jackson-databind" api "com.fasterxml.jackson.module:jackson-module-kotlin" api "org.funktionale:funktionale-partials" diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt new file mode 100644 index 0000000..7715564 --- /dev/null +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/AbstractRedisQueue.kt @@ -0,0 +1,184 @@ +package com.netflix.spinnaker.q.redis + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.google.common.hash.Hashing +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.metrics.MonitorableQueue +import com.netflix.spinnaker.q.migration.SerializationMigrator +import org.slf4j.Logger +import redis.clients.jedis.Jedis +import redis.clients.jedis.JedisCommands +import redis.clients.jedis.Transaction +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets +import java.time.Clock +import java.time.Duration +import java.time.temporal.TemporalAmount +import java.util.Optional + +abstract class AbstractRedisQueue( + private val clock: Clock, + private val lockTtlSeconds: Int = 10, + private val mapper: ObjectMapper, + private val serializationMigrator: Optional, + override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), + override val deadMessageHandlers: List, + override val publisher: EventPublisher + +) : MonitorableQueue { + internal abstract val queueKey: String + internal abstract val unackedKey: String + internal abstract val messagesKey: String + internal abstract val locksKey: String + internal abstract val 
attemptsKey: String + + internal abstract val log: Logger + + // Internal ObjectMapper that enforces deterministic property ordering for use only in hashing. + private val hashObjectMapper = ObjectMapper().copy().apply { + enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + } + + abstract fun cacheScript() + abstract var readMessageWithLockScriptSha: String + + internal fun runSerializationMigration(json: String): String { + if (serializationMigrator.isPresent) { + return serializationMigrator.get().migrate(json) + } + return json + } + + internal fun handleDeadMessage(message: Message) { + deadMessageHandlers.forEach { + it.invoke(this, message) + } + } + + /** + * @return current time (plus optional [delay]) converted to a score for a + * Redis sorted set. + */ + internal fun score(delay: TemporalAmount = Duration.ZERO) = + clock.instant().plus(delay).toEpochMilli().toDouble() + + internal inline fun ObjectMapper.readValue(content: String): R = + readValue(content, R::class.java) + + internal fun Jedis.multi(block: Transaction.() -> Unit) = + multi().use { tx -> + tx.block() + tx.exec() + } + + internal fun JedisCommands.hgetInt(key: String, field: String, default: Int = 0) = + hget(key, field)?.toInt() ?: default + + internal fun JedisCommands.zismember(key: String, member: String) = + zrank(key, member) != null + + internal fun JedisCommands.anyZismember(key: String, members: Set) = + members.any { zismember(key, it) } + + internal fun JedisCommands.firstFingerprint(key: String, fingerprint: Fingerprint) = + fingerprint.all.firstOrNull { zismember(key, it) } + + @Deprecated("Hashes the attributes property, which is mutable") + internal fun Message.hashV1() = + Hashing + .murmur3_128() + .hashString(toString(), Charset.defaultCharset()) + .toString() + + internal fun Message.hashV2() = + hashObjectMapper.convertValue(this, MutableMap::class.java) + .apply { remove("attributes") } + .let { + Hashing + .murmur3_128() + 
.hashString("v2:${hashObjectMapper.writeValueAsString(it)}", StandardCharsets.UTF_8) + .toString() + } + + internal fun Message.fingerprint() = + hashV2().let { Fingerprint(latest = it, all = setOf(it, hashV1())) } + + internal data class Fingerprint( + val latest: String, + val all: Set = setOf() + ) +} + +internal const val READ_MESSAGE_SRC = """ + local java_scientific = function(x) + return string.format("%.12E", x):gsub("\+", "") + end + + -- get the message, move the fingerprint to the unacked queue and return + local message = redis.call("HGET", messagesKey, fingerprint) + + -- check for an ack timeout override on the message + local unackScore = unackDefaultScore + if type(message) == "string" and message ~= nil then + local ackTimeoutOverride = tonumber(cjson.decode(message)["ackTimeoutMs"]) + if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then + unackScore = unackBaseScore + ackTimeoutOverride + end + end + + unackScore = java_scientific(unackScore) + + redis.call("ZREM", queueKey, fingerprint) + redis.call("ZADD", unackKey, unackScore, fingerprint) +""" + +/* ktlint-disable max-line-length */ +internal const val READ_MESSAGE_WITH_LOCK_SRC = """ + local queueKey = KEYS[1] + local unackKey = KEYS[2] + local lockKey = KEYS[3] + local messagesKey = KEYS[4] + local maxScore = ARGV[1] + local peekFingerprintCount = ARGV[2] + local lockTtlSeconds = ARGV[3] + local unackDefaultScore = ARGV[4] + local unackBaseScore = ARGV[5] + + local not_empty = function(x) + return (type(x) == "table") and (not x.err) and (#x ~= 0) + end + + local acquire_lock = function(fingerprints, locksKey, lockTtlSeconds) + if not_empty(fingerprints) then + local i=1 + while (i <= #fingerprints) do + redis.call("ECHO", "attempting lock on " .. fingerprints[i]) + if redis.call("SET", locksKey .. ":" .. fingerprints[i], "\uD83D\uDD12", "EX", lockTtlSeconds, "NX") then + redis.call("ECHO", "acquired lock on " .. 
fingerprints[i]) + return fingerprints[i], fingerprints[i+1] + end + i=i+2 + end + end + return nil, nil + end + + -- acquire a lock on a fingerprint + local fingerprints = redis.call("ZRANGEBYSCORE", queueKey, 0.0, maxScore, "WITHSCORES", "LIMIT", 0, peekFingerprintCount) + local fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds) + + -- no lock could be acquired + if fingerprint == nil then + if #fingerprints == 0 then + return "NoReadyMessages" + end + return "AcquireLockFailed" + end + + $READ_MESSAGE_SRC + + return {fingerprint, fingerprintScore, message} +""" diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterDeadMessageHandler.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterDeadMessageHandler.kt new file mode 100644 index 0000000..3b774aa --- /dev/null +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterDeadMessageHandler.kt @@ -0,0 +1,31 @@ +package com.netflix.spinnaker.q.redis + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import redis.clients.jedis.JedisCluster +import java.time.Clock + +/** + * Dead-message callback that adds each dead message, serialized to JSON, to the sorted set + * `{deadLetterQueueName}.messages`, scored with the current [clock] time in epoch milliseconds. + */ +class RedisClusterDeadMessageHandler( + deadLetterQueueName: String, + private val jedisCluster: JedisCluster, + private val clock: Clock +) : DeadMessageCallback { + + private val dlqKey = "{$deadLetterQueueName}.messages" + + private val mapper = ObjectMapper().registerModule(KotlinModule()) + + override fun invoke(queue: Queue, message: Message) { + // jedisCluster is injected once and reused on every call, so it must stay open: wrapping it + // in use {} would close the client after the first dead message. + val score = clock.instant().toEpochMilli().toDouble() + jedisCluster.zadd(dlqKey, score, mapper.writeValueAsString(message)) + } +} diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt
b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt new file mode 100644 index 0000000..f5b3d57 --- /dev/null +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisClusterQueue.kt @@ -0,0 +1,368 @@ +package com.netflix.spinnaker.q.redis + +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spinnaker.KotlinOpen +import com.netflix.spinnaker.q.AttemptsAttribute +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.MaxAttemptsAttribute +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.metrics.LockFailed +import com.netflix.spinnaker.q.metrics.MessageAcknowledged +import com.netflix.spinnaker.q.metrics.MessageDead +import com.netflix.spinnaker.q.metrics.MessageDuplicate +import com.netflix.spinnaker.q.metrics.MessageNotFound +import com.netflix.spinnaker.q.metrics.MessageProcessing +import com.netflix.spinnaker.q.metrics.MessagePushed +import com.netflix.spinnaker.q.metrics.MessageRescheduled +import com.netflix.spinnaker.q.metrics.MessageRetried +import com.netflix.spinnaker.q.metrics.QueuePolled +import com.netflix.spinnaker.q.metrics.QueueState +import com.netflix.spinnaker.q.metrics.RetryPolled +import com.netflix.spinnaker.q.metrics.fire +import com.netflix.spinnaker.q.migration.SerializationMigrator +import org.funktionale.partials.partially1 +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.scheduling.annotation.Scheduled +import redis.clients.jedis.JedisCluster +import redis.clients.jedis.Transaction +import redis.clients.jedis.exceptions.JedisDataException +import redis.clients.jedis.params.sortedset.ZAddParams.zAddParams +import redis.clients.util.JedisClusterCRC16 +import java.io.IOException +import java.time.Clock +import java.time.Duration +import 
java.time.Instant +import java.time.temporal.TemporalAmount +import java.util.Locale +import java.util.Optional + +@KotlinOpen +class RedisClusterQueue( + private val queueName: String, + private val jedisCluster: JedisCluster, + private val clock: Clock, + private val lockTtlSeconds: Int = 10, + private val mapper: ObjectMapper, + private val serializationMigrator: Optional, + override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), + override val deadMessageHandlers: List, + override val publisher: EventPublisher +) : AbstractRedisQueue( + clock, + lockTtlSeconds, + mapper, + serializationMigrator, + ackTimeout, + deadMessageHandlers, + publisher +) { + + final override val log: Logger = LoggerFactory.getLogger(javaClass) + + override val queueKey = "{$queueName}.queue" + override val unackedKey = "{$queueName}.unacked" + override val messagesKey = "{$queueName}.messages" + override val locksKey = "{$queueName}.locks" + override val attemptsKey = "{$queueName}.attempts" + + override lateinit var readMessageWithLockScriptSha: String + + init { + cacheScript() + log.info("Configured queue: $queueName") + } + + final override fun cacheScript() { + readMessageWithLockScriptSha = jedisCluster.scriptLoad(READ_MESSAGE_WITH_LOCK_SRC, queueKey) + } + + override fun poll(callback: (Message, () -> Unit) -> Unit) { + jedisCluster.readMessageWithLock() + ?.also { (fingerprint, scheduledTime, json) -> + val ack = this::ackMessage.partially1(fingerprint) + jedisCluster.readMessage(fingerprint, json) { message -> + val attempts = message.getAttribute()?.attempts + ?: 0 + val maxAttempts = message.getAttribute()?.maxAttempts + ?: 0 + + if (maxAttempts > 0 && attempts > maxAttempts) { + log.warn("Message $fingerprint with payload $message exceeded $maxAttempts retries") + handleDeadMessage(message) + jedisCluster.removeMessage(fingerprint) + fire(MessageDead) + } else { + fire(MessageProcessing(message, scheduledTime, clock.instant())) + callback(message, ack) + } + } + } 
+ fire(QueuePolled) + } + + override fun push(message: Message, delay: TemporalAmount) { + jedisCluster.firstFingerprint(queueKey, message.fingerprint()).also { fingerprint -> + if (fingerprint != null) { + log.info("Re-prioritizing message as an identical one is already on the queue: " + + "$fingerprint, message: $message") + jedisCluster.zadd(queueKey, score(delay), fingerprint, zAddParams().xx()) + fire(MessageDuplicate(message)) + } else { + jedisCluster.queueMessage(message, delay) + fire(MessagePushed(message)) + } + } + } + + override fun reschedule(message: Message, delay: TemporalAmount) { + val fingerprint = message.fingerprint().latest + log.debug("Re-scheduling message: $message, fingerprint: $fingerprint to deliver in $delay") + val status: Long = jedisCluster.zadd(queueKey, score(delay), fingerprint, zAddParams().xx()) + if (status.toInt() == 1) { + fire(MessageRescheduled(message)) + } else { + fire(MessageNotFound(message)) + } + } + + override fun ensure(message: Message, delay: TemporalAmount) { + val fingerprint = message.fingerprint() + if (!jedisCluster.anyZismember(queueKey, fingerprint.all) && + !jedisCluster.anyZismember(unackedKey, fingerprint.all)) { + log.debug( + "Pushing ensured message onto queue as it does not exist in queue or unacked sets" + ) + push(message, delay) + } + } + + @Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}") + override fun retry() { + jedisCluster + .zrangeByScore(unackedKey, 0.0, score()) + .let { fingerprints -> + if (fingerprints.size > 0) { + fingerprints + .map { "$locksKey:$it" } + .let { jedisCluster.del(*it.toTypedArray()) } + } + + fingerprints.forEach { fingerprint -> + val attempts = jedisCluster.hgetInt(attemptsKey, fingerprint) + jedisCluster.readMessageWithoutLock(fingerprint) { message -> + val maxAttempts = message.getAttribute()?.maxAttempts ?: 0 + + /* If maxAttempts attribute is set, let poll() handle max retry logic. 
+ If not, check for attempts >= Queue.maxRetries - 1, as attemptsKey is now + only incremented when retrying unacked messages vs. by readMessage*() */ + if (maxAttempts == 0 && attempts >= Queue.maxRetries - 1) { + log.warn("Message $fingerprint with payload $message exceeded max retries") + handleDeadMessage(message) + jedisCluster.removeMessage(fingerprint) + fire(MessageDead) + } else { + if (jedisCluster.zismember(queueKey, fingerprint)) { + jedisCluster + .multi { + zrem(unackedKey, fingerprint) + zadd(queueKey, score(), fingerprint) + hincrBy(attemptsKey, fingerprint, 1L) + } + log.info("Not retrying message $fingerprint because an identical message " + + "is already on the queue") + fire(MessageDuplicate(message)) + } else { + log.warn("Retrying message $fingerprint after $attempts attempts") + jedisCluster.hincrBy(attemptsKey, fingerprint, 1L) + jedisCluster.requeueMessage(fingerprint) + fire(MessageRetried) + } + } + } + } + } + .also { + fire(RetryPolled) + } + } + + override fun readState(): QueueState = + jedisCluster.multi { + zcard(queueKey) + zcount(queueKey, 0.0, score()) + zcard(unackedKey) + hlen(messagesKey) + } + .map { (it as Long).toInt() } + .let { (queued, ready, processing, messages) -> + return QueueState( + depth = queued, + ready = ready, + unacked = processing, + orphaned = messages - (queued + processing) + ) + } + + override fun containsMessage(predicate: (Message) -> Boolean): Boolean { + var found = false + var cursor = "0" + while (!found) { + jedisCluster.hscan(messagesKey, cursor).apply { + found = result + .map { mapper.readValue(it.value) } + .any(predicate) + cursor = stringCursor + } + if (cursor == "0") break + } + return found + } + + internal fun JedisCluster.queueMessage( + message: Message, + delay: TemporalAmount = Duration.ZERO + ) { + val fingerprint = message.fingerprint().latest + + // ensure the message has the attempts tracking attribute + message.setAttribute( + message.getAttribute() ?: AttemptsAttribute() + ) + 
+ multi { + hset(messagesKey, fingerprint, mapper.writeValueAsString(message)) + zadd(queueKey, score(delay), fingerprint) + } + } + + internal fun JedisCluster.requeueMessage(fingerprint: String) { + multi { + zrem(unackedKey, fingerprint) + zadd(queueKey, score(), fingerprint) + } + } + + internal fun JedisCluster.removeMessage(fingerprint: String) { + multi { + zrem(queueKey, fingerprint) + zrem(unackedKey, fingerprint) + hdel(messagesKey, fingerprint) + del("$locksKey:$fingerprint") + hdel(attemptsKey, fingerprint) + } + } + + internal fun JedisCluster.readMessageWithoutLock( + fingerprint: String, + block: (Message) -> Unit + ) { + try { + hget(messagesKey, fingerprint) + .let { + val message = mapper.readValue(runSerializationMigration(it)) + block.invoke(message) + } + } catch (e: JsonParseException) { // subclass of IOException, so it has to be caught first + log.error("Payload for unacked message $fingerprint is missing or corrupt", e) + removeMessage(fingerprint) + } catch (e: IOException) { + log.error("Failed to read unacked message $fingerprint, requeuing...", e) + hincrBy(attemptsKey, fingerprint, 1L) + requeueMessage(fingerprint) + } + } + + internal fun JedisCluster.readMessageWithLock(): Triple? { + try { + val response = evalsha(readMessageWithLockScriptSha, listOf( + queueKey, + unackedKey, + locksKey, + messagesKey + ), listOf( + score().toString(), + 10.toString(), // TODO rz - make this configurable.
+ lockTtlSeconds.toString(), + java.lang.String.format(Locale.US, "%f", score(ackTimeout)), + java.lang.String.format(Locale.US, "%f", score()) + )) + if (response is List<*>) { + return Triple( + response[0].toString(), // fingerprint + Instant.ofEpochMilli(response[1].toString().toLong()), // fingerprintScore + response[2]?.toString() // message + ) + } + if (response == "AcquireLockFailed") { // must match the string returned by READ_MESSAGE_WITH_LOCK_SRC + // This isn't a "bad" thing, but means there's more work than keiko can process in a cycle + // in this case, but may be a signal to tune `peekFingerprintCount` + fire(LockFailed) + } + } catch (e: JedisDataException) { + if ((e.message ?: "").startsWith("NOSCRIPT")) { + cacheScript() + return readMessageWithLock() + } else { + throw e + } + } + return null + } + + internal fun JedisCluster.readMessage( + fingerprint: String, + json: String?, + block: (Message) -> Unit + ) { + if (json == null) { + log.error("Payload for message $fingerprint is missing") + // clean up what is essentially an unrecoverable message + removeMessage(fingerprint) + } else { + try { + val message = mapper.readValue(runSerializationMigration(json)) + .apply { + val currentAttempts = (getAttribute() ?: AttemptsAttribute()) + .run { copy(attempts = attempts + 1) } + setAttribute(currentAttempts) + } + + hset(messagesKey, fingerprint, mapper.writeValueAsString(message)) + + block.invoke(message) + } catch (e: IOException) { + log.error("Failed to read message $fingerprint, requeuing...", e) + hincrBy(attemptsKey, fingerprint, 1L) + requeueMessage(fingerprint) + } + } + } + + fun JedisCluster.multi(block: Transaction.() -> Unit) = + getConnectionFromSlot(JedisClusterCRC16.getSlot(queueKey)) + .use { c -> + c.multi() + .let { tx -> + tx.block() + tx.exec() + } + } + + private fun ackMessage(fingerprint: String) { + if (jedisCluster.zismember(queueKey, fingerprint)) { + // only remove this message from the unacked queue as a matching one has + // been put on the main queue + jedisCluster.multi { +
zrem(unackedKey, fingerprint) + del("$locksKey:$fingerprint") + } + } else { + jedisCluster.removeMessage(fingerprint) + } + fire(MessageAcknowledged) + } +} diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt index 8fd804d..d444e33 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt @@ -18,8 +18,6 @@ package com.netflix.spinnaker.q.redis import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.SerializationFeature -import com.google.common.hash.Hashing import com.netflix.spinnaker.KotlinOpen import com.netflix.spinnaker.q.AttemptsAttribute import com.netflix.spinnaker.q.DeadMessageCallback @@ -36,7 +34,6 @@ import com.netflix.spinnaker.q.metrics.MessageProcessing import com.netflix.spinnaker.q.metrics.MessagePushed import com.netflix.spinnaker.q.metrics.MessageRescheduled import com.netflix.spinnaker.q.metrics.MessageRetried -import com.netflix.spinnaker.q.metrics.MonitorableQueue import com.netflix.spinnaker.q.metrics.QueuePolled import com.netflix.spinnaker.q.metrics.QueueState import com.netflix.spinnaker.q.metrics.RetryPolled @@ -47,19 +44,13 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.scheduling.annotation.Scheduled import redis.clients.jedis.Jedis -import redis.clients.jedis.JedisCommands import redis.clients.jedis.ScriptingCommands -import redis.clients.jedis.Transaction import redis.clients.jedis.exceptions.JedisDataException import redis.clients.jedis.params.sortedset.ZAddParams.zAddParams import redis.clients.util.Pool import java.io.IOException -import java.lang.String.format -import java.nio.charset.Charset -import java.nio.charset.StandardCharsets import java.time.Clock import java.time.Duration -import 
java.time.Duration.ZERO import java.time.Instant import java.time.temporal.TemporalAmount import java.util.Locale @@ -76,29 +67,32 @@ class RedisQueue( override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), override val deadMessageHandlers: List, override val publisher: EventPublisher -) : MonitorableQueue { - - private val log: Logger = LoggerFactory.getLogger(javaClass) - - private val queueKey = "$queueName.queue" - private val unackedKey = "$queueName.unacked" - private val messagesKey = "$queueName.messages" - private val locksKey = "$queueName.locks" - private val attemptsKey = "$queueName.attempts" - - // Internal ObjectMapper that enforces deterministic property ordering for use only in hashing. - private val hashObjectMapper = ObjectMapper().copy().apply { - enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) - } - - private lateinit var readMessageWithLockScriptSha: String +) : AbstractRedisQueue( + clock, + lockTtlSeconds, + mapper, + serializationMigrator, + ackTimeout, + deadMessageHandlers, + publisher +) { + + final override val log: Logger = LoggerFactory.getLogger(javaClass) + + override val queueKey = "$queueName.queue" + override val unackedKey = "$queueName.unacked" + override val messagesKey = "$queueName.messages" + override val locksKey = "$queueName.locks" + override val attemptsKey = "$queueName.attempts" + + override lateinit var readMessageWithLockScriptSha: String init { cacheScript() log.info("Configured queue: $queueName") } - fun cacheScript() { + final override fun cacheScript() { pool.resource.use { redis -> readMessageWithLockScriptSha = redis.scriptLoad(READ_MESSAGE_WITH_LOCK_SRC) } @@ -277,7 +271,7 @@ class RedisQueue( } } - private fun Jedis.queueMessage(message: Message, delay: TemporalAmount = ZERO) { + internal fun Jedis.queueMessage(message: Message, delay: TemporalAmount = Duration.ZERO) { val fingerprint = message.fingerprint().latest // ensure the message has the attempts tracking attribute @@ -291,14 +285,14 
@@ class RedisQueue( } } - private fun Jedis.requeueMessage(fingerprint: String) { + internal fun Jedis.requeueMessage(fingerprint: String) { multi { zrem(unackedKey, fingerprint) zadd(queueKey, score(), fingerprint) } } - private fun Jedis.removeMessage(fingerprint: String) { + internal fun Jedis.removeMessage(fingerprint: String) { multi { zrem(queueKey, fingerprint) zrem(unackedKey, fingerprint) @@ -308,7 +302,7 @@ class RedisQueue( } } - private fun Jedis.readMessageWithoutLock(fingerprint: String, block: (Message) -> Unit) { + internal fun Jedis.readMessageWithoutLock(fingerprint: String, block: (Message) -> Unit) { try { hget(messagesKey, fingerprint) .let { @@ -325,7 +319,7 @@ class RedisQueue( } } - private fun ScriptingCommands.readMessageWithLock(): Triple? { + internal fun ScriptingCommands.readMessageWithLock(): Triple? { try { val response = evalsha(readMessageWithLockScriptSha, listOf( queueKey, @@ -336,8 +330,8 @@ class RedisQueue( score().toString(), 10.toString(), // TODO rz - make this configurable. lockTtlSeconds.toString(), - format(Locale.US, "%f", score(ackTimeout)), - format(Locale.US, "%f", score()) + java.lang.String.format(Locale.US, "%f", score(ackTimeout)), + java.lang.String.format(Locale.US, "%f", score()) )) if (response is List<*>) { return Triple( @@ -367,7 +361,7 @@ class RedisQueue( * [block]. If it's not accessible for whatever reason any references are * cleaned up. 
*/ - private fun Jedis.readMessage(fingerprint: String, json: String?, block: (Message) -> Unit) { + internal fun Jedis.readMessage(fingerprint: String, json: String?, block: (Message) -> Unit) { if (json == null) { log.error("Payload for message $fingerprint is missing") // clean up what is essentially an unrecoverable message @@ -391,141 +385,4 @@ class RedisQueue( } } } - - private fun runSerializationMigration(json: String): String { - if (serializationMigrator.isPresent) { - return serializationMigrator.get().migrate(json) - } - return json - } - - private fun handleDeadMessage(message: Message) { - deadMessageHandlers.forEach { - it.invoke(this, message) - } - } - - /** - * @return current time (plus optional [delay]) converted to a score for a - * Redis sorted set. - */ - private fun score(delay: TemporalAmount = ZERO) = - clock.instant().plus(delay).toEpochMilli().toDouble() - - private inline fun ObjectMapper.readValue(content: String): R = - readValue(content, R::class.java) - - private fun Jedis.multi(block: Transaction.() -> Unit) = - multi().use { tx -> - tx.block() - tx.exec() - } - - private fun JedisCommands.hgetInt(key: String, field: String, default: Int = 0) = - hget(key, field)?.toInt() ?: default - - private fun JedisCommands.zismember(key: String, member: String) = - zrank(key, member) != null - - private fun JedisCommands.anyZismember(key: String, members: Set) = - members.any { zismember(key, it) } - - private fun JedisCommands.firstFingerprint(key: String, fingerprint: Fingerprint) = - fingerprint.all.firstOrNull { zismember(key, it) } - - @Deprecated("Hashes the attributes property, which is mutable") - private fun Message.hashV1() = - Hashing - .murmur3_128() - .hashString(toString(), Charset.defaultCharset()) - .toString() - - private fun Message.hashV2() = - hashObjectMapper.convertValue(this, MutableMap::class.java) - .apply { remove("attributes") } - .let { - Hashing - .murmur3_128() - 
.hashString("v2:${hashObjectMapper.writeValueAsString(it)}", StandardCharsets.UTF_8) - .toString() - } - - private fun Message.fingerprint() = - hashV2().let { Fingerprint(latest = it, all = setOf(it, hashV1())) } - - internal data class Fingerprint( - val latest: String, - val all: Set = setOf() - ) } - -private const val READ_MESSAGE_SRC = """ - local java_scientific = function(x) - return string.format("%.12E", x):gsub("\+", "") - end - - -- get the message, move the fingerprint to the unacked queue and return - local message = redis.call("HGET", messagesKey, fingerprint) - - -- check for an ack timeout override on the message - local unackScore = unackDefaultScore - if type(message) == "string" and message ~= nil then - local ackTimeoutOverride = tonumber(cjson.decode(message)["ackTimeoutMs"]) - if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then - unackScore = unackBaseScore + ackTimeoutOverride - end - end - - unackScore = java_scientific(unackScore) - - redis.call("ZREM", queueKey, fingerprint) - redis.call("ZADD", unackKey, unackScore, fingerprint) -""" - -/* ktlint-disable max-line-length */ -private const val READ_MESSAGE_WITH_LOCK_SRC = """ - local queueKey = KEYS[1] - local unackKey = KEYS[2] - local lockKey = KEYS[3] - local messagesKey = KEYS[4] - local maxScore = ARGV[1] - local peekFingerprintCount = ARGV[2] - local lockTtlSeconds = ARGV[3] - local unackDefaultScore = ARGV[4] - local unackBaseScore = ARGV[5] - - local not_empty = function(x) - return (type(x) == "table") and (not x.err) and (#x ~= 0) - end - - local acquire_lock = function(fingerprints, locksKey, lockTtlSeconds) - if not_empty(fingerprints) then - local i=1 - while (i <= #fingerprints) do - redis.call("ECHO", "attempting lock on " .. fingerprints[i]) - if redis.call("SET", locksKey .. ":" .. fingerprints[i], "\uD83D\uDD12", "EX", lockTtlSeconds, "NX") then - redis.call("ECHO", "acquired lock on " .. 
fingerprints[i]) - return fingerprints[i], fingerprints[i+1] - end - i=i+2 - end - end - return nil, nil - end - - -- acquire a lock on a fingerprint - local fingerprints = redis.call("ZRANGEBYSCORE", queueKey, 0.0, maxScore, "WITHSCORES", "LIMIT", 0, peekFingerprintCount) - local fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds) - - -- no lock could be acquired - if fingerprint == nil then - if #fingerprints == 0 then - return "NoReadyMessages" - end - return "AcquireLockFailed" - end - - $READ_MESSAGE_SRC - - return {fingerprint, fingerprintScore, message} -"""