Skip to content

Commit

Permalink
feat: get info about jobs pinned to release candidates (#14517)
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Nov 5, 2024
1 parent 64c5eb2 commit 1461053
Show file tree
Hide file tree
Showing 6 changed files with 577 additions and 70 deletions.
62 changes: 62 additions & 0 deletions airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5222,6 +5222,31 @@ paths:
tags:
- connector_rollout

/v1/connector_rollout/get_actor_sync_info:
post:
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectorRolloutGetActorSyncInfoRequestBody"

responses:
"200":
description: A list of actors pinned to the release candidate, and information about their syncs
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectorRolloutActorSyncInfoResponse"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"

summary: Get a list of actors pinned to a release candidate, and information about their syncs
operationId: getConnectorRolloutActorSyncInfo
tags:
- connector_rollout

/v1/connector_rollout/start:
post:
requestBody:
Expand Down Expand Up @@ -7703,6 +7728,43 @@ components:
type: string
format: uuid

ConnectorRolloutGetActorSyncInfoRequestBody:
type: object
required:
- id
properties:
id:
type: string
format: uuid

ConnectorRolloutActorSyncInfoResponse:
type: object
required:
- data
properties:
data:
type: array
items:
$ref: "#/components/schemas/ConnectorRolloutActorSyncInfo"

ConnectorRolloutActorSyncInfo:
type: object
required:
- actor_id
- n_succeeded
- n_failed
- n_connections
properties:
actor_id:
type: string
format: uuid
n_succeeded:
type: integer
n_failed:
type: integer
n_connections:
type: integer

ConnectorRolloutManualFinalizeResponse:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.commons.server.handlers

import com.google.common.annotations.VisibleForTesting
import io.airbyte.api.model.generated.ConnectorRolloutActorSyncInfo
import io.airbyte.api.model.generated.ConnectorRolloutFinalizeRequestBody
import io.airbyte.api.model.generated.ConnectorRolloutManualFinalizeRequestBody
import io.airbyte.api.model.generated.ConnectorRolloutManualFinalizeResponse
Expand Down Expand Up @@ -430,6 +431,18 @@ open class ConnectorRolloutHandler
return buildConnectorRolloutRead(updatedConnectorRollout)
}

fun getActorSyncInfo(id: UUID): List<ConnectorRolloutActorSyncInfo> {
val rollout = connectorRolloutService.getConnectorRollout(id)
val actorSyncInfoMap = rolloutActorFinder.getSyncInfoForPinnedActors(rollout)
return actorSyncInfoMap.map { (actorId, syncInfo) ->
ConnectorRolloutActorSyncInfo()
.actorId(actorId)
.nConnections(syncInfo.nConnections)
.nSucceeded(syncInfo.nSucceeded)
.nFailed(syncInfo.nFailed)
}
}

