Skip to content

Commit

Permalink
fix(queue): re-prioritize message on queue if an identical one is pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Sep 18, 2017
1 parent a40d508 commit 7ce0f61
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class RedisQueue(
pool.resource.use { redis ->
val fingerprint = message.hash()
if (redis.zismember(queueKey, fingerprint)) {
log.warn("Ignoring message as an identical one is already on the queue: $fingerprint, message: $message")
log.warn("Re-prioritizing message as an identical one is already on the queue: $fingerprint, message: $message")
redis.zadd(queueKey, score(delay), fingerprint)
fire<MessageDuplicate>(message)
} else {
redis.queueMessage(message, delay)
Expand Down Expand Up @@ -135,10 +136,11 @@ class RedisQueue(
redis
.multi {
zrem(unackedKey, fingerprint)
zadd(queueKey, score(), fingerprint)
// we only need to read the message for metrics purposes
hget(messagesKey, fingerprint)
}
.let { (_, json) ->
.let { (_, _, json) ->
mapper
.readValue<Message>(json as String)
.let { message ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ object RedisQueueTest : QueueTest<RedisQueue>(createQueue(p3 = null), ::shutdown

object RedisMonitorableQueueTest : MonitorableQueueTest<RedisQueue>(
createQueue,
RedisQueue::retry,
::shutdownCallback
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ class InMemoryQueue(
}

override fun push(message: Message, delay: TemporalAmount) {
if (queue.none { it.payload == message }) {
queue.put(Envelope(message, clock.instant().plus(delay), clock))
fire<MessagePushed>(message)
} else {
val existed = queue.removeIf { it.payload == message }
queue.put(Envelope(message, clock.instant().plus(delay), clock))
if (existed) {
fire<MessageDuplicate>(message)
} else {
fire<MessagePushed>(message)
}
}

Expand All @@ -79,12 +80,13 @@ class InMemoryQueue(
deadMessageHandler.invoke(this, message.payload)
fire<MessageDead>()
} else {
if (queue.none { it.payload == message.payload }) {
log.warn("redelivering unacked message ${message.payload}")
queue.put(message.copy(scheduledTime = now, count = message.count + 1))
fire<MessageRetried>()
} else {
val existed = queue.removeIf { it.payload == message.payload }
log.warn("redelivering unacked message ${message.payload}")
queue.put(message.copy(scheduledTime = now, count = message.count + 1))
if (existed) {
fire<MessageDuplicate>(message.payload)
} else {
fire<MessageRetried>()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.threeten.extra.Hours
import java.io.Closeable
import java.time.Clock
import java.time.Duration
Expand Down Expand Up @@ -105,7 +106,7 @@ abstract class QueueTest<out Q : Queue>(
afterGroup(::resetMocks)

on("polling the queue twice") {
queue!!.apply {
with(queue!!) {
poll(callback)
poll(callback)
}
Expand Down Expand Up @@ -169,7 +170,7 @@ abstract class QueueTest<out Q : Queue>(

beforeGroup {
queue = createQueue(clock, deadLetterCallback)
queue!!.apply {
with(queue!!) {
push(message)
poll { _, ack ->
ack()
Expand All @@ -181,7 +182,7 @@ abstract class QueueTest<out Q : Queue>(
afterGroup(::resetMocks)

on("polling the queue after the message acknowledgment has timed out") {
queue!!.apply {
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
poll(callback)
Expand All @@ -198,7 +199,7 @@ abstract class QueueTest<out Q : Queue>(

beforeGroup {
queue = createQueue(clock, deadLetterCallback)
queue!!.apply {
with(queue!!) {
push(message)
poll { _, _ -> }
}
Expand All @@ -208,7 +209,7 @@ abstract class QueueTest<out Q : Queue>(
afterGroup(::resetMocks)

on("polling the queue after the message acknowledgment has timed out") {
queue!!.apply {
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
poll(callback)
Expand All @@ -225,7 +226,7 @@ abstract class QueueTest<out Q : Queue>(

beforeGroup {
queue = createQueue(clock, deadLetterCallback)
queue!!.apply {
with(queue!!) {
push(message)
repeat(2) {
poll { _, _ -> }
Expand All @@ -239,7 +240,7 @@ abstract class QueueTest<out Q : Queue>(
afterGroup(::resetMocks)

on("polling the queue again") {
queue!!.apply {
with(queue!!) {
poll(callback)
}
}
Expand All @@ -254,7 +255,7 @@ abstract class QueueTest<out Q : Queue>(

beforeGroup {
queue = createQueue(clock, deadLetterCallback)
queue!!.apply {
with(queue!!) {
push(message)
repeat(maxRetries) {
poll { _, _ -> }
Expand All @@ -268,7 +269,7 @@ abstract class QueueTest<out Q : Queue>(
afterGroup(::resetMocks)

on("polling the queue again") {
queue!!.apply {
with(queue!!) {
poll(callback)
}
}
Expand All @@ -283,7 +284,7 @@ abstract class QueueTest<out Q : Queue>(

and("the message has been dead-lettered") {
on("the next time retry checks happen") {
queue!!.apply {
with(queue!!) {
retry()
poll(callback)
}
Expand All @@ -304,6 +305,34 @@ abstract class QueueTest<out Q : Queue>(
given("a message was pushed") {
val message = StartExecution(Pipeline::class.java, "1", "foo")

and("a duplicate is pushed with a newer delivery time") {
val delay = Hours.of(1)

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
push(message, delay)
push(message.copy())
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue") {
queue!!.poll(callback)
}

it("delivers the message immediately and only once") {
verify(callback).invoke(eq(message), any())
}

it("does not hold on to the first message") {
clock.incrementBy(delay)
queue!!.poll(callback)
verifyNoMoreInteractions(callback)
}
}

and("a different message is pushed before acknowledging the first") {
val newMessage = message.copy(executionId = "2")

Expand Down Expand Up @@ -374,6 +403,40 @@ abstract class QueueTest<out Q : Queue>(
}
}

and("another identical message is pushed with a delay and the first is never acknowledged") {
val delay = Hours.of(1)

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
push(message)
poll { _, ack ->
push(message.copy(), delay)
}
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue again after the first message times out") {
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
poll(callback)
}
}

it("re-queued the message for immediate delivery") {
verify(callback).invoke(eq(message), any())
}

it("discarded the delayed message") {
clock.incrementBy(delay)
queue!!.poll(callback)
verifyNoMoreInteractions(callback)
}
}

and("another identical message is pushed after acknowledging the first") {
beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import java.time.Clock
object InMemoryQueueTest : QueueTest<InMemoryQueue>(createQueue(p3 = null))

object InMemoryMonitorableQueueTest : MonitorableQueueTest<InMemoryQueue>(
createQueue,
InMemoryQueue::retry
createQueue
)

private val createQueue = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import java.time.Duration

abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
createQueue: (Clock, DeadMessageCallback, ApplicationEventPublisher?) -> Q,
triggerRedeliveryCheck: Q.() -> Unit,
shutdownCallback: (() -> Unit)? = null
) : Spek({

Expand Down Expand Up @@ -216,8 +215,10 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
}

on("checking for unacknowledged messages") {
clock.incrementBy(queue!!.ackTimeout)
triggerRedeliveryCheck.invoke(queue!!)
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
}
}

it("fires an event") {
Expand All @@ -236,8 +237,10 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
}

on("checking for unacknowledged messages") {
clock.incrementBy(queue!!.ackTimeout)
triggerRedeliveryCheck.invoke(queue!!)
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
}
}

it("fires an event indicating the message is being retried") {
Expand Down Expand Up @@ -269,8 +272,10 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
}

on("checking for unacknowledged messages") {
clock.incrementBy(queue!!.ackTimeout)
triggerRedeliveryCheck.invoke(queue!!)
with(queue!!) {
clock.incrementBy(ackTimeout)
retry()
}
}

it("fires an event indicating the message is a duplicate") {
Expand All @@ -297,9 +302,11 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(

on("failing to acknowledge the message ${Queue.maxRetries} times") {
(1..Queue.maxRetries).forEach {
queue!!.poll { _, _ -> }
clock.incrementBy(queue!!.ackTimeout)
triggerRedeliveryCheck.invoke(queue!!)
with(queue!!) {
poll { _, _ -> }
clock.incrementBy(ackTimeout)
retry()
}
}
}

Expand Down

0 comments on commit 7ce0f61

Please sign in to comment.