Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): Fix SqlQueue#doContainsMessage to handle queues with more than 100 items #4648

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SqlQueue(
override val publisher: EventPublisher,
private val sqlRetryProperties: SqlRetryProperties,
private val ULID: ULID = ULID(),
private val poolName: String = "default"
private val poolName: String = "default",
private val containsMessageBatchSize: Int = 100,
) : MonitorableQueue {

companion object {
Expand Down Expand Up @@ -187,33 +188,32 @@ class SqlQueue(
}

private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean {
val batchSize = 100
val batchSize = containsMessageBatchSize
var found = false
var lastId = "0"

do {
val rs: ResultSet = withRetry(READ) {
val rs = withRetry(READ) {
jooq.select(idField, fingerprintField, bodyField)
.from(messagesTable)
.where(idField.gt(lastId))
.orderBy(idField.asc())
.limit(batchSize)
.fetch()
.intoResultSet()
}

while (!found && rs.next()) {
val rsIterator = rs.iterator()
while (!found && rsIterator.hasNext()) {
val record = rsIterator.next()
val body = record[bodyField, String::class.java]
try {
found = predicate.invoke(mapper.readValue(rs.getString("body")))
found = predicate.invoke(mapper.readValue(body))
} catch (e: Exception) {
log.error(
"Failed reading message with fingerprint: ${rs.getString("fingerprint")} " +
"message: ${rs.getString("body")}",
e
)
log.error("Failed reading message with fingerprint: ${record[fingerprintField, String::class.java]} message: $body", e)
}
lastId = rs.getString("id")
lastId = record[idField, String::class.java]
}
} while (!found && rs.row == batchSize)
} while (!found && rs.isNotEmpty)

return found
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueueTest
import com.netflix.spinnaker.q.metrics.QueueEvent
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import java.time.Clock
import java.time.Duration
import java.util.Optional
Expand All @@ -37,7 +43,8 @@ private val createQueueNoPublisher = { clock: Clock,

private fun createQueue(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?): SqlQueue {
publisher: EventPublisher?,
containsMessageBatchSize: Int = 5): SqlQueue {
return SqlQueue(
queueName = "test",
schemaVersion = 1,
Expand Down Expand Up @@ -66,7 +73,8 @@ private fun createQueue(clock: Clock,
sqlRetryProperties = SqlRetryProperties(
transactions = retryPolicy,
reads = retryPolicy
)
),
containsMessageBatchSize = containsMessageBatchSize,
)
}

Expand All @@ -78,3 +86,49 @@ private val retryPolicy: RetryProperties = RetryProperties(
maxRetries = 1,
backoffMs = 10 // minimum allowed
)

class SqlQueueSpecificTests {
private val batchSize = 5
private val clock = MutableClock()
private val deadMessageHandler: DeadMessageCallback = mock()
private val publisher: EventPublisher = mock()
private var queue: SqlQueue? = null

@BeforeEach
fun setup() {
queue = createQueue(clock, deadMessageHandler, publisher, batchSize)
}

@AfterEach
fun cleanup() {
cleanupCallback()
}

@Test
fun `doContainsMessage works with no messages present`() {
assertThat(doContainsMessagePayload("test")).isFalse
}

@Test
fun `doContainsMessage works with a single batch`() {
pushTestMessages(batchSize)
assertThat(doContainsMessagePayload("${batchSize-1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

@Test
fun `doContainsMessage handles multiple batches during search`() {
pushTestMessages(batchSize * 2)
assertThat(doContainsMessagePayload("${batchSize+1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

private fun pushTestMessages(numberOfMessages: Int) {
for (i in 1 .. numberOfMessages) {
queue?.push(TestMessage(i.toString()))
}
}

private fun doContainsMessagePayload(payload: String): Boolean? =
queue?.containsMessage { message -> message is TestMessage && message.payload == payload }
}