Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
feat(sql): async message cleanup (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Sep 18, 2019
1 parent 1a42c2a commit b055c98
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 22 deletions.
74 changes: 54 additions & 20 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class SqlQueue(
override val canPollMany: Boolean = true,
override val publisher: EventPublisher,
private val sqlRetryProperties: SqlRetryProperties,
private val ULID: ULID = ULID()
private val ULID: ULID = ULID(),
private val cleanupAfter: TemporalAmount = Duration.ofMinutes(5)
) : MonitorableQueue {

companion object {
Expand Down Expand Up @@ -631,30 +632,63 @@ class SqlQueue(
fire(RetryPolled)
}

private fun ackMessage(fingerprint: String) {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
// TODO: ideally, this would only run on one instance at a time with a distributed lock
@Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:300000}")
fun cleanupMessages() {
val minTime = clock.instant().minus(cleanupAfter)
val minUlid = ULID.nextValue(minTime.toEpochMilli()).toString()

val rs = withRetry(RetryCategory.READ) {
jooq.select(
field("m.id").`as`("mid"),
field("q.id").`as`("qid"),
field("u.id").`as`("uid")
)
.from(messagesTable.`as`("m"))
.leftOuterJoin(queueTable.`as`("q"))
.on(sql("m.fingerprint = q.fingerprint"))
.leftOuterJoin(unackedTable.`as`("u"))
.on(sql("m.fingerprint = u.fingerprint"))
.where(field("m.id").lt(minUlid))
.fetch()
.intoResultSet()
}

withRetry(RetryCategory.WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)
val toDelete = mutableListOf<String>()
var candidates = 0

val changed = txn.update(queueTable)
.set(lockedField, "0")
.where(fingerprintField.eq(fingerprint))
.execute()
while (rs.next()) {
val queueId: String? = rs.getString("qid")
val unackedId: String? = rs.getString("uid")

// TODO: consider async scheduled cleanup of messagesTable if having this in the
// ack txn results in frequent deadlocks.
if (changed == 0) {
txn.deleteFrom(messagesTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}
if (queueId == null && unackedId == null) {
toDelete.add(rs.getString("mid"))
}

candidates++
}

var deleted = 0

toDelete.sorted().chunked(10).forEach { chunk ->
withRetry(RetryCategory.WRITE) {
deleted += jooq.deleteFrom(messagesTable)
.where(idField.`in`(*chunk.toTypedArray()))
.execute()
}
}

if (deleted > 0) {
log.debug("Cleaned up $deleted completed messages ($toDelete candidates / " +
"$candidates messages older than $minTime)")
}
}

private fun ackMessage(fingerprint: String) {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}

fire(MessageAcknowledged)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
assertThat(depth).isEqualTo(0)
assertThat(unacked).isEqualTo(0)
assertThat(ready).isEqualTo(0)
assertThat(orphaned).isEqualTo(0)
}
}
}
Expand Down Expand Up @@ -303,7 +302,6 @@ abstract class MonitorableQueueTest<out Q : MonitorableQueue>(
assertThat(depth).isEqualTo(1)
assertThat(unacked).isEqualTo(0)
assertThat(ready).isEqualTo(1)
assertThat(orphaned).isEqualTo(0)
}
}
}
Expand Down

0 comments on commit b055c98

Please sign in to comment.