Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
chore(sql): limit delete batch size, ack updates queue by PK (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored Sep 16, 2019
1 parent ff26d8f commit d94bfbd
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,12 @@ class SqlQueue(
* If an instance crashes after committing the above txn but before the following delete,
* [retry] will release the locks after [lockTtlSeconds] and another instance will grab them.
*/
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(queueTable)
.where(idField.`in`(*ids.toTypedArray()))
.execute()
ids.sorted().chunked(4).forEach { chunk ->
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(queueTable)
.where(idField.`in`(*chunk.toTypedArray()))
.execute()
}
}

lockedMessages.forEach {
Expand Down Expand Up @@ -639,11 +641,25 @@ class SqlQueue(
.execute()
}

val changed = withRetry(RetryCategory.WRITE) {
jooq.update(queueTable)
.set(lockedField, "0")
.where(fingerprintField.eq(fingerprint))
.execute()
var changed = 0

withRetry(RetryCategory.WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)

val id = txn.select(idField)
.from(queueTable)
.where(fingerprintField.eq(fingerprint))
.limit(1)
.fetchInto(String::class.java)

if (id.isNotEmpty()) {
changed = txn.update(queueTable)
.set(lockedField, "0")
.where(idField.eq(id.first()))
.execute()
}
}
}

if (changed == 0) {
Expand Down

0 comments on commit d94bfbd

Please sign in to comment.