Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(sql): minimize deadlock potential between acking and enqueueing (#62
Browse files Browse the repository at this point in the history
)
  • Loading branch information
asher committed Sep 13, 2019
1 parent 88d7b4f commit 3101c2d
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -626,26 +626,30 @@ class SqlQueue(
}

private fun ackMessage(fingerprint: String) {
/* TODO: since this isn't transactional, we should periodically check for and cleanup
* any old messages with fingerprints not represented on the queue or in unacked.
*/
withRetry(RetryCategory.WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}

txn.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
val changed = withRetry(RetryCategory.WRITE) {
jooq.update(queueTable)
.set(lockedField, "0")
.where(fingerprintField.eq(fingerprint))
.execute()
}

val changed = txn.update(queueTable)
.set(lockedField, "0")
if (changed == 0) {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(messagesTable)
.where(fingerprintField.eq(fingerprint))
.execute()

if (changed == 0) {
txn.deleteFrom(messagesTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}
}
}

fire(MessageAcknowledged)
}

Expand Down

0 comments on commit 3101c2d

Please sign in to comment.