Skip to content

Commit

Permalink
Bulk Load CDK: Various small exception handling/shutdown workflow fix…
Browse files Browse the repository at this point in the history
…es/improvements (#46724)

Co-authored-by: Edward Gao <[email protected]>
  • Loading branch information
johnny-schmidt and edgao authored Oct 13, 2024
1 parent 9828f91 commit e6458f5
Show file tree
Hide file tree
Showing 22 changed files with 263 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

Expand All @@ -32,6 +33,7 @@ interface CheckpointManager<K, T> {
suspend fun flushReadyCheckpointMessages()
suspend fun getLastSuccessfulFlushTimeMs(): Long
suspend fun getNextCheckpointIndexes(): Map<K, Long>
suspend fun awaitAllCheckpointsFlushed()
}

/**
Expand Down Expand Up @@ -153,8 +155,8 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}
if (allStreamsPersisted) {
log.info { "Flushing global checkpoint with stream indexes: ${head.streamIndexes}" }
globalCheckpoints.poll()
validateAndSendMessage(head.checkpointMessage, head.streamIndexes)
globalCheckpoints.poll() // don't remove until after we've successfully sent
} else {
break
}
Expand All @@ -168,11 +170,11 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
while (true) {
val (nextIndex, nextMessage) = streamCheckpoints.peek() ?: break
if (manager.areRecordsPersistedUntil(nextIndex)) {
streamCheckpoints.poll()
log.info {
"Flushing checkpoint for stream: ${stream.descriptor} at index: $nextIndex"
}
validateAndSendMessage(nextMessage, listOf(stream.descriptor to nextIndex))
streamCheckpoints.poll() // don't remove until after we've successfully sent
} else {
break
}
Expand Down Expand Up @@ -221,6 +223,24 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}
}
}

/**
 * Suspends until no checkpoints remain queued, i.e. the global checkpoint queue is empty and
 * every per-stream checkpoint queue is empty.
 *
 * The emptiness check is performed while holding [flushLock]. While anything is still queued,
 * this polls: it waits one second, asks [flushReadyCheckpointMessages] to emit whatever has
 * become ready, and then checks again.
 */
override suspend fun awaitAllCheckpointsFlushed() {
    while (
        !flushLock.withLock {
            globalCheckpoints.isEmpty() && streamCheckpoints.all { it.value.isEmpty() }
        }
    ) {
        log.info { "Waiting for all checkpoints to flush" }
        // Polling is tolerated here: by the time this is called we expect at most a
        // handful of straggling checkpoints, so the loop should only spin a few times.
        delay(1000L)
        flushReadyCheckpointMessages()
    }
    log.info { "All checkpoints flushed" }
}
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import kotlinx.coroutines.CancellationException

/**
* The level at which a task operates:
Expand All @@ -23,16 +24,20 @@ import jakarta.inject.Singleton
*/
sealed interface LeveledTask : Task

interface SyncTask : LeveledTask
interface SyncLevel : LeveledTask

interface StreamTask : LeveledTask {
interface StreamLevel : LeveledTask {
val stream: DestinationStream
}

interface DestinationTaskExceptionHandler<T : Task> : TaskExceptionHandler<LeveledTask, T> {
interface DestinationTaskExceptionHandler<T : Task, U : Task> : TaskExceptionHandler<T, U> {
suspend fun handleSyncFailure(e: Exception)
suspend fun handleStreamFailure(stream: DestinationStream, e: Exception)
suspend fun handleTeardownComplete()
suspend fun handleSyncFailed()
}

/**
 * A [Task] that wraps another task of type [T] and exposes it via [innerTask], so that code
 * holding the wrapper can still inspect the underlying task (for example, to check its type).
 */
interface WrappedTask<T : Task> : Task {
/** The task being wrapped. */
val innerTask: T
}

/**
Expand All @@ -48,19 +53,21 @@ interface DestinationTaskExceptionHandler<T : Task> : TaskExceptionHandler<Level
@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
@Singleton
@Secondary
class DefaultDestinationTaskExceptionHandler(
private val taskScopeProvider: TaskScopeProvider<ScopedTask>,
class DefaultDestinationTaskExceptionHandler<T>(
private val taskScopeProvider: TaskScopeProvider<WrappedTask<ScopedTask>>,
private val catalog: DestinationCatalog,
private val syncManager: SyncManager,
private val failStreamTaskFactory: FailStreamTaskFactory,
private val failSyncTaskFactory: FailSyncTaskFactory,
) : DestinationTaskExceptionHandler<ScopedTask> {
) : DestinationTaskExceptionHandler<T, WrappedTask<ScopedTask>> where
T : LeveledTask,
T : ScopedTask {
val log = KotlinLogging.logger {}

inner class SyncTaskWrapper(
private val syncManager: SyncManager,
private val innerTask: SyncTask,
) : SyncTask, InternalTask {
override val innerTask: ScopedTask,
) : WrappedTask<ScopedTask> {
override suspend fun execute() {
if (!syncManager.isActive()) {
val result = syncManager.awaitSyncResult()
Expand All @@ -69,13 +76,15 @@ class DefaultDestinationTaskExceptionHandler(
"Task $innerTask run after sync has succeeded. This should not happen."
)
}
log.info { "Sync terminated, skipping task $innerTask." }

log.info { "Sync task $innerTask skipped because sync has already failed." }
return
}

try {
innerTask.execute()
} catch (e: CancellationException) {
log.warn { "Sync task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
handleSyncFailure(e)
}
Expand All @@ -87,30 +96,32 @@ class DefaultDestinationTaskExceptionHandler(
}

inner class StreamTaskWrapper(
private val stream: DestinationStream,
private val syncManager: SyncManager,
private val innerTask: StreamTask,
) : SyncTask, InternalTask {
override val innerTask: ScopedTask,
) : WrappedTask<ScopedTask> {
override suspend fun execute() {
// Stop dispatching tasks if the stream has been killed by a failure elsewhere.
// Specifically fail if the stream was marked succeeded: we should not be in this state.
val streamManager = syncManager.getStreamManager(innerTask.stream.descriptor)
val streamManager = syncManager.getStreamManager(stream.descriptor)
if (!streamManager.isActive()) {
val result = streamManager.awaitStreamResult()
if (result is StreamSucceeded) {
throw IllegalStateException(
"Task $innerTask run after its stream ${innerTask.stream.descriptor} has succeeded. This should not happen."
"Task $innerTask run after its stream ${stream.descriptor} has succeeded. This should not happen."
)
}
log.info {
"Stream ${innerTask.stream.descriptor} terminated with $result, skipping task $innerTask."
}
log.info { "Stream task $innerTask skipped because stream has already failed." }
return
}

try {
innerTask.execute()
} catch (e: CancellationException) {
log.warn { "Stream task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
handleStreamFailure(innerTask.stream, e)
handleStreamFailure(stream, e)
}
}

Expand All @@ -119,30 +130,43 @@ class DefaultDestinationTaskExceptionHandler(
}
}

override fun withExceptionHandling(task: LeveledTask): ScopedTask {
/**
 * Pass-through [WrappedTask]: runs [innerTask] as-is, adding no exception handling of its own.
 * Any exception thrown by the inner task propagates to the caller unchanged.
 */
inner class NoHandlingWrapper(
    override val innerTask: ScopedTask,
) : WrappedTask<ScopedTask> {
    /** Delegates directly to the wrapped task. */
    override suspend fun execute() {
        innerTask.execute()
    }

    override fun toString(): String = "NoHandlingWrapper(innerTask=$innerTask)"
}

override fun withExceptionHandling(task: T): WrappedTask<ScopedTask> {
return when (task) {
is SyncTask -> SyncTaskWrapper(syncManager, task)
is StreamTask -> SyncTaskWrapper(syncManager, StreamTaskWrapper(syncManager, task))
is SyncLevel -> SyncTaskWrapper(syncManager, task)
is StreamLevel -> StreamTaskWrapper(task.stream, syncManager, task)
else -> throw IllegalArgumentException("Task without level: $task")
}
}

override suspend fun handleSyncFailure(e: Exception) {
log.error { "Sync failed: $e: killing remaining streams" }
catalog.streams.forEach {
val task = failStreamTaskFactory.make(this, e, it, kill = true)
taskScopeProvider.launch(task)
taskScopeProvider.launch(NoHandlingWrapper(task))
}
val failSyncTask = failSyncTaskFactory.make(this, e)
taskScopeProvider.launch(failSyncTask)
taskScopeProvider.launch(NoHandlingWrapper(failSyncTask))
}

override suspend fun handleStreamFailure(stream: DestinationStream, e: Exception) {
log.error { "Caught failure in stream task: $e for ${stream.descriptor}, failing stream" }
val failStreamTask = failStreamTaskFactory.make(this, e, stream, kill = false)
taskScopeProvider.launch(failStreamTask)
taskScopeProvider.launch(NoHandlingWrapper(failStreamTask))
}

override suspend fun handleTeardownComplete() {
taskScopeProvider.close()
override suspend fun handleSyncFailed() {
taskScopeProvider.kill()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ interface DestinationTaskLauncher : TaskLauncher {
justification = "arguments are guaranteed to be non-null by Kotlin's type system"
)
class DefaultDestinationTaskLauncher(
private val taskScopeProvider: TaskScopeProvider<ScopedTask>,
private val taskScopeProvider: TaskScopeProvider<WrappedTask<ScopedTask>>,
private val catalog: DestinationCatalog,
private val syncManager: SyncManager,

Expand All @@ -100,7 +100,7 @@ class DefaultDestinationTaskLauncher(
private val updateCheckpointsTask: UpdateCheckpointsTask,

// Exception handling
private val exceptionHandler: TaskExceptionHandler<LeveledTask, ScopedTask>
private val exceptionHandler: TaskExceptionHandler<LeveledTask, WrappedTask<ScopedTask>>
) : DestinationTaskLauncher {
private val log = KotlinLogging.logger {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,41 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withTimeoutOrNull

/**
* The scope in which a task should run
* - InternalTask:
* - [InternalScope]:
* ```
* - internal to the task launcher
* - should not be blockable by implementor errors
* - killable w/o side effects
* ```
* - ImplementorTask: implemented by the destination
* - [ImplementorScope]: implemented by the destination
* ```
* - calls implementor interface
* - should not block internal tasks (esp reading from stdin)
* - should complete if possible even when failing the sync
* ```
* - [ShutdownScope]: special case of [ImplementorScope]
* ```
* - tasks that should run during shutdown
* - handles canceling/joining other tasks
* - (and so should not cancel themselves)
* ```
*/
sealed interface ScopedTask : Task

interface InternalTask : ScopedTask
interface InternalScope : ScopedTask

interface ImplementorScope : ScopedTask

interface ImplementorTask : ScopedTask
interface ShutdownScope : ScopedTask

@Singleton
@Secondary
class DestinationTaskScopeProvider(config: DestinationConfiguration) :
TaskScopeProvider<ScopedTask> {
TaskScopeProvider<WrappedTask<ScopedTask>> {
private val log = KotlinLogging.logger {}

private val timeoutMs = config.gracefulCancellationTimeoutMs
Expand All @@ -64,35 +71,48 @@ class DestinationTaskScopeProvider(config: DestinationConfiguration) :
.asCoroutineDispatcher()
)

override suspend fun launch(task: ScopedTask) {
when (task) {
is InternalTask -> internalScope.scope.launch { execute(task) }
is ImplementorTask -> implementorScope.scope.launch { execute(task) }
override suspend fun launch(task: WrappedTask<ScopedTask>) {
when (task.innerTask) {
is InternalScope -> internalScope.scope.launch { execute(task, "internal") }
is ImplementorScope -> implementorScope.scope.launch { execute(task, "implementor") }
is ShutdownScope -> implementorScope.scope.launch { execute(task, "shutdown") }
}
}

private suspend fun execute(task: ScopedTask) {
log.info { "Launching task $task" }
private suspend fun execute(task: WrappedTask<ScopedTask>, scope: String) {
log.info { "Launching task $task in scope $scope" }
val elapsed = measureTimeMillis { task.execute() }
log.info { "Task $task completed in $elapsed ms" }
}

override suspend fun close() = supervisorScope {
override suspend fun close() {
log.info { "Closing task scopes" }
internalScope.job.cancel()
// Under normal operation, all tasks should be complete
// (except things like force flush, which loop). So
// - it's safe to force cancel the internal tasks
// - implementor scope should join immediately unless we're
// failing, in which case we want to give them a chance to
// fail gracefully
// - implementor scope should join immediately
implementorScope.job.join()
log.info { "Implementor tasks completed, cancelling internal tasks." }
internalScope.job.cancel()
}

override suspend fun kill() {
log.info { "Killing task scopes" }

// Give the implementor tasks a chance to fail gracefully
withTimeoutOrNull(timeoutMs) {
log.info { "Waiting ${timeoutMs}ms for implementor tasks to complete" }
log.info {
"Cancelled internal tasks, waiting ${timeoutMs}ms for implementor tasks to complete"
}
implementorScope.job.join()
log.info { "Implementor tasks completed" }
}
?: run {
log.error { "Implementor tasks did not complete within ${timeoutMs}ms, cancelling" }
implementorScope.job.cancel()
}

log.info { "Cancelling internal tasks" }
internalScope.job.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ interface TaskExceptionHandler<T : Task, U : Task> {
/**
 * Launches tasks of type [T] into coroutine scopes and controls how those scopes are shut down:
 * `close` (inherited from [CloseableCoroutine]) or [kill].
 */
interface TaskScopeProvider<T : Task> : CloseableCoroutine {
/** Launch a task in the correct scope. */
suspend fun launch(task: T)

/** Unlike `close`, may attempt to fail gracefully, but should guarantee return. */
suspend fun kill()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.load.task.ImplementorScope
import io.airbyte.cdk.load.task.StreamLevel
import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

interface CloseStreamTask : StreamTask, ImplementorTask
interface CloseStreamTask : StreamLevel, ImplementorScope

/**
* Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the
Expand Down
Loading

0 comments on commit e6458f5

Please sign in to comment.