Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(sql): fix potential race condition between ack and push (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Sep 17, 2019
1 parent d94bfbd commit 1a42c2a
Showing 1 changed file with 8 additions and 21 deletions.
29 changes: 8 additions & 21 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -632,44 +632,31 @@ class SqlQueue(
}

private fun ackMessage(fingerprint: String) {
/* TODO: since this isn't transactional, we should periodically check for and cleanup
* any old messages with fingerprints not represented on the queue or in unacked.
*/
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}

var changed = 0

withRetry(RetryCategory.WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)

val id = txn.select(idField)
.from(queueTable)
val changed = txn.update(queueTable)
.set(lockedField, "0")
.where(fingerprintField.eq(fingerprint))
.limit(1)
.fetchInto(String::class.java)
.execute()

if (id.isNotEmpty()) {
changed = txn.update(queueTable)
.set(lockedField, "0")
.where(idField.eq(id.first()))
// TODO: consider async scheduled cleanup of messagesTable if having this in the
// ack txn results in frequent deadlocks.
if (changed == 0) {
txn.deleteFrom(messagesTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}
}
}

if (changed == 0) {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(messagesTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}
}

fire(MessageAcknowledged)
}

Expand Down

0 comments on commit 1a42c2a

Please sign in to comment.