Skip to content

Commit

Permalink
fix(queue): Fix ZombieExecutionCheckingAgent to handle queues with …
Browse files Browse the repository at this point in the history
…more than 100 items (#4648) (#4685)

`SqlQueue#doContainsMessage` doesn't process more than 1 batch because of an incorrect loop inside.
When the last element in the batch is processed (`ResultSet#next` returns `false`), the following invocation of `ResultSet#getRow` will return 0. No matter how many rows were processed before.

Basically, this PR is just a copy of #4184 with addressed comments.
But #4184 is abandoned so opened this one. Kudos to Ivor for the original PR

Co-authored-by: Jason <[email protected]>
(cherry picked from commit 0a52909)

Co-authored-by: Kirill Batalin <[email protected]>
  • Loading branch information
mergify[bot] and kbatalin authored Mar 26, 2024
1 parent fd27f46 commit e5960bb
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 15 deletions.
26 changes: 13 additions & 13 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SqlQueue(
override val publisher: EventPublisher,
private val sqlRetryProperties: SqlRetryProperties,
private val ULID: ULID = ULID(),
private val poolName: String = "default"
private val poolName: String = "default",
private val containsMessageBatchSize: Int = 100,
) : MonitorableQueue {

companion object {
Expand Down Expand Up @@ -187,33 +188,32 @@ class SqlQueue(
}

private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean {
val batchSize = 100
val batchSize = containsMessageBatchSize
var found = false
var lastId = "0"

do {
val rs: ResultSet = withRetry(READ) {
val rs = withRetry(READ) {
jooq.select(idField, fingerprintField, bodyField)
.from(messagesTable)
.where(idField.gt(lastId))
.orderBy(idField.asc())
.limit(batchSize)
.fetch()
.intoResultSet()
}

while (!found && rs.next()) {
val rsIterator = rs.iterator()
while (!found && rsIterator.hasNext()) {
val record = rsIterator.next()
val body = record[bodyField, String::class.java]
try {
found = predicate.invoke(mapper.readValue(rs.getString("body")))
found = predicate.invoke(mapper.readValue(body))
} catch (e: Exception) {
log.error(
"Failed reading message with fingerprint: ${rs.getString("fingerprint")} " +
"message: ${rs.getString("body")}",
e
)
log.error("Failed reading message with fingerprint: ${record[fingerprintField, String::class.java]} message: $body", e)
}
lastId = rs.getString("id")
lastId = record[idField, String::class.java]
}
} while (!found && rs.row == batchSize)
} while (!found && rs.isNotEmpty)

return found
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueueTest
import com.netflix.spinnaker.q.metrics.QueueEvent
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import java.time.Clock
import java.time.Duration
import java.util.Optional
Expand All @@ -37,7 +43,8 @@ private val createQueueNoPublisher = { clock: Clock,

private fun createQueue(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?): SqlQueue {
publisher: EventPublisher?,
containsMessageBatchSize: Int = 5): SqlQueue {
return SqlQueue(
queueName = "test",
schemaVersion = 1,
Expand Down Expand Up @@ -66,7 +73,8 @@ private fun createQueue(clock: Clock,
sqlRetryProperties = SqlRetryProperties(
transactions = retryPolicy,
reads = retryPolicy
)
),
containsMessageBatchSize = containsMessageBatchSize,
)
}

Expand All @@ -78,3 +86,49 @@ private val retryPolicy: RetryProperties = RetryProperties(
maxRetries = 1,
backoffMs = 10 // minimum allowed
)

class SqlQueueSpecificTests {
private val batchSize = 5
private val clock = MutableClock()
private val deadMessageHandler: DeadMessageCallback = mock()
private val publisher: EventPublisher = mock()
private var queue: SqlQueue? = null

@BeforeEach
fun setup() {
queue = createQueue(clock, deadMessageHandler, publisher, batchSize)
}

@AfterEach
fun cleanup() {
cleanupCallback()
}

@Test
fun `doContainsMessage works with no messages present`() {
assertThat(doContainsMessagePayload("test")).isFalse
}

@Test
fun `doContainsMessage works with a single batch`() {
pushTestMessages(batchSize)
assertThat(doContainsMessagePayload("${batchSize-1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

@Test
fun `doContainsMessage handles multiple batches during search`() {
pushTestMessages(batchSize * 2)
assertThat(doContainsMessagePayload("${batchSize+1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

private fun pushTestMessages(numberOfMessages: Int) {
for (i in 1 .. numberOfMessages) {
queue?.push(TestMessage(i.toString()))
}
}

private fun doContainsMessagePayload(payload: String): Boolean? =
queue?.containsMessage { message -> message is TestMessage && message.payload == payload }
}

0 comments on commit e5960bb

Please sign in to comment.