Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
feat(redis): redis cluster support (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Jul 19, 2019
1 parent 87e5670 commit abaed37
Show file tree
Hide file tree
Showing 7 changed files with 693 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.redis.RedisClusterDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisClusterQueue
import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisQueue
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.JedisPool
import redis.clients.jedis.Protocol
import redis.clients.util.Pool
Expand All @@ -43,11 +48,19 @@ import java.util.Optional
@EnableConfigurationProperties(RedisQueueProperties::class)
class RedisQueueConfiguration {

@Bean @ConditionalOnMissingBean(GenericObjectPoolConfig::class)
fun redisPoolConfig() = GenericObjectPoolConfig<Any>()
@Bean
@ConditionalOnMissingBean(GenericObjectPoolConfig::class)
fun redisPoolConfig() = GenericObjectPoolConfig<Any>().apply {
blockWhenExhausted = false
maxWaitMillis = 2000
}

@Bean
@ConditionalOnMissingBean(name = ["queueRedisPool"])
@ConditionalOnProperty(
value = ["redis.cluster-enabled"],
havingValue = "false",
matchIfMissing = true)
fun queueRedisPool(
@Value("\${redis.connection:redis://localhost:6379}") connection: String,
@Value("\${redis.timeout:2000}") timeout: Int,
Expand All @@ -66,6 +79,10 @@ class RedisQueueConfiguration {

@Bean
@ConditionalOnMissingBean(name = ["queue"])
@ConditionalOnProperty(
value = ["redis.cluster-enabled"],
havingValue = "false",
matchIfMissing = true)
fun queue(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
Expand All @@ -88,6 +105,10 @@ class RedisQueueConfiguration {

@Bean
@ConditionalOnMissingBean(name = ["redisDeadMessageHandler"])
@ConditionalOnProperty(
value = ["redis.cluster-enabled"],
havingValue = "false",
matchIfMissing = true)
fun redisDeadMessageHandler(
@Qualifier("queueRedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
Expand All @@ -99,6 +120,65 @@ class RedisQueueConfiguration {
clock = clock
)

@Bean
@ConditionalOnMissingBean(name = ["queueRedisCluster"])
@ConditionalOnProperty(value = ["redis.cluster-enabled"])
fun queueRedisCluster(
@Value("\${redis.connection:redis://localhost:6379}") connection: String,
@Value("\${redis.timeout:2000}") timeout: Int,
@Value("\${redis.maxattempts:4}") maxAttempts: Int,
redisPoolConfig: GenericObjectPoolConfig<Any>
): JedisCluster {
URI.create(connection).let { cx ->
val port = if (cx.port == -1) Protocol.DEFAULT_PORT else cx.port
val password = cx.userInfo?.substringAfter(":")
return JedisCluster(
HostAndPort(cx.host, port),
timeout,
timeout,
maxAttempts,
password,
redisPoolConfig
)
}
}

@Bean
@ConditionalOnMissingBean(name = ["queue", "clusterQueue"])
@ConditionalOnProperty(value = ["redis.cluster-enabled"])
fun clusterQueue(
@Qualifier("queueRedisCluster") cluster: JedisCluster,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
deadMessageHandler: RedisClusterDeadMessageHandler,
publisher: EventPublisher,
redisQueueObjectMapper: ObjectMapper,
serializationMigrator: Optional<SerializationMigrator>
) =
RedisClusterQueue(
queueName = redisQueueProperties.queueName,
jedisCluster = cluster,
clock = clock,
mapper = redisQueueObjectMapper,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong()),
serializationMigrator = serializationMigrator
)

@Bean
@ConditionalOnMissingBean(name = ["redisClusterDeadMessageHandler"])
@ConditionalOnProperty(value = ["redis.cluster-enabled"])
fun redisClusterDeadMessageHandler(
@Qualifier("queueRedisCluster") cluster: JedisCluster,
redisQueueProperties: RedisQueueProperties,
clock: Clock
) = RedisClusterDeadMessageHandler(
deadLetterQueueName = redisQueueProperties.deadLetterQueueName,
jedisCluster = cluster,
clock = clock
)

@Bean
@ConditionalOnMissingBean
fun redisQueueObjectMapper(properties: Optional<ObjectMapperSubtypeProperties>): ObjectMapper =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ class RedisQueueProperties {
var queueName: String = "keiko.queue"
var deadLetterQueueName: String = "keiko.queue.deadLetters"
var ackTimeoutSeconds: Int = 60
var shards: Int = 1
}
4 changes: 3 additions & 1 deletion keiko-redis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ apply from: "$rootDir/gradle/spek.gradle"

dependencies {
api project(":keiko-core")
api "redis.clients:jedis"
api("redis.clients:jedis:2.10.2") {
force = true
}
api "com.fasterxml.jackson.core:jackson-databind"
api "com.fasterxml.jackson.module:jackson-module-kotlin"
api "org.funktionale:funktionale-partials"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package com.netflix.spinnaker.q.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.google.common.hash.Hashing
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueue
import com.netflix.spinnaker.q.migration.SerializationMigrator
import org.slf4j.Logger
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCommands
import redis.clients.jedis.Transaction
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.time.Clock
import java.time.Duration
import java.time.temporal.TemporalAmount
import java.util.Optional

abstract class AbstractRedisQueue(
private val clock: Clock,
private val lockTtlSeconds: Int = 10,
private val mapper: ObjectMapper,
private val serializationMigrator: Optional<SerializationMigrator>,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val publisher: EventPublisher

) : MonitorableQueue {
internal abstract val queueKey: String
internal abstract val unackedKey: String
internal abstract val messagesKey: String
internal abstract val locksKey: String
internal abstract val attemptsKey: String

internal abstract val log: Logger

// Internal ObjectMapper that enforces deterministic property ordering for use only in hashing.
private val hashObjectMapper = ObjectMapper().copy().apply {
enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
}

abstract fun cacheScript()
abstract var readMessageWithLockScriptSha: String

internal fun runSerializationMigration(json: String): String {
if (serializationMigrator.isPresent) {
return serializationMigrator.get().migrate(json)
}
return json
}

internal fun handleDeadMessage(message: Message) {
deadMessageHandlers.forEach {
it.invoke(this, message)
}
}

/**
* @return current time (plus optional [delay]) converted to a score for a
* Redis sorted set.
*/
internal fun score(delay: TemporalAmount = Duration.ZERO) =
clock.instant().plus(delay).toEpochMilli().toDouble()

internal inline fun <reified R> ObjectMapper.readValue(content: String): R =
readValue(content, R::class.java)

internal fun Jedis.multi(block: Transaction.() -> Unit) =
multi().use { tx ->
tx.block()
tx.exec()
}

internal fun JedisCommands.hgetInt(key: String, field: String, default: Int = 0) =
hget(key, field)?.toInt() ?: default

internal fun JedisCommands.zismember(key: String, member: String) =
zrank(key, member) != null

internal fun JedisCommands.anyZismember(key: String, members: Set<String>) =
members.any { zismember(key, it) }

internal fun JedisCommands.firstFingerprint(key: String, fingerprint: Fingerprint) =
fingerprint.all.firstOrNull { zismember(key, it) }

@Deprecated("Hashes the attributes property, which is mutable")
internal fun Message.hashV1() =
Hashing
.murmur3_128()
.hashString(toString(), Charset.defaultCharset())
.toString()

internal fun Message.hashV2() =
hashObjectMapper.convertValue(this, MutableMap::class.java)
.apply { remove("attributes") }
.let {
Hashing
.murmur3_128()
.hashString("v2:${hashObjectMapper.writeValueAsString(it)}", StandardCharsets.UTF_8)
.toString()
}

internal fun Message.fingerprint() =
hashV2().let { Fingerprint(latest = it, all = setOf(it, hashV1())) }

internal data class Fingerprint(
val latest: String,
val all: Set<String> = setOf()
)
}

internal const val READ_MESSAGE_SRC = """
local java_scientific = function(x)
return string.format("%.12E", x):gsub("\+", "")
end
-- get the message, move the fingerprint to the unacked queue and return
local message = redis.call("HGET", messagesKey, fingerprint)
-- check for an ack timeout override on the message
local unackScore = unackDefaultScore
if type(message) == "string" and message ~= nil then
local ackTimeoutOverride = tonumber(cjson.decode(message)["ackTimeoutMs"])
if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then
unackScore = unackBaseScore + ackTimeoutOverride
end
end
unackScore = java_scientific(unackScore)
redis.call("ZREM", queueKey, fingerprint)
redis.call("ZADD", unackKey, unackScore, fingerprint)
"""

/* ktlint-disable max-line-length */
internal const val READ_MESSAGE_WITH_LOCK_SRC = """
local queueKey = KEYS[1]
local unackKey = KEYS[2]
local lockKey = KEYS[3]
local messagesKey = KEYS[4]
local maxScore = ARGV[1]
local peekFingerprintCount = ARGV[2]
local lockTtlSeconds = ARGV[3]
local unackDefaultScore = ARGV[4]
local unackBaseScore = ARGV[5]
local not_empty = function(x)
return (type(x) == "table") and (not x.err) and (#x ~= 0)
end
local acquire_lock = function(fingerprints, locksKey, lockTtlSeconds)
if not_empty(fingerprints) then
local i=1
while (i <= #fingerprints) do
redis.call("ECHO", "attempting lock on " .. fingerprints[i])
if redis.call("SET", locksKey .. ":" .. fingerprints[i], "\uD83D\uDD12", "EX", lockTtlSeconds, "NX") then
redis.call("ECHO", "acquired lock on " .. fingerprints[i])
return fingerprints[i], fingerprints[i+1]
end
i=i+2
end
end
return nil, nil
end
-- acquire a lock on a fingerprint
local fingerprints = redis.call("ZRANGEBYSCORE", queueKey, 0.0, maxScore, "WITHSCORES", "LIMIT", 0, peekFingerprintCount)
local fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds)
-- no lock could be acquired
if fingerprint == nil then
if #fingerprints == 0 then
return "NoReadyMessages"
end
return "AcquireLockFailed"
end
$READ_MESSAGE_SRC
return {fingerprint, fingerprintScore, message}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.netflix.spinnaker.q.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import redis.clients.jedis.JedisCluster
import java.time.Clock

class RedisClusterDeadMessageHandler(
deadLetterQueueName: String,
private val jedisCluster: JedisCluster,
private val clock: Clock
) : DeadMessageCallback {

private val dlqKey = "{$deadLetterQueueName}.messages"

private val mapper = ObjectMapper().registerModule(KotlinModule())

override fun invoke(queue: Queue, message: Message) {
jedisCluster.use { cluster ->
val score = clock.instant().toEpochMilli().toDouble()
cluster.zadd(dlqKey, score, mapper.writeValueAsString(message))
}
}
}
Loading

0 comments on commit abaed37

Please sign in to comment.