open fun manualStartConnectorRollout(connectorRolloutWorkflowStart: ConnectorRolloutManualStartRequestBody): ConnectorRolloutRead {
val rollout =
getOrCreateAndValidateManualStartInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.airbyte.config.ConnectorRolloutFinalState
import io.airbyte.config.persistence.UserPersistence
import io.airbyte.connector.rollout.client.ConnectorRolloutClient
import io.airbyte.connector.rollout.shared.ActorSelectionInfo
import io.airbyte.connector.rollout.shared.ActorSyncJobInfo
import io.airbyte.connector.rollout.shared.RolloutActorFinder
import io.airbyte.connector.rollout.shared.models.ConnectorRolloutOutput
import io.airbyte.data.helpers.ActorDefinitionVersionUpdater
Expand Down Expand Up @@ -673,6 +674,28 @@ internal class ConnectorRolloutHandlerTest {
verify { connectorRolloutService.getConnectorRollout(rolloutId) }
}

@Test
fun `test getActorSyncInfo`() {
val rolloutId = UUID.randomUUID()
val connectorRollout = createMockConnectorRollout(rolloutId)
val actorId = UUID.randomUUID()
val nSucceeded = 1
val nFailed = 2
val nConnections = 5
val actorSyncJobInfo = ActorSyncJobInfo(nSucceeded, nFailed, nConnections)

every { connectorRolloutService.getConnectorRollout(rolloutId) } returns connectorRollout
every { rolloutActorFinder.getSyncInfoForPinnedActors(connectorRollout) } returns mapOf(actorId to actorSyncJobInfo)

val result = connectorRolloutHandler.getActorSyncInfo(rolloutId)

assertEquals(1, result.size)
assertEquals(actorId, result.first().actorId)
assertEquals(nConnections, result.first().getnConnections())
assertEquals(nSucceeded, result.first().getnSucceeded())
assertEquals(nFailed, result.first().getnFailed())
}

@Test
fun `test manualStartConnectorRollout`() {
val rolloutId = UUID.randomUUID()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ import io.airbyte.config.ConfigOriginType
import io.airbyte.config.ConfigResourceType
import io.airbyte.config.ConfigScopeType
import io.airbyte.config.ConnectorRollout
import io.airbyte.config.Job
import io.airbyte.config.JobConfig
import io.airbyte.config.JobStatus
import io.airbyte.config.Schedule
import io.airbyte.config.StandardSync
import io.airbyte.data.exceptions.ConfigNotFoundException
import io.airbyte.data.helpers.ActorDefinitionVersionUpdater
import io.airbyte.data.services.ConnectionService
import io.airbyte.data.services.DestinationService
import io.airbyte.data.services.JobService
import io.airbyte.data.services.ScopedConfigurationService
import io.airbyte.data.services.SourceService
import io.airbyte.data.services.shared.ConfigScopeMapWithId
import io.airbyte.data.services.shared.ConnectorVersionKey
import io.airbyte.persistence.job.JobPersistence
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.UUID
import kotlin.math.ceil

Expand All @@ -34,16 +39,22 @@ data class ActorSelectionInfo(
val percentagePinned: Int,
)

data class ActorSyncJobInfo(
var nSucceeded: Int = 0,
var nFailed: Int = 0,
var nConnections: Int = 0,
)

@Singleton
class RolloutActorFinder(
private val actorDefinitionVersionUpdater: ActorDefinitionVersionUpdater,
private val connectionService: ConnectionService,
private val jobPersistence: JobPersistence,
private val jobService: JobService,
private val scopedConfigurationService: ScopedConfigurationService,
private val sourceService: SourceService,
private val destinationService: DestinationService,
) {
open fun getActorSelectionInfo(
fun getActorSelectionInfo(
connectorRollout: ConnectorRollout,
targetPercent: Int,
): ActorSelectionInfo {
Expand All @@ -59,15 +70,16 @@ class RolloutActorFinder(

logger.info { "Rollout ${connectorRollout.id}: ${candidates.size} after filterByTier" }

val sortedActorDefinitionConnections = getSortedActorDefinitionConnections(candidates, connectorRollout.actorDefinitionId, actorType)
candidates = filterByJobStatus(candidates, sortedActorDefinitionConnections, actorType)
val sortedActorDefinitionConnections =
getSortedActorDefinitionConnections(candidates.map { it.id }, connectorRollout.actorDefinitionId, actorType)
candidates = filterByJobStatus(connectorRollout, candidates, sortedActorDefinitionConnections, actorType)

logger.info {
"Rollout ${connectorRollout.id}: " +
"${sortedActorDefinitionConnections.size} connections total; ${candidates.size} after filterByJobStatus"
}

val nPreviouslyPinned = getNPinnedToReleaseCandidate(connectorRollout)
val nPreviouslyPinned = getActorsPinnedToReleaseCandidate(connectorRollout).size
logger.info { "Rollout ${connectorRollout.id}: $nPreviouslyPinned already pinned to the release candidate" }

candidates = filterByAlreadyPinned(connectorRollout.actorDefinitionId, candidates)
Expand Down Expand Up @@ -108,6 +120,93 @@ class RolloutActorFinder(
)
}

fun getSyncInfoForPinnedActors(connectorRollout: ConnectorRollout): Map<UUID, ActorSyncJobInfo> {
val actorType = getActorType(connectorRollout.actorDefinitionId)

val pinnedActorSyncs =
getSortedActorDefinitionConnections(
getActorsPinnedToReleaseCandidate(connectorRollout),
connectorRollout.actorDefinitionId,
actorType,
)

logger.info { "Rollout ${connectorRollout.id}: ${pinnedActorSyncs.size} connections for pinned actors" }

return getActorJobInfo(
connectorRollout,
pinnedActorSyncs,
actorType,
Instant.ofEpochMilli(connectorRollout.createdAt).atOffset(ZoneOffset.UTC),
connectorRollout.releaseCandidateVersionId,
)
}

@VisibleForTesting
fun getActorJobInfo(
connectorRollout: ConnectorRollout,
actorSyncs: List<StandardSync>,
actorType: ActorType,
createdAt: OffsetDateTime?,
versionId: UUID?,
): Map<UUID, ActorSyncJobInfo> {
val actorActorSyncJobInfoMap = mutableMapOf<UUID, ActorSyncJobInfo>()

for (connection in actorSyncs) {
val actorId = (if (actorType == ActorType.SOURCE) connection.sourceId else connection.destinationId) ?: continue
val connectionJobs =
jobService.listJobs(
setOf(JobConfig.ConfigType.SYNC),
connection.connectionId.toString(),
1,
0,
listOf(),
createdAt ?: OffsetDateTime.now().minusDays(1),
null,
null,
null,
).filter { if (versionId != null) jobDefinitionVersionIdEq(actorType, it, versionId) else jobDockerImageIsDefault(actorType, it) }
val nSucceeded = connectionJobs.filter { it.status == JobStatus.SUCCEEDED }.size
val nFailed = connectionJobs.filter { it.status == JobStatus.FAILED }.size

if (actorActorSyncJobInfoMap.containsKey(actorId)) {
actorActorSyncJobInfoMap[actorId]!!.nSucceeded += nSucceeded
actorActorSyncJobInfoMap[actorId]!!.nFailed += nSucceeded
} else {
actorActorSyncJobInfoMap[actorId] = ActorSyncJobInfo(nSucceeded = nSucceeded, nFailed = nFailed)
}
actorActorSyncJobInfoMap[actorId]!!.nConnections++
}

logger.info { "connectorRollout.id=${connectorRollout.id} actorActorSyncJobInfoMap=$actorActorSyncJobInfoMap" }

return actorActorSyncJobInfoMap
}

@VisibleForTesting
fun jobDefinitionVersionIdEq(
actorType: ActorType,
job: Job,
versionId: UUID,
): Boolean {
return if (actorType == ActorType.SOURCE) {
job.config.sync.sourceDefinitionVersionId == versionId
} else {
job.config.sync.destinationDefinitionVersionId == versionId
}
}

@VisibleForTesting
fun jobDockerImageIsDefault(
actorType: ActorType,
job: Job,
): Boolean {
return if (actorType == ActorType.SOURCE) {
job.config.sync.sourceDockerImageIsDefault
} else {
job.config.sync.destinationDockerImageIsDefault
}
}

@VisibleForTesting
fun getActorType(actorDefinitionId: UUID): ActorType {
return try {
Expand Down Expand Up @@ -166,7 +265,7 @@ class RolloutActorFinder(
}

@VisibleForTesting
fun getNPinnedToReleaseCandidate(connectorRollout: ConnectorRollout): Int {
fun getActorsPinnedToReleaseCandidate(connectorRollout: ConnectorRollout): List<UUID> {
val scopedConfigurations =
scopedConfigurationService.listScopedConfigurationsWithValues(
ConnectorVersionKey.key,
Expand All @@ -180,24 +279,23 @@ class RolloutActorFinder(
return scopedConfigurations.filter {
it.value == connectorRollout.releaseCandidateVersionId.toString() &&
it.originType == ConfigOriginType.CONNECTOR_ROLLOUT
}.size
}.map { it.id }
}

@VisibleForTesting
fun getSortedActorDefinitionConnections(
candidates: Collection<ConfigScopeMapWithId>,
actorIds: List<UUID>,
actorDefinitionId: UUID,
actorType: ActorType,
): List<StandardSync> {
val candidateActorIds = candidates.map { it.id }
return connectionService.listConnectionsByActorDefinitionIdAndType(
actorDefinitionId,
actorType.toString(),
false,
).filter { connection ->
when (actorType) {
ActorType.SOURCE -> connection.sourceId in candidateActorIds
ActorType.DESTINATION -> connection.destinationId in candidateActorIds
ActorType.SOURCE -> connection.sourceId in actorIds
ActorType.DESTINATION -> connection.destinationId in actorIds
}
}.filter { connection ->
connection.manual != true
Expand All @@ -224,28 +322,15 @@ class RolloutActorFinder(

@VisibleForTesting
fun filterByJobStatus(
connectorRollout: ConnectorRollout,
candidates: Collection<ConfigScopeMapWithId>,
actorDefinitionConnections: List<StandardSync>,
actorType: ActorType,
): List<ConfigScopeMapWithId> {
// For each connection, get the most recent job
// We map the status of all jobs for an actor to the actor ID; if any of the actor's connections failed
// we don't want to use the actor for the rollout
val actorJobSuccessMap = mutableMapOf<UUID, Boolean>()
val connectionIdToActorId =
actorDefinitionConnections.associate {
it.connectionId to if (actorType == ActorType.SOURCE) it.sourceId else it.destinationId
}
val connectionJobs = jobPersistence.getLastSyncJobForConnections(actorDefinitionConnections.map { it.connectionId })
for (job in connectionJobs) {
val actorId = connectionIdToActorId[job.connectionId]
if (actorId != null) {
actorJobSuccessMap[actorId] = actorJobSuccessMap.getOrDefault(actorId, true) && (job.status == JobStatus.SUCCEEDED)
}
}

// If any of the actor's connections failed we don't want to use the actor for the rollout
val actorJobInfo = getActorJobInfo(connectorRollout, actorDefinitionConnections, actorType, null, null)
return candidates.filter {
actorJobSuccessMap.getOrDefault(it.id, false)
actorJobInfo.containsKey(it.id) && actorJobInfo[it.id]!!.nFailed == 0 && actorJobInfo[it.id]!!.nSucceeded > 0
}
}

Expand Down
Loading

0 comments on commit 1461053

Please sign in to comment.