diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index 9f9fd7a..558a7ec 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -626,26 +626,30 @@ class SqlQueue( } private fun ackMessage(fingerprint: String) { + /* TODO: since this isn't transactional, we should periodically check for and cleanup + * any old messages with fingerprints not represented on the queue or in unacked. + */ withRetry(RetryCategory.WRITE) { - jooq.transaction { config -> - val txn = DSL.using(config) + jooq.deleteFrom(unackedTable) + .where(fingerprintField.eq(fingerprint)) + .execute() + } - txn.deleteFrom(unackedTable) - .where(fingerprintField.eq(fingerprint)) - .execute() + val changed = withRetry(RetryCategory.WRITE) { + jooq.update(queueTable) + .set(lockedField, "0") + .where(fingerprintField.eq(fingerprint)) + .execute() + } - val changed = txn.update(queueTable) - .set(lockedField, "0") + if (changed == 0) { + withRetry(RetryCategory.WRITE) { + jooq.deleteFrom(messagesTable) .where(fingerprintField.eq(fingerprint)) .execute() - - if (changed == 0) { - txn.deleteFrom(messagesTable) - .where(fingerprintField.eq(fingerprint)) - .execute() - } } } + fire(MessageAcknowledged) }