Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
feat(monitoring): Expose a method to allow searching for messages
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 17, 2018
1 parent 62da0ca commit 39872bf
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.q.metrics

import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue

/**
Expand All @@ -30,6 +31,12 @@ interface MonitorableQueue : Queue {
* @return the current state of the queue.
*/
fun readState(): QueueState

/**
* Confirms if the queue currently contains one or more messages matching
* [predicate].
*/
fun containsMessage(predicate: (Message) -> Boolean): Boolean
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class InMemoryQueue(
unacked = unacked.size
)

override fun containsMessage(predicate: (Message) -> Boolean): Boolean =
queue.map(Envelope::payload).any(predicate)

private fun ack(messageId: UUID) {
unacked.removeIf { it.id == messageId }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.google.common.hash.Hashing
import com.netflix.spinnaker.KotlinOpen
import com.netflix.spinnaker.q.AttemptsAttribute
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.MaxAttemptsAttribute
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.*
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.metrics.*
import com.netflix.spinnaker.q.migration.SerializationMigrator
Expand Down Expand Up @@ -215,6 +212,22 @@ class RedisQueue(
}
}

override fun containsMessage(predicate: (Message) -> Boolean): Boolean =
pool.resource.use { redis ->
var found = false
var cursor = "0"
while (!found) {
redis.hscan(messagesKey, cursor).apply {
found = result
.map { mapper.readValue<Message>(it.value) }
.any(predicate)
cursor = stringCursor
}
if (cursor == "0") break
}
return found
}

override fun toString() = "RedisQueue[$queueName]"

private fun ackMessage(fingerprint: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.*
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.dsl.*
import java.io.Closeable
import java.time.Clock
import java.time.Duration
Expand Down Expand Up @@ -76,6 +73,12 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
}
}
}

it("reports no matching message exists") {
with(queue!!) {
assertThat(containsMessage { it is TestMessage }).isFalse()
}
}
}

describe("pushing a message") {
Expand All @@ -101,6 +104,12 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
}
}
}

it("reports a matching message exists") {
with(queue!!) {
assertThat(containsMessage { it is TestMessage && it.payload == "a" }).isTrue()
}
}
}

describe("pushing a duplicate message") {
Expand Down

0 comments on commit 39872bf

Please sign in to comment.