Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(redis): fix dlq logic in unacked retry handler (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored Aug 14, 2018
1 parent 7528847 commit e451c68
Showing 1 changed file with 40 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.q.redis

import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.google.common.hash.Hashing
Expand Down Expand Up @@ -176,35 +177,34 @@ class RedisQueue(

fingerprints.forEach { fingerprint ->
val attempts = redis.hgetInt(attemptsKey, fingerprint)
if (attempts >= Queue.maxRetries) {
redis.readMessageWithoutLock(fingerprint) { message ->
redis.readMessageWithoutLock(fingerprint) { message ->
val maxAttempts = message.getAttribute<MaxAttemptsAttribute>()?.maxAttempts ?: 0

/* If maxAttempts attribute is set, let poll() handle max retry logic.
If not, check for attempts >= Queue.maxRetries - 1, as attemptsKey is now
only incremented when retrying unacked messages vs. by readMessage*() */
if (maxAttempts == 0 && attempts >= Queue.maxRetries - 1) {
log.warn("Message $fingerprint with payload $message exceeded max retries")
handleDeadMessage(message)
redis.removeMessage(fingerprint)
}
fire(MessageDead)
} else {
if (redis.zismember(queueKey, fingerprint)) {
redis
.multi {
zrem(unackedKey, fingerprint)
zadd(queueKey, score(), fingerprint)
// we only need to read the message for metrics purposes
hget(messagesKey, fingerprint)
}
.let { (_, _, json) ->
mapper
.readValue<Message>(runSerializationMigration(json as String))
.let { message ->
log.warn("Not retrying message $fingerprint because an identical message " +
"is already on the queue")
fire(MessageDuplicate(message))
}
}
fire(MessageDead)
} else {
log.warn("Retrying message $fingerprint after $attempts attempts")
redis.requeueMessage(fingerprint)
fire(MessageRetried)
if (redis.zismember(queueKey, fingerprint)) {
redis
.multi {
zrem(unackedKey, fingerprint)
zadd(queueKey, score(), fingerprint)
hincrBy(attemptsKey, fingerprint, 1L)
}
log.warn("Not retrying message $fingerprint because an identical message " +
"is already on the queue")
fire(MessageDuplicate(message))
} else {
log.warn("Retrying message $fingerprint after $attempts attempts")
redis.hincrBy(attemptsKey, fingerprint, 1L)
redis.requeueMessage(fingerprint)
fire(MessageRetried)
}
}
}
}
Expand Down Expand Up @@ -295,27 +295,24 @@ class RedisQueue(
zrem(unackedKey, fingerprint)
hdel(messagesKey, fingerprint)
del("$locksKey:$fingerprint")

// TODO: use AttemptAttribute instead
hdel(attemptsKey, fingerprint)
}
}

private fun Jedis.readMessageWithoutLock(fingerprint: String, block: (Message) -> Unit) {
eval(READ_MESSAGE_WITHOUT_LOCK,
listOf(
queueKey,
unackedKey,
messagesKey,
attemptsKey
),
listOf(
fingerprint,
format("%f", score(ackTimeout)),
format("%f", score())
)
).let {
readMessage(fingerprint, it as String?, block)
try {
hget(messagesKey, fingerprint)
.let {
val message = mapper.readValue<Message>(runSerializationMigration(it))
block.invoke(message)
}
} catch (e: IOException) {
log.error("Failed to read unacked message $fingerprint, requeuing...", e)
hincrBy(attemptsKey, fingerprint, 1L)
requeueMessage(fingerprint)
} catch (e: JsonParseException) {
log.error("Payload for unacked message $fingerprint is missing or corrupt", e)
removeMessage(fingerprint)
}
}

Expand All @@ -324,8 +321,7 @@ class RedisQueue(
queueKey,
unackedKey,
locksKey,
messagesKey,
attemptsKey
messagesKey
), listOf(
score().toString(),
10.toString(), // TODO rz - make this configurable.
Expand Down Expand Up @@ -362,7 +358,6 @@ class RedisQueue(
try {
val message = mapper.readValue<Message>(runSerializationMigration(json))
.apply {
// TODO: AttemptsAttribute could replace `attemptsKey`
val currentAttempts = (getAttribute() ?: AttemptsAttribute())
.run { copy(attempts = attempts + 1) }
setAttribute(currentAttempts)
Expand All @@ -373,6 +368,7 @@ class RedisQueue(
block.invoke(message)
} catch (e: IOException) {
log.error("Failed to read message $fingerprint, requeuing...", e)
hincrBy(attemptsKey, fingerprint, 1L)
requeueMessage(fingerprint)
}
}
Expand Down Expand Up @@ -466,7 +462,6 @@ private const val READ_MESSAGE = """
redis.call("ZREM", queueKey, fingerprint)
redis.call("ZADD", unackKey, unackScore, fingerprint)
redis.call("HINCRBY", attemptsKey, fingerprint, 1)
"""

/* ktlint-disable max-line-length */
Expand All @@ -475,7 +470,6 @@ private const val READ_MESSAGE_WITH_LOCK = """
local unackKey = KEYS[2]
local lockKey = KEYS[3]
local messagesKey = KEYS[4]
local attemptsKey = KEYS[5]
local maxScore = ARGV[1]
local peekFingerprintCount = ARGV[2]
local lockTtlSeconds = ARGV[3]
Expand Down Expand Up @@ -517,19 +511,3 @@ private const val READ_MESSAGE_WITH_LOCK = """
return {fingerprint, fingerprintScore, message}
"""
/* ktlint-enable max-line-length */

// TODO rz - I don't think we should be incrementing attempts here...
private const val READ_MESSAGE_WITHOUT_LOCK = """
local queueKey = KEYS[1]
local unackKey = KEYS[2]
local messagesKey = KEYS[3]
local attemptsKey = KEYS[4]
local fingerprint = ARGV[1]
local unackDefaultScore = ARGV[2]
local unackBaseScore = ARGV[3]
$READ_MESSAGE
return message
"""

0 comments on commit e451c68

Please sign in to comment.