From b055c986cb2a5c6cd16a158b7499eb93a77c18f0 Mon Sep 17 00:00:00 2001 From: Asher Feldman Date: Tue, 17 Sep 2019 17:55:14 -0700 Subject: [PATCH] feat(sql): async message cleanup (#66) --- .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 74 ++++++++++++++----- .../q/metrics/MonitorableQueueTest.kt | 2 - 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index ff66f97..ee44b90 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -71,7 +71,8 @@ class SqlQueue( override val canPollMany: Boolean = true, override val publisher: EventPublisher, private val sqlRetryProperties: SqlRetryProperties, - private val ULID: ULID = ULID() + private val ULID: ULID = ULID(), + private val cleanupAfter: TemporalAmount = Duration.ofMinutes(5) ) : MonitorableQueue { companion object { @@ -631,30 +632,63 @@ class SqlQueue( fire(RetryPolled) } - private fun ackMessage(fingerprint: String) { - withRetry(RetryCategory.WRITE) { - jooq.deleteFrom(unackedTable) - .where(fingerprintField.eq(fingerprint)) - .execute() + // TODO: ideally, this would only run on one instance at a time with a distributed lock + @Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:300000}") + fun cleanupMessages() { + val minTime = clock.instant().minus(cleanupAfter) + val minUlid = ULID.nextValue(minTime.toEpochMilli()).toString() + + val rs = withRetry(RetryCategory.READ) { + jooq.select( + field("m.id").`as`("mid"), + field("q.id").`as`("qid"), + field("u.id").`as`("uid") + ) + .from(messagesTable.`as`("m")) + .leftOuterJoin(queueTable.`as`("q")) + .on(sql("m.fingerprint = q.fingerprint")) + .leftOuterJoin(unackedTable.`as`("u")) + .on(sql("m.fingerprint = u.fingerprint")) + .where(field("m.id").lt(minUlid)) + .fetch() + .intoResultSet() } - withRetry(RetryCategory.WRITE) { - jooq.transaction { config -> - val txn = DSL.using(config) + val toDelete = mutableListOf() + var candidates = 0 - val changed = txn.update(queueTable) - .set(lockedField, "0") - .where(fingerprintField.eq(fingerprint)) - .execute() + while (rs.next()) { + val queueId: String? = rs.getString("qid") + val unackedId: String? = rs.getString("uid") - // TODO: consider async scheduled cleanup of messagesTable if having this in the - // ack txn results in frequent deadlocks. - if (changed == 0) { - txn.deleteFrom(messagesTable) - .where(fingerprintField.eq(fingerprint)) - .execute() - } + if (queueId == null && unackedId == null) { + toDelete.add(rs.getString("mid")) } + + candidates++ + } + + var deleted = 0 + + toDelete.sorted().chunked(10).forEach { chunk -> + withRetry(RetryCategory.WRITE) { + deleted += jooq.deleteFrom(messagesTable) + .where(idField.`in`(*chunk.toTypedArray())) + .execute() + } + } + + if (deleted > 0) { + log.debug("Cleaned up $deleted completed messages ($toDelete candidates / " + + "$candidates messages older than $minTime)") + } + } + + private fun ackMessage(fingerprint: String) { + withRetry(RetryCategory.WRITE) { + jooq.deleteFrom(unackedTable) + .where(fingerprintField.eq(fingerprint)) + .execute() } fire(MessageAcknowledged) diff --git a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt index 293b7c4..1b63142 100644 --- a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt +++ b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt @@ -248,7 +248,6 @@ abstract class MonitorableQueueTest( assertThat(depth).isEqualTo(0) assertThat(unacked).isEqualTo(0) assertThat(ready).isEqualTo(0) - assertThat(orphaned).isEqualTo(0) } } } @@ -303,7 +302,6 @@ abstract class MonitorableQueueTest( assertThat(depth).isEqualTo(1) assertThat(unacked).isEqualTo(0) assertThat(ready).isEqualTo(1) - assertThat(orphaned).isEqualTo(0) } } }