Skip to content

Commit

Permalink
First pass at deleting backfill runs.
Browse files Browse the repository at this point in the history
  • Loading branch information
mpawliszyn committed Aug 16, 2021
1 parent a9b4121 commit c3a815a
Show file tree
Hide file tree
Showing 21 changed files with 363 additions and 83 deletions.
36 changes: 20 additions & 16 deletions service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.protos.service.CreateBackfillRequest
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillPartitionState
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbRegisteredBackfill
Expand Down Expand Up @@ -84,12 +85,13 @@ class BackfillCreator @Inject constructor(
)
session.save(backfillRun)

check(backfillRun.state == BackfillState.PAUSED)
for (partition in partitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
partition.partition_name,
partition.backfill_range ?: KeyRange.Builder().build(),
backfillRun.state,
BackfillPartitionState.PAUSED,
partition.estimated_record_count
)
session.save(dbRunPartition)
Expand Down Expand Up @@ -125,21 +127,7 @@ class BackfillCreator @Inject constructor(
logger.info(e) { "PrepareBackfill on `$service` failed" }
throw BadRequestException("PrepareBackfill on `$service` failed: ${e.message}", e)
}

val partitions = prepareBackfillResponse.partitions
if (partitions.isEmpty()) {
throw BadRequestException("PrepareBackfill returned no partitions")
}
if (partitions.any { it.partition_name == null }) {
throw BadRequestException("PrepareBackfill returned unnamed partitions")
}
if (partitions.distinctBy { it.partition_name }.size != partitions.size) {
throw BadRequestException(
"PrepareBackfill did not return distinct partition names:" +
" ${partitions.map { it.partition_name }}"
)
}

prepareBackfillResponse.validate()
return prepareBackfillResponse
}

