Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
feat(sql): gc messages in smaller, more frequent chunks (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Sep 18, 2019
1 parent 6626595 commit 2d8ae23
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.github.resilience4j.retry.RetryConfig
import io.vavr.control.Try
import org.funktionale.partials.partially1
import org.jooq.DSLContext
import org.jooq.SortOrder
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.jooq.impl.DSL.count
Expand Down Expand Up @@ -66,13 +67,12 @@ class SqlQueue(
private val lockTtlSeconds: Int,
private val mapper: ObjectMapper,
private val serializationMigrator: Optional<SerializationMigrator>,
override val ackTimeout: TemporalAmount = Duration.ofMinutes(1),
override val ackTimeout: Duration = Duration.ofMinutes(5),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = true,
override val publisher: EventPublisher,
private val sqlRetryProperties: SqlRetryProperties,
private val ULID: ULID = ULID(),
private val cleanupAfter: TemporalAmount = Duration.ofMinutes(5)
private val ULID: ULID = ULID()
) : MonitorableQueue {

companion object {
Expand Down Expand Up @@ -115,9 +115,9 @@ class SqlQueue(
private val fingerprintField = field("fingerprint")
private val idField = field("id")
private val lockedField = field("locked")
private val orders = listOf(SortOrder.ASC, SortOrder.DESC)

private val lockTtlDuration = Duration.ofSeconds(lockTtlSeconds.toLong())

private val writeRetryBackoffMin = max(sqlRetryProperties.transactions.backoffMs - 25, 25)
private val writeRetryBackoffMax = max(sqlRetryProperties.transactions.backoffMs + 50, 100)

Expand Down Expand Up @@ -632,12 +632,18 @@ class SqlQueue(
fire(RetryPolled)
}

// TODO: ideally, this would only run on one instance at a time with a distributed lock
@Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:300000}")
@Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:2000}")
fun cleanupMessages() {
val minTime = clock.instant().minus(cleanupAfter)
val minUlid = ULID.nextValue(minTime.toEpochMilli()).toString()
val start = clock.millis()
val cleanBefore = start - ackTimeout.multipliedBy(2).toMillis()
val minUlid = ULID.nextValue(cleanBefore).toString()

/**
* Coin toss whether to read from the heads or tails of older messages
**/
val order = orders.shuffled().first()

// TODO: make limits/batchSizes configurable
val rs = withRetry(RetryCategory.READ) {
jooq.select(
field("m.id").`as`("mid"),
Expand All @@ -650,27 +656,29 @@ class SqlQueue(
.leftOuterJoin(unackedTable.`as`("u"))
.on(sql("m.fingerprint = u.fingerprint"))
.where(field("m.id").lt(minUlid))
.orderBy(field("m.id").sort(order))
.limit(2000)
.fetch()
.intoResultSet()
}

val toDelete = mutableListOf<String>()
var candidates = 0
val candidates = mutableListOf<String>()
var olderMessages = 0

while (rs.next()) {
while (rs.next() && candidates.size < 1000) {
val queueId: String? = rs.getString("qid")
val unackedId: String? = rs.getString("uid")

if (queueId == null && unackedId == null) {
toDelete.add(rs.getString("mid"))
candidates.add(rs.getString("mid"))
}

candidates++
olderMessages++
}

var deleted = 0

toDelete.sorted().chunked(10).forEach { chunk ->
candidates.chunked(100).forEach { chunk ->
withRetry(RetryCategory.WRITE) {
deleted += jooq.deleteFrom(messagesTable)
.where(idField.`in`(*chunk.toTypedArray()))
Expand All @@ -679,8 +687,8 @@ class SqlQueue(
}

if (deleted > 0) {
log.debug("Cleaned up $deleted completed messages (${toDelete.size} candidates / " +
"$candidates messages older than $minTime)")
log.debug("Deleted $deleted completed messages / ${candidates.size} attempted in " +
"${clock.millis() - start}ms")
}
}

Expand Down

0 comments on commit 2d8ae23

Please sign in to comment.