Skip to content

Commit

Permalink
chore: load CDK clears partial aggregates at cadence (#48551)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul authored Nov 20, 2024
1 parent 0727e44 commit 7810e32
Show file tree
Hide file tree
Showing 16 changed files with 852 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ airbyte:
resources:
disk:
bytes: ${CONNECTOR_STORAGE_LIMIT_BYTES:5368709120} # 5GB
flush:
rate-ms: 900000 # 15 minutes
window-ms: 900000 # 15 minutes
5 changes: 5 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,16 @@ def integrationTestTask = tasks.register('integrationTest', Test) {
maxParallelForks = project.test.maxParallelForks
maxHeapSize = project.test.maxHeapSize
}

// These tests are lightweight enough to run on every PR.
tasks.named('check').configure {
dependsOn integrationTest
}

test {
systemProperties(["mockk.junit.extension.requireParallelTesting":"true"])
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,54 @@ interface Sized {
}

/**
* Wrapper for record messages published to the message queue, containing metadata like index and
* size.
* Wrapper message for stream events published to the stream specific queues, containing metadata
* like index and size.
*
* In a future where we deserialize only the info necessary for routing, this could include a dumb
* container for the serialized, and deserialization could be deferred until the spooled records
* were recovered from disk.
*/
sealed class DestinationRecordWrapped : Sized
sealed class DestinationStreamEvent : Sized

sealed class DestinationFileWrapped : Sized

data class StreamRecordWrapped(
data class StreamRecordEvent(
val index: Long,
override val sizeBytes: Long,
val record: DestinationRecord
) : DestinationRecordWrapped()
) : DestinationStreamEvent()

data class StreamFileWrapped(
data class StreamCompleteEvent(
val index: Long,
override val sizeBytes: Long,
val file: DestinationFile
) : DestinationFileWrapped()

data class StreamRecordCompleteWrapped(
val index: Long,
) : DestinationRecordWrapped() {
) : DestinationStreamEvent() {
override val sizeBytes: Long = 0L
}

data class StreamFileCompleteWrapped(
val index: Long,
) : DestinationFileWrapped() {
data class StreamFlushEvent(
val tickedAtMs: Long,
) : DestinationStreamEvent() {
override val sizeBytes: Long = 0L
}

class DestinationRecordQueue : ChannelMessageQueue<Reserved<DestinationRecordWrapped>>()
class DestinationStreamEventQueue : ChannelMessageQueue<Reserved<DestinationStreamEvent>>()

/**
* A supplier of message queues to which ([ReservationManager.reserve]'d) @
* [DestinationRecordWrapped] messages can be published on a @ [DestinationStream] key. The queues
* themselves do not manage memory.
* A supplier of message queues to which ([ReservationManager.reserve]'d) @ [DestinationStreamEvent]
* messages can be published on a @ [DestinationStream] key. The queues themselves do not manage
* memory.
*/
@Singleton
@Secondary
class DestinationRecordQueueSupplier(catalog: DestinationCatalog) :
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>> {
private val queues = ConcurrentHashMap<DestinationStream.Descriptor, DestinationRecordQueue>()
class DestinationStreamQueueSupplier(catalog: DestinationCatalog) :
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>> {
private val queues =
ConcurrentHashMap<DestinationStream.Descriptor, DestinationStreamEventQueue>()

init {
catalog.streams.forEach { queues[it.descriptor] = DestinationRecordQueue() }
catalog.streams.forEach { queues[it.descriptor] = DestinationStreamEventQueue() }
}

override fun get(key: DestinationStream.Descriptor): DestinationRecordQueue {
override fun get(key: DestinationStream.Descriptor): DestinationStreamEventQueue {
return queues[key]
?: throw IllegalArgumentException("Reading from non-existent record stream: $key")
?: throw IllegalArgumentException("Reading from non-existent stream: $key")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import kotlinx.coroutines.sync.withLock

/** Releasable reservation of memory. */
class Reserved<T>(
private val parentManager: ReservationManager,
val bytesReserved: Long,
private val parentManager: ReservationManager? = null,
val bytesReserved: Long = 0,
val value: T,
) : CloseableCoroutine {
private var released = AtomicBoolean(false)
Expand All @@ -24,7 +24,7 @@ class Reserved<T>(
if (!released.compareAndSet(false, true)) {
return
}
parentManager.release(bytesReserved)
parentManager?.release(bytesReserved)
}

fun <U> replace(value: U): Reserved<U> = Reserved(parentManager, bytesReserved, value)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.state

import java.time.Clock

/*
* Simple time-windowing strategy for bucketing partial aggregates.
*
* Works off time relative to the injected @param clock. Generally this is the processing time domain.
*/
data class TimeWindowTrigger(
private val clock: Clock,
private val windowWidthMs: Long,
) {
private var openedAtMs: Long? = null

/*
* Sets window open timestamp for computing completeness. Idempotent. Mutative.
*/
fun open(): Long {
if (openedAtMs == null) {
openedAtMs = clock.millis()
}
return openedAtMs!!
}

/*
* Returns whether window is complete relative to configured @param windowWidthMs. Non-mutative.
*/
fun isComplete(): Boolean {
return openedAtMs?.let { ts -> (clock.millis() - ts) >= windowWidthMs } ?: false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.load.message.DestinationStreamEvent
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.state.Reserved
Expand All @@ -25,6 +25,7 @@ import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory
import io.airbyte.cdk.load.task.implementor.SetupTaskFactory
import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory
import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory
import io.airbyte.cdk.load.task.internal.FlushTickTask
import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory
import io.airbyte.cdk.load.task.internal.SizedInputFlow
import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
Expand Down Expand Up @@ -98,6 +99,7 @@ class DefaultDestinationTaskLauncher(
// Internal Tasks
private val inputConsumerTaskFactory: InputConsumerTaskFactory,
private val spillToDiskTaskFactory: SpillToDiskTaskFactory,
private val flushTickTask: FlushTickTask,

// Implementor Tasks
private val setupTaskFactory: SetupTaskFactory,
Expand All @@ -110,7 +112,7 @@ class DefaultDestinationTaskLauncher(

// Checkpoint Tasks
private val flushCheckpointsTaskFactory: FlushCheckpointsTaskFactory,
private val timedFlushTask: TimedForcedCheckpointFlushTask,
private val timedCheckpointFlushTask: TimedForcedCheckpointFlushTask,
private val updateCheckpointsTask: UpdateCheckpointsTask,

// Exception handling
Expand All @@ -120,7 +122,7 @@ class DefaultDestinationTaskLauncher(
// Input Comsumer requirements
private val inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
private val checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
) : DestinationTaskLauncher {
private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -163,9 +165,13 @@ class DefaultDestinationTaskLauncher(
}
}

// Start flush task
log.info { "Starting timed file aggregate flush task " }
enqueue(flushTickTask)

// Start the checkpoint management tasks
log.info { "Starting timed flush task" }
enqueue(timedFlushTask)
log.info { "Starting timed checkpoint flush task" }
enqueue(timedCheckpointFlushTask)

log.info { "Starting checkpoint update task" }
enqueue(updateCheckpointsTask)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.task.internal

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.load.message.DestinationStreamEvent
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.StreamFlushEvent
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.KillableScope
import io.airbyte.cdk.load.task.SyncLevel
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import java.time.Clock

@Singleton
@Secondary
class FlushTickTask(
@Value("\${airbyte.flush.rate-ms}") private val tickIntervalMs: Long,
private val clock: Clock,
private val coroutineTimeUtils: TimeProvider,
private val catalog: DestinationCatalog,
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
) : SyncLevel, KillableScope {
override suspend fun execute() {
while (true) {
waitAndPublishFlushTick()
}
}

@VisibleForTesting
suspend fun waitAndPublishFlushTick() {
coroutineTimeUtils.delay(tickIntervalMs)

catalog.streams.forEach {
val queue = recordQueueSupplier.get(it.descriptor)
queue.publish(Reserved(value = StreamFlushEvent(clock.millis())))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete
import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
import io.airbyte.cdk.load.message.DestinationStreamEvent
import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.load.message.StreamRecordCompleteWrapped
import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.load.message.StreamCompleteEvent
import io.airbyte.cdk.load.message.StreamRecordEvent
import io.airbyte.cdk.load.message.Undefined
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.state.SyncManager
Expand Down Expand Up @@ -55,7 +55,7 @@ class DefaultInputConsumerTask(
private val catalog: DestinationCatalog,
private val inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
private val recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
private val checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
private val syncManager: SyncManager,
private val destinationTaskLauncher: DestinationTaskLauncher,
Expand All @@ -72,7 +72,7 @@ class DefaultInputConsumerTask(
when (val message = reserved.value) {
is DestinationRecord -> {
val wrapped =
StreamRecordWrapped(
StreamRecordEvent(
index = manager.countRecordIn(),
sizeBytes = sizeBytes,
record = message
Expand All @@ -81,7 +81,7 @@ class DefaultInputConsumerTask(
}
is DestinationRecordStreamComplete -> {
reserved.release() // safe because multiple calls conflate
val wrapped = StreamRecordCompleteWrapped(index = manager.markEndOfStream())
val wrapped = StreamCompleteEvent(index = manager.markEndOfStream())
recordQueue.publish(reserved.replace(wrapped))
recordQueue.close()
}
Expand Down Expand Up @@ -179,7 +179,7 @@ interface InputConsumerTaskFactory {
catalog: DestinationCatalog,
inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
destinationTaskLauncher: DestinationTaskLauncher,
): InputConsumerTask
Expand All @@ -193,7 +193,7 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) :
catalog: DestinationCatalog,
inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
recordQueueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
destinationTaskLauncher: DestinationTaskLauncher,
): InputConsumerTask {
Expand Down
Loading

0 comments on commit 7810e32

Please sign in to comment.