From 2d8ae238ff6e418d0c6c1d0fd48e763014ce8879 Mon Sep 17 00:00:00 2001 From: Asher Feldman Date: Wed, 18 Sep 2019 09:31:24 -0700 Subject: [PATCH] feat(sql): gc messages in smaller, more frequent chunks (#68) --- .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index 66a851a..f0dca94 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -32,6 +32,7 @@ import io.github.resilience4j.retry.RetryConfig import io.vavr.control.Try import org.funktionale.partials.partially1 import org.jooq.DSLContext +import org.jooq.SortOrder import org.jooq.exception.SQLDialectNotSupportedException import org.jooq.impl.DSL import org.jooq.impl.DSL.count @@ -66,13 +67,12 @@ class SqlQueue( private val lockTtlSeconds: Int, private val mapper: ObjectMapper, private val serializationMigrator: Optional, - override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), + override val ackTimeout: Duration = Duration.ofMinutes(5), override val deadMessageHandlers: List, override val canPollMany: Boolean = true, override val publisher: EventPublisher, private val sqlRetryProperties: SqlRetryProperties, - private val ULID: ULID = ULID(), - private val cleanupAfter: TemporalAmount = Duration.ofMinutes(5) + private val ULID: ULID = ULID() ) : MonitorableQueue { companion object { @@ -115,9 +115,9 @@ class SqlQueue( private val fingerprintField = field("fingerprint") private val idField = field("id") private val lockedField = field("locked") + private val orders = listOf(SortOrder.ASC, SortOrder.DESC) private val lockTtlDuration = Duration.ofSeconds(lockTtlSeconds.toLong()) - private val writeRetryBackoffMin = max(sqlRetryProperties.transactions.backoffMs - 25, 25) private val writeRetryBackoffMax = max(sqlRetryProperties.transactions.backoffMs + 50, 100) @@ -632,12 +632,18 @@ class SqlQueue( fire(RetryPolled) } - // TODO: ideally, this would only run on one instance at a time with a distributed lock - @Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:300000}") + @Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:2000}") fun cleanupMessages() { - val minTime = clock.instant().minus(cleanupAfter) - val minUlid = ULID.nextValue(minTime.toEpochMilli()).toString() + val start = clock.millis() + val cleanBefore = start - ackTimeout.multipliedBy(2).toMillis() + val minUlid = ULID.nextValue(cleanBefore).toString() + + /** + * Coin toss whether to read from the heads or tails of older messages + **/ + val order = orders.shuffled().first() + // TODO: make limits/batchSizes configurable val rs = withRetry(RetryCategory.READ) { jooq.select( field("m.id").`as`("mid"), @@ -650,27 +656,29 @@ class SqlQueue( .leftOuterJoin(unackedTable.`as`("u")) .on(sql("m.fingerprint = u.fingerprint")) .where(field("m.id").lt(minUlid)) + .orderBy(field("m.id").sort(order)) + .limit(2000) .fetch() .intoResultSet() } - val toDelete = mutableListOf() - var candidates = 0 + val candidates = mutableListOf() + var olderMessages = 0 - while (rs.next()) { + while (rs.next() && candidates.size < 1000) { val queueId: String? = rs.getString("qid") val unackedId: String? = rs.getString("uid") if (queueId == null && unackedId == null) { - toDelete.add(rs.getString("mid")) + candidates.add(rs.getString("mid")) } - candidates++ + olderMessages++ } var deleted = 0 - toDelete.sorted().chunked(10).forEach { chunk -> + candidates.chunked(100).forEach { chunk -> withRetry(RetryCategory.WRITE) { deleted += jooq.deleteFrom(messagesTable) .where(idField.`in`(*chunk.toTypedArray())) @@ -679,8 +687,8 @@ class SqlQueue( } if (deleted > 0) { - log.debug("Cleaned up $deleted completed messages (${toDelete.size} candidates / " + - "$candidates messages older than $minTime)") + log.debug("Deleted $deleted completed messages / ${candidates.size} attempted in " + + "${clock.millis() - start}ms") } }