Expand Down Expand Up @@ -181,5 +169,21 @@ class BackfillCreator @Inject constructor(

companion object {
private val logger = getLogger<BackfillCreator>()

/**
 * Validates the partitions returned by a PrepareBackfill call.
 *
 * Checks, in order, that at least one partition was returned, that every partition has a name,
 * and that the partition names are distinct.
 *
 * @throws BadRequestException on the first check that fails.
 */
fun PrepareBackfillResponse.validate() {
  val names = this.partitions.map { it.partition_name }

  if (names.isEmpty()) {
    throw BadRequestException("PrepareBackfill returned no partitions")
  }
  if (names.any { it == null }) {
    throw BadRequestException("PrepareBackfill returned unnamed partitions")
  }
  // A set drops duplicates, so a smaller set means at least one name was repeated.
  if (names.toSet().size != names.size) {
    throw BadRequestException(
      "PrepareBackfill did not return distinct partition names:" +
        " $names"
    )
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app.cash.backfila.dashboard
import app.cash.backfila.service.SlackHelper
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.BackfillState.CANCELLED
import app.cash.backfila.service.persistence.BackfillState.PAUSED
import app.cash.backfila.service.persistence.BackfillState.RUNNING
import app.cash.backfila.service.persistence.DbBackfillRun
Expand All @@ -25,7 +26,10 @@ class BackfillStateToggler @Inject constructor(
val requiredCurrentState = when (desiredState) {
PAUSED -> RUNNING
RUNNING -> PAUSED
else -> throw IllegalArgumentException("can only toggle to RUNNING or PAUSED")
CANCELLED -> PAUSED
else -> throw IllegalArgumentException(
"can only toggle between RUNNING and PAUSED or cancel a PAUSED run. "
)
}

transacter.transaction { session ->
Expand All @@ -46,22 +50,27 @@ class BackfillStateToggler @Inject constructor(
}
run.setState(session, queryFactory, desiredState)

val startedOrStopped = if (desiredState == RUNNING) "started" else "stopped"
val action = when (desiredState) {
PAUSED -> "stopped"
RUNNING -> "started"
CANCELLED -> "cancelled"
else -> desiredState.name
}
session.save(
DbEventLog(
run.id,
partition_id = null,
user = caller.principal,
type = DbEventLog.Type.STATE_CHANGE,
message = "backfill $startedOrStopped"
message = "backfill $action"
)
)
}

if (desiredState == RUNNING) {
slackHelper.runStarted(Id(id), caller.principal)
} else {
slackHelper.runPaused(Id(id), caller.principal)
when (desiredState) {
RUNNING -> slackHelper.runStarted(Id(id), caller.principal)
PAUSED -> slackHelper.runPaused(Id(id), caller.principal)
CANCELLED -> slackHelper.runCancelled(Id(id), caller.principal)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package app.cash.backfila.dashboard

import app.cash.backfila.service.persistence.BackfillState
import javax.inject.Inject
import misk.MiskCaller
import misk.scope.ActionScoped
import misk.security.authz.Authenticated
import misk.web.PathParam
import misk.web.Post
import misk.web.RequestBody
import misk.web.RequestContentType
import misk.web.ResponseContentType
import misk.web.actions.WebAction
import misk.web.mediatype.MediaTypes
import wisp.logging.getLogger

/** Request body accepted by the cancel endpoint; it declares no fields. */
class CancelBackfillRequest
/** Response body returned by the cancel endpoint; it declares no fields. */
class CancelBackfillResponse

/**
 * Web action that cancels a backfill run.
 *
 * The state change itself is delegated to [BackfillStateToggler.toggleRunningState] with
 * [BackfillState.CANCELLED] as the desired state.
 */
class CancelBackfillAction @Inject constructor(
  private val caller: @JvmSuppressWildcards ActionScoped<MiskCaller?>,
  private val backfillStateToggler: BackfillStateToggler
) : WebAction {

  /**
   * Cancels the backfill run with the given [id] on behalf of the current caller.
   *
   * @param id id of the backfill run to cancel.
   * @param request empty request body; it is not read.
   * @return an empty [CancelBackfillResponse] once the toggler call returns.
   * @throws IllegalStateException if no caller is available in the action scope.
   */
  @Post("/backfills/{id}/cancel")
  @RequestContentType(MediaTypes.APPLICATION_JSON)
  @ResponseContentType(MediaTypes.APPLICATION_JSON)
  // TODO allow any user
  @Authenticated(capabilities = ["users"])
  fun cancel(
    @PathParam id: Long,
    @RequestBody request: CancelBackfillRequest
  ): CancelBackfillResponse {
    // TODO check user has permissions for this service with access api
    // Resolve the caller once and fail with a descriptive message instead of a bare `!!` NPE.
    val miskCaller = checkNotNull(caller.get()) {
      "No caller available in the action scope while cancelling backfill $id"
    }
    // Log the principal, which is the same identity the toggler records in the event log.
    logger.info { "Canceling backfill $id by ${miskCaller.principal}" }
    backfillStateToggler.toggleRunningState(id, miskCaller, BackfillState.CANCELLED)
    return CancelBackfillResponse()
  }

  companion object {
    private val logger = getLogger<CancelBackfillAction>()
  }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package app.cash.backfila.dashboard

import app.cash.backfila.BackfillCreator.Companion.validate
import app.cash.backfila.client.ConnectorProvider
import app.cash.backfila.protos.clientservice.KeyRange
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbRegisteredBackfill
import app.cash.backfila.service.persistence.DbRunPartition
Expand Down Expand Up @@ -138,13 +140,14 @@ class CloneBackfillAction @Inject constructor(
)
session.save(backfillRun)

check(backfillRun.state == BackfillState.PAUSED)
if (request.range_clone_type == RangeCloneType.NEW) {
for (partition in partitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
partition.partition_name,
partition.backfill_range ?: KeyRange.Builder().build(),
backfillRun.state,
backfillRun.state.getPartitionState(),
partition.estimated_record_count
)
session.save(dbRunPartition)
Expand All @@ -161,12 +164,13 @@ class CloneBackfillAction @Inject constructor(
)
}

check(backfillRun.state == BackfillState.PAUSED)
for (sourcePartition in sourcePartitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
sourcePartition.partition_name,
sourcePartition.backfillRange(),
backfillRun.state,
backfillRun.state.getPartitionState(),
sourcePartition.estimated_record_count
)
// Copy the cursor if continuing, otherwise just leave blank to start from beginning.
Expand Down Expand Up @@ -203,19 +207,7 @@ class CloneBackfillAction @Inject constructor(
logger.info(e) { "PrepareBackfill on `${dbData.serviceName}` failed" }
throw BadRequestException("PrepareBackfill on `${dbData.serviceName}` failed: ${e.message}", e)
}
val partitions = prepareBackfillResponse.partitions
if (partitions.isEmpty()) {
throw BadRequestException("PrepareBackfill returned no partitions")
}
if (partitions.any { it.partition_name == null }) {
throw BadRequestException("PrepareBackfill returned unnamed partitions")
}
if (partitions.distinctBy { it.partition_name }.size != partitions.size) {
throw BadRequestException(
"PrepareBackfill did not return distinct partition names:" +
" ${partitions.map { it.partition_name }}"
)
}
prepareBackfillResponse.validate()
return prepareBackfillResponse
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app.cash.backfila.dashboard

import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillPartitionState
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbEventLog
Expand All @@ -26,7 +27,7 @@ import misk.web.mediatype.MediaTypes
data class UiPartition(
val id: Long,
val name: String,
val state: BackfillState,
val state: BackfillPartitionState,
val pkey_cursor: String?,
val pkey_start: String?,
val pkey_end: String?,
Expand Down Expand Up @@ -105,7 +106,7 @@ class GetBackfillStatusAction @Inject constructor(
UiPartition(
partition.id.id,
partition.partition_name,
partition.run_state,
partition.partition_state,
partition.pkey_cursor?.utf8(),
partition.pkey_range_start?.utf8(),
partition.pkey_range_end?.utf8(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class SlackHelper @Inject constructor(
slackClient.postMessage("Backfila", ":backfila:", message, channel)
}

/**
 * Posts a Slack message announcing that the backfill run [id] was cancelled by [user].
 *
 * The message text and target channel are read from the run inside a transaction; the Slack
 * post is made after that transaction block has returned.
 */
fun runCancelled(id: Id<DbBackfillRun>, user: String) {
  val (text, slackChannel) = transacter.transaction { session ->
    val backfillRun = session.load(id)
    Pair(
      ":backfila_cancel:${dryRunEmoji(backfillRun)} ${nameAndId(backfillRun)} cancelled by @$user",
      backfillRun.service.slack_channel
    )
  }
  slackClient.postMessage("Backfila", ":backfila:", text, slackChannel)
}

fun runErrored(id: Id<DbBackfillRun>) {
val (message, channel) = transacter.transaction { session ->
val run = session.load(id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package app.cash.backfila.service.persistence

/** States a single backfill run partition can be in. */
enum class BackfillPartitionState {
  /** A resumable backfill partition that is not currently meant to be running. */
  PAUSED,

  /** A backfill partition that is allowed to run. */
  RUNNING,

  /** A completed partition; this is a final, non-resumable state. */
  COMPLETE,

  /** A partition that is no longer relevant (possibly a split partition); final and non-resumable. */
  STALE,

  /** A partition that has been manually cancelled; final and non-resumable. */
  CANCELLED;

  companion object {
    /** The states documented above as final and non-resumable. */
    val FINAL_STATES: Set<BackfillPartitionState> = setOf(STALE, CANCELLED, COMPLETE)
  }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
package app.cash.backfila.service.persistence

enum class BackfillState {
PAUSED,
RUNNING,
COMPLETE
PAUSED, // A resumable backfill that is not currently meant to be running
RUNNING, // A backfill that is allowed to run.
COMPLETE, // A completed backfill, this is a final non-resumable state.
CANCELLED; // A backfill that has been manually cancelled, this is a final non-resumable state.

companion object {
val FINAL_STATES = setOf(CANCELLED, COMPLETE)

/**
 * Returns the partition state that corresponds to this backfill state.
 *
 * When the backfill state changes, the underlying partitions are moved to the state returned
 * here. Each backfill state maps to the partition state of the same name.
 */
fun BackfillState.getPartitionState(): BackfillPartitionState =
  when (this) {
    RUNNING -> BackfillPartitionState.RUNNING
    PAUSED -> BackfillPartitionState.PAUSED
    COMPLETE -> BackfillPartitionState.COMPLETE
    CANCELLED -> BackfillPartitionState.CANCELLED
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package app.cash.backfila.service.persistence

import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState
import com.google.common.base.Preconditions.checkState
import java.time.Instant
import javax.persistence.Column
Expand Down Expand Up @@ -130,18 +131,18 @@ class DbBackfillRun() : DbUnsharded<DbBackfillRun>, DbTimestampedEntity {
.list(session)

fun setState(session: Session, queryFactory: Query.Factory, state: BackfillState) {
// State can't be changed after being completed.
checkState(this.state != BackfillState.COMPLETE)
// Backfills in final states cannot change.
checkState(!BackfillState.FINAL_STATES.contains(this.state))
this.state = state
// Set the state of all the partitions that are not already in a final state
val query = session.hibernateSession.createQuery(
"update DbRunPartition " +
"set run_state = :newState, version = version + 1 " +
"where backfill_run_id = :runId and run_state <> :completed"
"set partition_state = :newPartitionState, version = version + 1 " +
"where backfill_run_id = :runId and partition_state not in ( :finalPartitionStates )"
)
query.setParameter("runId", id)
query.setParameter("newState", state)
query.setParameter("completed", BackfillState.COMPLETE)
query.setParameter("newPartitionState", state.getPartitionState())
query.setParameterList("finalPartitionStates", BackfillPartitionState.FINAL_STATES)
query.executeUpdate()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DbRunPartition() : DbUnsharded<DbRunPartition>, DbTimestampedEntity {
*/
@Column(nullable = false)
@Enumerated(EnumType.STRING)
lateinit var run_state: BackfillState
lateinit var partition_state: BackfillPartitionState

@Column
var lease_token: String? = null
Expand Down Expand Up @@ -120,14 +120,14 @@ class DbRunPartition() : DbUnsharded<DbRunPartition>, DbTimestampedEntity {
backfill_run_id: Id<DbBackfillRun>,
partition_name: String,
backfill_range: KeyRange,
run_state: BackfillState,
run_state: BackfillPartitionState,
estimated_record_count: Long?
) : this() {
this.backfill_run_id = backfill_run_id
this.partition_name = partition_name
this.pkey_range_start = backfill_range.start
this.pkey_range_end = backfill_range.end
this.run_state = run_state
this.partition_state = run_state
this.lease_expires_at = Instant.ofEpochSecond(1L)
this.estimated_record_count = estimated_record_count
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ interface RunPartitionQuery : Query<DbRunPartition> {
/** Constrains the query on `backfill_run_id` with the IN operator against [backfillRunIds]. */
@Constraint("backfill_run_id", Operator.IN)
fun backfillRunIdIn(backfillRunIds: Collection<Id<DbBackfillRun>>): RunPartitionQuery

@Constraint("run_state")
fun runState(runState: BackfillState): RunPartitionQuery
/** Constrains the query on `partition_state` using the given partition state value. */
@Constraint("partition_state")
fun partitionState(runState: BackfillPartitionState): RunPartitionQuery

/** Constrains the query on `lease_expires_at` with the LT (less-than) operator against [time]. */
@Constraint("lease_expires_at", Operator.LT)
fun leaseExpiresAtBefore(time: Instant): RunPartitionQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import app.cash.backfila.protos.clientservice.RunBatchResponse
import app.cash.backfila.service.BackfilaMetrics
import app.cash.backfila.service.SlackHelper
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillPartitionState
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbEventLog
Expand Down Expand Up @@ -169,7 +170,7 @@ class BackfillRunner private constructor(
batchAwaiter.updateProgress(dbRunPartition)

// Now that state is stored, check if we should exit.
if (dbRunPartition.run_state != BackfillState.RUNNING) {
if (dbRunPartition.partition_state != BackfillPartitionState.RUNNING) {
logger.info { "Backfill is no longer in RUNNING state, stopping runner ${logLabel()}" }
running = false
return@transaction
Expand Down
Loading

0 comments on commit c3a815a

Please sign in to comment.