WIP: Simplify scope and queue closure #50421

Closed
@@ -84,6 +84,7 @@ sealed class AirbyteConnectorRunner(
            ApplicationContext.builder(R::class.java, *envs)
                .propertySources(
                    *listOfNotNull(
                        null,
                        airbytePropertySource,
                        commandLinePropertySource,
                        MetadataYamlPropertySource(),
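One observation on the added line: `listOfNotNull` drops null arguments, so the new `null` entry is a no-op and the assembled property sources are unchanged (presumably a WIP placeholder). A quick self-contained check, with illustrative values:

```kotlin
fun main() {
    // listOfNotNull filters out null arguments before building the list,
    // so a literal `null` entry contributes nothing.
    val sources = listOfNotNull(null, "airbytePropertySource", "commandLinePropertySource")
    println(sources) // [airbytePropertySource, commandLinePropertySource]
}
```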
@@ -85,8 +85,13 @@ abstract class DestinationConfiguration : Configuration {
     */
    open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minute

    // TODO: Generalize this configuration (when we generalize the accumulator tasks)
    open val numProcessRecordsWorkers: Int = 2
    open val processRecordsWorkersAreIO: Boolean = true // TODO: Refactor reading from the spill-disk out of this task scope

    open val numProcessBatchWorkers: Int = 5
    open val processBatchWorkersAreIO: Boolean = true

    open val numProcessBatchWorkersForFileTransfer: Int = 3
    open val batchQueueDepth: Int = 10

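The new `*AreIO` flags pair each worker count with a dispatcher choice; the launcher changes further down route IO-bound pools to `Dispatchers.IO` and the rest to `Dispatchers.Default`. A minimal sketch of that mapping, using a hypothetical pared-down config rather than the real `DestinationConfiguration`:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the fields above.
data class WorkerConfig(
    val numProcessRecordsWorkers: Int = 2,
    val processRecordsWorkersAreIO: Boolean = true,
)

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    val ioScope = CoroutineScope(Dispatchers.IO + supervisor)
    val defaultScope = CoroutineScope(Dispatchers.Default + supervisor)

    val config = WorkerConfig()
    repeat(config.numProcessRecordsWorkers) { i ->
        // IO-bound work (e.g. reading the spill-disk) belongs on Dispatchers.IO.
        val scope = if (config.processRecordsWorkersAreIO) ioScope else defaultScope
        scope.launch { println("worker $i on ${Thread.currentThread().name}") }
    }
    supervisor.children.forEach { it.join() }
}
```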
@@ -8,8 +8,11 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.load.data.*
import io.github.oshai.kotlinlogging.KotlinLogging

class JsonSchemaToAirbyteType {
    private val log = KotlinLogging.logger {}

    fun convert(schema: JsonNode): AirbyteType = convertInner(schema)!!

    private fun convertInner(schema: JsonNode): AirbyteType? {
@@ -87,7 +90,10 @@ class JsonSchemaToAirbyteType {
                TimestampTypeWithTimezone
            }
            null -> StringType
            else -> UnknownType(schema)
            else -> {
                log.warn { "Ignoring unrecognized string format: ${schema.get("format").asText()}" }
                StringType
            }
        }

    private fun fromNumber(schema: ObjectNode): AirbyteType =
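The behavioral change: an unrecognized string `format` now degrades to `StringType` with a logged warning instead of becoming `UnknownType`. Roughly, assuming the Airbyte load CDK on the classpath (the format value is chosen purely for illustration):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    val schema = ObjectMapper().readTree("""{"type": "string", "format": "hostname"}""")
    val airbyteType = JsonSchemaToAirbyteType().convert(schema)
    // Before this change: UnknownType(schema).
    // After: StringType, plus "Ignoring unrecognized string format: hostname" in the log.
    println(airbyteType)
}
```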
@@ -23,7 +23,7 @@ interface QueueWriter<T> : CloseableCoroutine {
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
    open val channel = Channel<T>(Channel.UNLIMITED)
    open val channel: Channel<T> = Channel(Channel.UNLIMITED)

    override suspend fun publish(message: T) = channel.send(message)
    override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
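Only the type annotation changes here, but this class is central to the PR's queue-closure theme. The pattern in miniature, with the CDK interfaces stubbed out (the real `QueueReader`/`QueueWriter` also extend `CloseableCoroutine`, simplified away below):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the CDK interfaces.
interface QueueWriter<T> { suspend fun publish(message: T) }
interface QueueReader<T> { suspend fun consume(): Flow<T> }

abstract class ChannelMessageQueue<T> : QueueWriter<T>, QueueReader<T> {
    open val channel: Channel<T> = Channel(Channel.UNLIMITED)

    override suspend fun publish(message: T) = channel.send(message)
    override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
    fun close() = channel.close()
}

class RecordQueue : ChannelMessageQueue<String>()

fun main() = runBlocking {
    val queue = RecordQueue()
    queue.publish("record-1")
    queue.publish("record-2")
    queue.close() // closing is what lets the consuming flow complete
    queue.consume().collect { println(it) }
}
```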
@@ -41,7 +41,12 @@ import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

@@ -92,7 +97,6 @@ interface DestinationTaskLauncher : TaskLauncher {
    justification = "arguments are guaranteed to be non-null by Kotlin's type system"
)
class DefaultDestinationTaskLauncher(
    private val taskScopeProvider: TaskScopeProvider<WrappedTask<ScopedTask>>,
    private val catalog: DestinationCatalog,
    private val config: DestinationConfiguration,
    private val syncManager: SyncManager,
@@ -132,15 +136,27 @@
) : DestinationTaskLauncher {
    private val log = KotlinLogging.logger {}

    private val supervisor = SupervisorJob()
    private val ioScope = CoroutineScope(Dispatchers.IO + supervisor)
    private val defaultScope = CoroutineScope(Dispatchers.Default + supervisor)

    private val batchUpdateLock = Mutex()
    private val succeeded = Channel<Boolean>(Channel.UNLIMITED)

    private val teardownIsEnqueued = AtomicBoolean(false)
    private val failSyncIsEnqueued = AtomicBoolean(false)

    abstract class WrappedTask : Task {
        abstract val innerTask: Task

        override fun toString(): String {
            // Note: "$this(...)" here would recurse into toString() itself;
            // use the class name instead.
            return "${this::class.simpleName}($innerTask)"
        }
    }

    inner class TaskWrapper(
        override val innerTask: ScopedTask,
    ) : WrappedTask<ScopedTask> {
        override val innerTask: Task,
    ) : WrappedTask() {
        override suspend fun execute() {
            try {
                innerTask.execute()
@@ -159,16 +175,22 @@
        }
    }

    inner class NoopWrapper(
        override val innerTask: ScopedTask,
    ) : WrappedTask<ScopedTask> {
        override val innerTask: Task,
    ) : WrappedTask() {
        override suspend fun execute() {
            innerTask.execute()
        }
    }

    private suspend fun enqueue(task: ScopedTask, withExceptionHandling: Boolean = true) {
    private suspend fun launch(
        task: Task,
        withExceptionHandling: Boolean = true,
        isIOTask: Boolean = false,
    ) {
        val wrapped = if (withExceptionHandling) TaskWrapper(task) else NoopWrapper(task)
        taskScopeProvider.launch(wrapped)
        val scope = if (isIOTask) ioScope else defaultScope
        scope.launch {
            wrapped.execute()
        }
    }
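This wrapper-plus-supervisor combination is load-bearing: children of a `SupervisorJob` don't cancel their siblings when they fail, and an exception that escapes a launched task goes to the scope's exception handler rather than failing the sync, which is why `TaskWrapper` catches and reports exceptions itself. A small standalone demonstration of the isolation (plain coroutines, not CDK code):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    // With a plain Job instead, the first failure would cancel every sibling task.
    val handler = CoroutineExceptionHandler { _, e -> println("uncaught: ${e.message}") }
    val scope = CoroutineScope(Dispatchers.Default + supervisor + handler)

    val failing = scope.launch { error("task blew up") }
    val healthy = scope.launch {
        delay(100)
        println("sibling task unaffected")
    }
    joinAll(failing, healthy)
}
```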

    override suspend fun run() {
@@ -183,71 +205,70 @@
                fileTransferQueue = fileTransferQueue,
                destinationTaskLauncher = this,
            )
        enqueue(inputConsumerTask)
        launch(inputConsumerTask, isIOTask = true)

        // Launch the client interface setup task
        log.info { "Starting startup task" }
        val setupTask = setupTaskFactory.make(this)
        enqueue(setupTask)
        launch(setupTask, isIOTask = true)

        // TODO: pluggable file transfer
        if (!fileTransferEnabled) {
            // Start a spill-to-disk task for each record stream
            catalog.streams.forEach { stream ->
                log.info { "Starting spill-to-disk task for $stream" }
                val spillTask = spillToDiskTaskFactory.make(this, stream.descriptor)
                enqueue(spillTask)
                launch(spillTask, isIOTask = true)
            }

            repeat(config.numProcessRecordsWorkers) {
                log.info { "Launching process records task $it" }
                val task = processRecordsTaskFactory.make(this)
                enqueue(task)
                launch(task, isIOTask = config.processRecordsWorkersAreIO)
            }

            repeat(config.numProcessBatchWorkers) {
                log.info { "Launching process batch task $it" }
                val task = processBatchTaskFactory.make(this)
                enqueue(task)
                launch(task, isIOTask = config.processBatchWorkersAreIO)
            }
        } else {
            repeat(config.numProcessRecordsWorkers) {
                log.info { "Launching process file task $it" }
                enqueue(processFileTaskFactory.make(this))
                launch(processFileTaskFactory.make(this), isIOTask = true)
            }

            repeat(config.numProcessBatchWorkersForFileTransfer) {
                log.info { "Launching process batch task $it" }
                val task = processBatchTaskFactory.make(this)
                enqueue(task)
                launch(task, isIOTask = true)
            }
        }

        // Start flush task
        log.info { "Starting timed file aggregate flush task " }
        enqueue(flushTickTask)
        launch(flushTickTask, isIOTask = false)

        // Start the checkpoint management tasks
        log.info { "Starting timed checkpoint flush task" }
        enqueue(timedCheckpointFlushTask)
        launch(timedCheckpointFlushTask, isIOTask = true)

        log.info { "Starting checkpoint update task" }
        enqueue(updateCheckpointsTask)
        launch(updateCheckpointsTask, isIOTask = false)

        // Await completion
        if (succeeded.receive()) {
            taskScopeProvider.close()
        } else {
            taskScopeProvider.kill()
        }
        val success = succeeded.receive()
        log.info { "Sync complete (success=$success), killing all remaining tasks" }
        supervisor.cancelAndJoin()
        // TODO: Close queues here
    }
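The shutdown contract is now: every task runs under one `SupervisorJob`, a single `succeeded.receive()` blocks until some task reports the sync outcome, and `cancelAndJoin()` then cancels whatever is still running and waits for it. The shape of that, sketched with plain coroutines (the busy loop stands in for the long-running workers above):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    val scope = CoroutineScope(Dispatchers.Default + supervisor)
    val succeeded = Channel<Boolean>(Channel.UNLIMITED)

    // A worker that only stops when the supervisor is cancelled.
    scope.launch { while (isActive) delay(10) }
    // Some task eventually reports the outcome.
    scope.launch {
        delay(50)
        succeeded.send(true)
    }

    val success = succeeded.receive() // suspends until an outcome arrives
    println("Sync complete (success=$success), killing all remaining tasks")
    supervisor.cancelAndJoin() // cancels all children and waits for them to finish
}
```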

    /** Called when the initial destination setup completes. */
    override suspend fun handleSetupComplete() {
        catalog.streams.forEach {
            log.info { "Starting open stream task for $it" }
            val task = openStreamTaskFactory.make(this, it)
            enqueue(task)
            launch(task, isIOTask = true)
        }
    }

@@ -273,14 +294,14 @@
            log.info {
                "Batch $wrapped is persisted: Starting flush checkpoints task for $stream"
            }
            enqueue(flushCheckpointsTaskFactory.make())
            launch(flushCheckpointsTaskFactory.make(), isIOTask = true)
        }

        if (streamManager.isBatchProcessingComplete()) {
            log.info { "Batch processing complete: Starting close stream task for $stream" }

            val task = closeStreamTaskFactory.make(this, stream)
            enqueue(task)
            launch(task, isIOTask = true)
        } else {
            log.info { "Batch processing not complete: nothing else to do." }
        }
@@ -290,7 +311,7 @@
    /** Called when a stream is closed. */
    override suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) {
        if (teardownIsEnqueued.setOnce()) {
            enqueue(teardownTaskFactory.make(this))
            launch(teardownTaskFactory.make(this), isIOTask = true)
        } else {
            log.info { "Teardown task already enqueued, not enqueuing another one" }
        }
@@ -299,15 +320,15 @@
    override suspend fun handleException(e: Exception) {
        catalog.streams
            .map { failStreamTaskFactory.make(this, e, it.descriptor) }
            .forEach { enqueue(it, withExceptionHandling = false) }
            .forEach { launch(it, withExceptionHandling = false, isIOTask = true) }
    }

    override suspend fun handleFailStreamComplete(
        stream: DestinationStream.Descriptor,
        e: Exception
    ) {
        if (failSyncIsEnqueued.setOnce()) {
            enqueue(failSyncTaskFactory.make(this, e))
            launch(failSyncTaskFactory.make(this, e), isIOTask = true)
        } else {
            log.info { "Fail-sync task already enqueued, not enqueuing another one" }
        }
    }
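On the remaining `// TODO: Close queues here`: cancellation tears tasks down abruptly, whereas closing a queue's channel lets a consumer drain buffered messages and complete normally, since `receiveAsFlow` only terminates once the channel is closed. A sketch of the distinction (illustrative, not the CDK's eventual close logic):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val queue = Channel<Int>(Channel.UNLIMITED)

    val consumer = launch {
        queue.receiveAsFlow().collect { println("consumed $it") }
        // Reached only because the channel gets closed, not cancelled.
        println("consumer finished cleanly")
    }

    queue.send(1)
    queue.send(2)
    delay(50)
    queue.close() // graceful: consumer drains the buffer, then completes
    consumer.join()
}
```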