Commit
Merge branch 'master' into christo/pardot-v5-justin
DanyloGL authored Dec 17, 2024
2 parents 4897610 + dfcdea8 commit 42775e3
Showing 1,088 changed files with 17,918 additions and 12,005 deletions.
5 changes: 5 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -40,6 +40,11 @@ def integrationTestTask = tasks.register('integrationTest', Test) {
systemProperties = project.test.systemProperties
maxParallelForks = project.test.maxParallelForks
maxHeapSize = project.test.maxHeapSize

testLogging() {
events 'skipped', 'started', 'passed', 'failed'
exceptionFormat 'full'
}
}

// These tests are lightweight enough to run on every PR.
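For reference, a Test task registered this way is normally run through the Gradle wrapper; the project path below is inferred from the build file's location and should be treated as an assumption:

./gradlew :airbyte-cdk:bulk:core:load:integrationTest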
@@ -27,6 +27,7 @@ class MockBasicFunctionalityIntegrationTest :
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
supportFileTransfer = false,
) {
@Test
override fun testBasicWrite() {
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.mock_integration_test

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
@@ -12,13 +13,15 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton
import kotlinx.coroutines.delay

@Singleton
class MockDestinationWriter : DestinationWriter {
@@ -27,7 +30,10 @@ class MockDestinationWriter : DestinationWriter {
}
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
private val log = KotlinLogging.logger {}

abstract class MockBatch : Batch {
override val groupId: String? = null
}
@@ -38,11 +44,8 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
data class LocalFileBatch(val file: DestinationFile) : MockBatch() {
override val state = Batch.State.LOCAL
}
data class PersistedBatch(val records: List<DestinationRecord>) : MockBatch() {
override val state = Batch.State.PERSISTED
}

override suspend fun close(streamFailure: StreamIncompleteResult?) {
override suspend fun close(streamFailure: StreamProcessingFailed?) {
if (streamFailure == null) {
when (val importType = stream.importType) {
is Append -> {
@@ -82,6 +85,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override suspend fun processBatch(batch: Batch): Batch {
return when (batch) {
is LocalBatch -> {
log.info { "Persisting ${batch.records.size} records for ${stream.descriptor}" }
batch.records.forEach {
val filename = getFilename(it.stream, staging = true)
val record =
@@ -99,9 +103,14 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
// blind insert into the staging area. We'll dedupe on commit.
MockDestinationBackend.insert(filename, record)
}
PersistedBatch(batch.records)
// HACK: This destination is so fast that it creates a race
// condition between consuming and flushing state messages,
// which makes the test fail. This would not be an issue in a
// real sync, because we would always either get more data or
// an end-of-stream that would force a final flush.
delay(100L)
SimpleBatch(state = Batch.State.COMPLETE)
}
is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE)
else -> throw IllegalStateException("Unexpected batch type: $batch")
}
}
@@ -37,6 +37,8 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()

fun asProtocolObject(): ConfiguredAirbyteCatalog =
ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })

fun size(): Int = streams.size
}

interface DestinationCatalogFactory {
@@ -84,6 +84,10 @@ abstract class DestinationConfiguration : Configuration {
*/
open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minute

open val numProcessRecordsWorkers: Int = 2
open val numProcessBatchWorkers: Int = 5
open val batchQueueDepth: Int = 10

/**
* Micronaut factory which glues [ConfigurationSpecificationSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
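As a rough sketch of how these new concurrency knobs could be tuned, a concrete destination might override the defaults in its own configuration class (the class name and values here are hypothetical, and this assumes no other abstract members need overriding):

// hypothetical destination configuration tuning the new knobs
class MyDestinationConfiguration : DestinationConfiguration() {
    override val numProcessRecordsWorkers: Int = 4 // more parallel record processing
    override val numProcessBatchWorkers: Int = 8 // more in-flight batch work
    override val batchQueueDepth: Int = 20 // deeper buffer between record and batch workers
}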
@@ -4,16 +4,25 @@

package io.airbyte.cdk.load.config

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import kotlin.math.min
import kotlinx.coroutines.channels.Channel

/** Factory for instantiating beans necessary for the sync process. */
@Factory
class SyncBeanFactory {
private val log = KotlinLogging.logger {}

@Singleton
@Named("memoryManager")
fun memoryManager(
@@ -31,4 +40,43 @@ class SyncBeanFactory {
): ReservationManager {
return ReservationManager(availableBytes)
}

/**
* The queue that sits between the aggregation step (SpillToDiskTask) and the load step
* (ProcessRecordsTask).
*
* Since we are buffering on disk, we must consider the available disk space in our depth
* configuration.
*/
@Singleton
@Named("fileAggregateQueue")
fun fileAggregateQueue(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
config: DestinationConfiguration,
catalog: DestinationCatalog
): MultiProducerChannel<FileAggregateMessage> {
val streamCount = catalog.size()
// total batches by disk capacity
val maxBatchesThatFitOnDisk = (availableBytes / config.recordBatchSizeBytes).toInt()
// account for batches in flight processing by the workers
val maxBatchesMinusUploadOverhead =
maxBatchesThatFitOnDisk - config.numProcessRecordsWorkers
// ideally we'd allow enough headroom to smooth out rate differences between consumer /
// producer streams
val idealDepth = 4 * config.numProcessRecordsWorkers
// take the smaller of the two—this should be the idealDepth except in corner cases
val capacity = min(maxBatchesMinusUploadOverhead, idealDepth)
log.info { "Creating file aggregate queue with limit $capacity" }
val channel = Channel<FileAggregateMessage>(capacity)
return MultiProducerChannel(streamCount.toLong(), channel)
}

@Singleton
@Named("batchQueue")
fun batchQueue(
config: DestinationConfiguration,
): MultiProducerChannel<BatchEnvelope<*>> {
val channel = Channel<BatchEnvelope<*>>(config.batchQueueDepth)
return MultiProducerChannel(config.numProcessRecordsWorkers.toLong(), channel)
}
}
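To make the fileAggregateQueue capacity computation concrete, here is a worked example with assumed inputs (recordBatchSizeBytes is not shown in this diff, so the figure is illustrative): with availableBytes = 10 GiB and recordBatchSizeBytes = 200 MiB, maxBatchesThatFitOnDisk = 51; subtracting the 2 process-records workers leaves 49; idealDepth = 4 * 2 = 8; and capacity = min(49, 8) = 8. The disk-derived bound only wins when available disk is small relative to the batch size.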
@@ -19,11 +19,11 @@ import java.time.temporal.ChronoUnit
*/
class TimeStringToInteger : AirbyteValueIdentityMapper() {
companion object {
private val DATE_TIME_FORMATTER: DateTimeFormatter =
val DATE_TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
)
private val TIME_FORMATTER: DateTimeFormatter =
val TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
)
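With the two formatters now public rather than private, callers can use them directly. A minimal sketch, assuming the lenient time pattern accepts a plain HH:mm:ss input:

import java.time.LocalTime

// parse a bare time string with the now-public formatter (input illustrative)
val accessor = TimeStringToInteger.TIME_FORMATTER.parse("10:15:30")
val time = LocalTime.from(accessor) // 10:15:30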
@@ -7,6 +7,7 @@ package io.airbyte.cdk.load.message
import com.google.common.collect.Range
import com.google.common.collect.RangeSet
import com.google.common.collect.TreeRangeSet
import io.airbyte.cdk.load.command.DestinationStream

/**
* Represents an accumulated batch of records in some stage of processing.
@@ -66,6 +67,13 @@ interface Batch {
}

val state: State

/**
* If a [Batch] is [State.COMPLETE], there's nothing further to do. If it is part of a group,
* then its state will be updated by the next batch in the group that advances.
*/
val requiresProcessing: Boolean
get() = state != State.COMPLETE && groupId == null
}

/** Simple batch: use if you need no other metadata for processing. */
@@ -80,14 +88,20 @@ data class SimpleBatch(
*/
data class BatchEnvelope<B : Batch>(
val batch: B,
val ranges: RangeSet<Long> = TreeRangeSet.create()
val ranges: RangeSet<Long> = TreeRangeSet.create(),
val streamDescriptor: DestinationStream.Descriptor
) {
constructor(
batch: B,
range: Range<Long>
) : this(batch = batch, ranges = TreeRangeSet.create(listOf(range)))
range: Range<Long>,
streamDescriptor: DestinationStream.Descriptor
) : this(
batch = batch,
ranges = TreeRangeSet.create(listOf(range)),
streamDescriptor = streamDescriptor
)

fun <C : Batch> withBatch(newBatch: C): BatchEnvelope<C> {
return BatchEnvelope(newBatch, ranges)
return BatchEnvelope(newBatch, ranges, streamDescriptor)
}
}
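A minimal construction sketch for the updated envelope, reusing the Guava Range type already imported in this file (the range values are illustrative, and stream is assumed to be a DestinationStream in scope):

val envelope =
    BatchEnvelope(
        batch = SimpleBatch(state = Batch.State.PERSISTED),
        range = Range.closed(0L, 99L),
        streamDescriptor = stream.descriptor,
    )
// withBatch preserves the ranges and the stream descriptor
val completed = envelope.withBatch(SimpleBatch(state = Batch.State.COMPLETE))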
@@ -26,18 +26,26 @@ interface Sized {
*/
sealed class DestinationStreamEvent : Sized

/** Contains a record to be aggregated and processed. */
data class StreamRecordEvent(
val index: Long,
override val sizeBytes: Long,
val record: DestinationRecord
) : DestinationStreamEvent()

data class StreamCompleteEvent(
/**
* Indicates the stream is in a terminal (complete or incomplete) state as signalled by upstream.
*/
data class StreamEndEvent(
val index: Long,
) : DestinationStreamEvent() {
override val sizeBytes: Long = 0L
}

/**
* Emitted to trigger evaluation of the conditional flush logic of a stream. The consumer may or may
* not decide to flush.
*/
data class StreamFlushEvent(
val tickedAtMs: Long,
) : DestinationStreamEvent() {
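A hedged sketch of how a consumer might dispatch on this sealed hierarchy, assuming these three are the only event subtypes (the handler functions are hypothetical):

fun handle(event: DestinationStreamEvent) =
    when (event) {
        is StreamRecordEvent -> accumulate(event.record, event.sizeBytes) // buffer the record
        is StreamEndEvent -> finalizeStream(event.index) // upstream reached a terminal state
        is StreamFlushEvent -> maybeFlush(event.tickedAtMs) // evaluate conditional flush logic
    }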
@@ -23,7 +23,7 @@ interface QueueWriter<T> : CloseableCoroutine {
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
val channel = Channel<T>(Channel.UNLIMITED)
open val channel = Channel<T>(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.message

import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.channels.Channel

/**
* A channel designed for use with a fixed number of producers. Close is called on the
* underlying channel once no registered producers remain.
*/
class MultiProducerChannel<T>(
producerCount: Long,
override val channel: Channel<T>,
) : ChannelMessageQueue<T>() {
private val log = KotlinLogging.logger {}
private val initializedProducerCount = producerCount
private val producerCount = AtomicLong(producerCount)

override suspend fun close() {
val count = producerCount.decrementAndGet()
log.info {
"Closing producer (active count=$count, initialized count: $initializedProducerCount)"
}
if (count == 0L) {
log.info { "Closing underlying queue" }
channel.close()
}
}
}
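A minimal usage sketch under stated assumptions (two producers sharing one queue; runBlocking is used only to keep the example self-contained):

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val queue = MultiProducerChannel(producerCount = 2L, channel = Channel<String>(capacity = 10))
    queue.publish("from producer A")
    queue.close() // active count drops to 1; the underlying channel stays open
    queue.publish("from producer B")
    queue.close() // count reaches 0; the underlying channel is closed
}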