From a4d643b2dd19880cb612dd60e7708fe2103c95c8 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 15 Nov 2024 11:44:35 -0800 Subject: [PATCH 1/3] [WIP] Prerelease S3V2 Connector --- .../airbyte/cdk/load/message/MessageQueue.kt | 2 +- .../ObjectStorageStreamLoaderFactory.kt | 3 ++ .../cdk/load/file/s3/S3MultipartUpload.kt | 18 ++++++++-- .../connectors/destination-s3-v2/build.gradle | 16 ++++----- .../destination-s3-v2/metadata.yaml | 35 +++++++++++++------ .../src/main/kotlin/S3V2Specification.kt | 7 ++++ docs/integrations/destinations/s3.md | 24 ++++++------- 7 files changed, 70 insertions(+), 35 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index e9ae6c33e39f..d74a0056a5cf 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -23,7 +23,7 @@ interface QueueWriter : CloseableCoroutine { interface MessageQueue : QueueReader, QueueWriter abstract class ChannelMessageQueue : MessageQueue { - open val channel = Channel(Channel.UNLIMITED) + open val channel: Channel = Channel(Channel.UNLIMITED) override suspend fun publish(message: T) = channel.send(message) override suspend fun consume(): Flow = channel.receiveAsFlow() diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 10afada4bccc..03e56093fcb4 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -32,6 +32,9 @@ import jakarta.inject.Singleton import java.io.File import java.io.OutputStream import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take @Singleton @Secondary diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index b12dab4b4c52..6cc75f27ac15 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -20,6 +20,9 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.io.ByteArrayOutputStream import java.io.OutputStream import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.measureTime +import kotlin.time.measureTimedValue import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -51,6 +54,7 @@ class S3MultipartUpload( private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) private val partQueue = Channel(Channel.UNLIMITED) private val isClosed = AtomicBoolean(false) + private val channelSize = AtomicLong(0L) /** * Run the upload using the provided block. This should only be used by the @@ -73,6 +77,7 @@ class S3MultipartUpload( launch { val uploadedParts = mutableListOf() for (bytes in partQueue) { + channelSize.decrementAndGet() val part = uploadPart(bytes, uploadedParts) uploadedParts.add(part) } @@ -117,7 +122,11 @@ class S3MultipartUpload( wrappingBuffer.flush() val bytes = underlyingBuffer.toByteArray() underlyingBuffer.reset() - runBlocking { partQueue.send(bytes) } + channelSize.incrementAndGet() + val duration = measureTime { runBlocking { partQueue.send(bytes) } } + log.info { + "Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } } private suspend fun uploadPart( @@ -132,10 +141,13 @@ class S3MultipartUpload( body = ByteStream.fromBytes(bytes) this.partNumber = partNumber } - val uploadResponse = client.uploadPart(request) + val uploadResponse = measureTimedValue { client.uploadPart(request) } + log.info { + "Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } return CompletedPart { this.partNumber = partNumber - this.eTag = uploadResponse.eTag + this.eTag = uploadResponse.value.eTag } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index d370a2d14c0c..a48674517442 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -17,14 +17,14 @@ application { // Uncomment to run locally: // '--add-opens', 'java.base/java.lang=ALL-UNNAMED' // Uncomment to enable remote profiling: -// '-XX:NativeMemoryTracking=detail', -// '-Djava.rmi.server.hostname=localhost', -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// '-Dcom.sun.management.jmxremote.rmi.port=6000', -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false' + '-XX:NativeMemoryTracking=detail', + '-Djava.rmi.server.hostname=localhost', + '-Dcom.sun.management.jmxremote=true', + '-Dcom.sun.management.jmxremote.port=6000', + '-Dcom.sun.management.jmxremote.rmi.port=6000', + '-Dcom.sun.management.jmxremote.local.only=false', + '-Dcom.sun.management.jmxremote.authenticate=false', + '-Dcom.sun.management.jmxremote.ssl=false' ] } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 71c12501a29a..70869b693b41 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -1,27 +1,40 @@ data: connectorSubtype: file connectorType: destination - definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.6 - dockerRepository: airbyte/destination-s3-v2 - githubIssueLabel: destination-s3-v2 + definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 + dockerImageTag: 1.5.0 + dockerRepository: airbyte/destination-s3 + githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 - name: S3 V2 Destination + name: S3 registryOverrides: cloud: - enabled: false + enabled: true oss: - enabled: false - releaseStage: alpha + enabled: true + releaseStage: generally_available + releases: + breakingChanges: + 1.0.0: + message: > + **This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.** + upgradeDeadline: "2024-10-08" + resourceRequirements: + jobSpecific: + - jobType: sync + resourceRequirements: + memory_limit: 2Gi + memory_request: 2Gi documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 tags: - language:java ab_internal: - sl: 100 - ql: 100 - supportLevel: community + sl: 300 + ql: 300 + supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt index 536d5673d22e..8b216bbbfd0d 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.s3_v2 +import com.fasterxml.jackson.annotation.JsonProperty import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.command.ConfigurationSpecification @@ -81,6 +82,12 @@ class S3V2Specification : // // @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""") // override val s3StagingPrefix: String? = null + + @get:JsonProperty("num_process_records_workers") + val numProcessRecordsWorkers: Int? = 2 + + @get:JsonProperty("estimated_record_memory_overhead_ratio") + val estimatedRecordMemoryOverheadRatio: Double? = 5.0 } @Singleton diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 1cc5220b6a28..ebf35fe2dd3a 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,18 +544,18 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | -| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | -| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | -| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | -| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies | -| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead | -| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists | -| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | -| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema | -| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. | -| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes | -| 0.1.15 | 2024-12-18 | [49879](https://github.com/airbytehq/airbyte/pull/49879) | Use a base image: airbyte/java-connector-base:1.0.0 | +| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging | +| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | +| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | +| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | +| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | +| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies | +| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead | +| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists | +| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | +| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema | +| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. | +| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes | | 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 2) for async S3 uploads. | | 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 | | 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. | From eb1e08c56c003ecc5e7db0e24b7bb8443fde8e60 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 24 Dec 2024 15:20:04 -0800 Subject: [PATCH 2/3] Destination S3V2: Avro does not fail on unsupported string formats --- .../airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt | 8 +++++++- .../load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt | 7 +++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt index 4f9736282172..f3a0ba33ffbd 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt @@ -8,8 +8,11 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.load.data.* +import io.github.oshai.kotlinlogging.KotlinLogging class JsonSchemaToAirbyteType { + private val log = KotlinLogging.logger {} + fun convert(schema: JsonNode): AirbyteType = convertInner(schema)!! private fun convertInner(schema: JsonNode): AirbyteType? { @@ -87,7 +90,10 @@ class JsonSchemaToAirbyteType { TimestampTypeWithTimezone } null -> StringType - else -> UnknownType(schema) + else -> { + log.warn { "Ignoring unrecognized string format: ${schema.get("format").asText()}" } + StringType + } } private fun fromNumber(schema: ObjectNode): AirbyteType = diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt index 59e8ccd33bd8..2b6cc39f0fd0 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt @@ -251,4 +251,11 @@ class JsonSchemaToAirbyteSchemaTypeTest { val airbyteType = JsonSchemaToAirbyteType().convert(inputSchema) Assertions.assertEquals(UnionType.of(StringType, IntegerType), airbyteType) } + + @Test + fun testUnrecognizedStringFormats() { + val schemaNode = ofType("string").put("format", "foo") + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is StringType) + } } From 108485734fb96fffb68005e69fc66aeb17d5f3c8 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 24 Dec 2024 11:14:19 -0800 Subject: [PATCH 3/3] WIP: Simplify scope and queue closure --- .../io/airbyte/cdk/AirbyteConnectorRunner.kt | 1 + .../load/command/DestinationConfiguration.kt | 5 + .../cdk/load/task/DestinationTaskLauncher.kt | 77 +++++---- .../load/task/DestinationTaskScopeProvider.kt | 151 ------------------ .../kotlin/io/airbyte/cdk/load/task/Task.kt | 21 --- .../load/task/implementor/CloseStreamTask.kt | 4 +- .../load/task/implementor/FailStreamTask.kt | 4 +- .../cdk/load/task/implementor/FailSyncTask.kt | 4 +- .../load/task/implementor/OpenStreamTask.kt | 4 +- .../load/task/implementor/ProcessBatchTask.kt | 4 +- .../load/task/implementor/ProcessFileTask.kt | 4 +- .../task/implementor/ProcessRecordsTask.kt | 8 +- .../cdk/load/task/implementor/SetupTask.kt | 4 +- .../cdk/load/task/implementor/TeardownTask.kt | 4 +- .../task/internal/FlushCheckpointsTask.kt | 4 +- .../cdk/load/task/internal/FlushTickTask.kt | 4 +- .../load/task/internal/InputConsumerTask.kt | 4 +- .../cdk/load/task/internal/SpillToDiskTask.kt | 4 +- .../TimedForcedCheckpointFlushTask.kt | 4 +- .../task/internal/UpdateCheckpointsTask.kt | 4 +- .../load/task/DestinationTaskLauncherTest.kt | 26 +-- .../load/task/DestinationTaskLauncherUTest.kt | 6 - .../cdk/load/task/MockScopeProvider.kt | 35 ---- .../destination/s3_v2/S3V2WriteTest.kt | 2 + 24 files changed, 92 insertions(+), 296 deletions(-) delete mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt delete mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt index 218ef56c1723..0923aaed3bbd 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt @@ -84,6 +84,7 @@ sealed class AirbyteConnectorRunner( ApplicationContext.builder(R::class.java, *envs) .propertySources( *listOfNotNull( + null, airbytePropertySource, commandLinePropertySource, MetadataYamlPropertySource(), diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt index a36d0044c950..24bf401c1634 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt @@ -85,8 +85,13 @@ abstract class DestinationConfiguration : Configuration { */ open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minutes + // TODO: Generalize this configuration (when we generalize the accumulator tasks) open val numProcessRecordsWorkers: Int = 2 + open val processRecordsWorkersAreIO: Boolean = true // TODO: Refactor reading from the spill-disk out of this task scope + open val numProcessBatchWorkers: Int = 5 + open val processBatchWorkersAreIO: Boolean = true + open val numProcessBatchWorkersForFileTransfer: Int = 3 open val batchQueueDepth: Int = 10 diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 8443b98410c6..e89be175f50d 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -41,7 +41,12 @@ import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -92,7 +97,6 @@ interface DestinationTaskLauncher : TaskLauncher { justification = "arguments are guaranteed to be non-null by Kotlin's type system" ) class DefaultDestinationTaskLauncher( - private val taskScopeProvider: TaskScopeProvider>, private val catalog: DestinationCatalog, private val config: DestinationConfiguration, private val syncManager: SyncManager, @@ -132,15 +136,27 @@ class DefaultDestinationTaskLauncher( ) : DestinationTaskLauncher { private val log = KotlinLogging.logger {} + private val supervisor = SupervisorJob() + private val ioScope = CoroutineScope(Dispatchers.IO + supervisor) + private val defaultScope = CoroutineScope(Dispatchers.Default + supervisor) + private val batchUpdateLock = Mutex() private val succeeded = Channel(Channel.UNLIMITED) private val teardownIsEnqueued = AtomicBoolean(false) private val failSyncIsEnqueued = AtomicBoolean(false) + abstract class WrappedTask : Task { + abstract val innerTask: Task + + override fun toString(): String { + return "$this($innerTask)" + } + } + inner class TaskWrapper( - override val innerTask: ScopedTask, - ) : WrappedTask { + override val innerTask: Task, + ): WrappedTask() { override suspend fun execute() { try { innerTask.execute() @@ -159,16 +175,22 @@ class DefaultDestinationTaskLauncher( } inner class NoopWrapper( - override val innerTask: ScopedTask, - ) : WrappedTask { + override val innerTask: Task, + ) : WrappedTask() { override suspend fun execute() { innerTask.execute() } } - private suspend fun enqueue(task: ScopedTask, withExceptionHandling: Boolean = true) { + private suspend fun launch( + task: Task, + withExceptionHandling: Boolean = true, + isIOTask: Boolean = false) { val wrapped = if (withExceptionHandling) TaskWrapper(task) else NoopWrapper(task) - taskScopeProvider.launch(wrapped) + val scope = if (isIOTask) ioScope else defaultScope + scope.launch { + wrapped.execute() + } } override suspend fun run() { @@ -183,12 +205,12 @@ class DefaultDestinationTaskLauncher( fileTransferQueue = fileTransferQueue, destinationTaskLauncher = this, ) - enqueue(inputConsumerTask) + launch(inputConsumerTask, isIOTask = true) // Launch the client interface setup task log.info { "Starting startup task" } val setupTask = setupTaskFactory.make(this) - enqueue(setupTask) + launch(setupTask, isIOTask = true) // TODO: pluggable file transfer if (!fileTransferEnabled) { @@ -196,50 +218,49 @@ class DefaultDestinationTaskLauncher( catalog.streams.forEach { stream -> log.info { "Starting spill-to-disk task for $stream" } val spillTask = spillToDiskTaskFactory.make(this, stream.descriptor) - enqueue(spillTask) + launch(spillTask, isIOTask = true) } repeat(config.numProcessRecordsWorkers) { log.info { "Launching process records task $it" } val task = processRecordsTaskFactory.make(this) - enqueue(task) + launch(task, isIOTask = config.processRecordsWorkersAreIO) } repeat(config.numProcessBatchWorkers) { log.info { "Launching process batch task $it" } val task = processBatchTaskFactory.make(this) - enqueue(task) + launch(task, isIOTask = config.processBatchWorkersAreIO) } } else { repeat(config.numProcessRecordsWorkers) { log.info { "Launching process file task $it" } - enqueue(processFileTaskFactory.make(this)) + launch(processFileTaskFactory.make(this), isIOTask = true) } repeat(config.numProcessBatchWorkersForFileTransfer) { log.info { "Launching process batch task $it" } val task = processBatchTaskFactory.make(this) - enqueue(task) + launch(task, isIOTask = true) } } // Start flush task log.info { "Starting timed file aggregate flush task " } - enqueue(flushTickTask) + launch(flushTickTask, isIOTask = false) // Start the checkpoint management tasks log.info { "Starting timed checkpoint flush task" } - enqueue(timedCheckpointFlushTask) + launch(timedCheckpointFlushTask, isIOTask = true) log.info { "Starting checkpoint update task" } - enqueue(updateCheckpointsTask) + launch(updateCheckpointsTask, isIOTask = false) // Await completion - if (succeeded.receive()) { - taskScopeProvider.close() - } else { - taskScopeProvider.kill() - } + val success = succeeded.receive() + log.info { "Sync complete (success=$success), killing all remaining tasks" } + supervisor.cancelAndJoin() + // TODO: Close queues here } /** Called when the initial destination setup completes. */ @@ -247,7 +268,7 @@ class DefaultDestinationTaskLauncher( catalog.streams.forEach { log.info { "Starting open stream task for $it" } val task = openStreamTaskFactory.make(this, it) - enqueue(task) + launch(task, isIOTask = true) } } @@ -273,14 +294,14 @@ class DefaultDestinationTaskLauncher( log.info { "Batch $wrapped is persisted: Starting flush checkpoints task for $stream" } - enqueue(flushCheckpointsTaskFactory.make()) + launch(flushCheckpointsTaskFactory.make(), isIOTask = true) } if (streamManager.isBatchProcessingComplete()) { log.info { "Batch processing complete: Starting close stream task for $stream" } val task = closeStreamTaskFactory.make(this, stream) - enqueue(task) + launch(task, isIOTask = true) } else { log.info { "Batch processing not complete: nothing else to do." } } @@ -290,7 +311,7 @@ class DefaultDestinationTaskLauncher( /** Called when a stream is closed. */ override suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) { if (teardownIsEnqueued.setOnce()) { - enqueue(teardownTaskFactory.make(this)) + launch(teardownTaskFactory.make(this), isIOTask = true) } else { log.info { "Teardown task already enqueued, not enqueuing another one" } } @@ -299,7 +320,7 @@ class DefaultDestinationTaskLauncher( override suspend fun handleException(e: Exception) { catalog.streams .map { failStreamTaskFactory.make(this, e, it.descriptor) } - .forEach { enqueue(it, withExceptionHandling = false) } + .forEach { launch(it, withExceptionHandling = false, isIOTask = true) } } override suspend fun handleFailStreamComplete( @@ -307,7 +328,7 @@ class DefaultDestinationTaskLauncher( e: Exception ) { if (failSyncIsEnqueued.setOnce()) { - enqueue(failSyncTaskFactory.make(this, e)) + launch(failSyncTaskFactory.make(this, e), isIOTask = true) } else { log.info { "Teardown task already enqueued, not enqueuing another one" } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt deleted file mode 100644 index 4478e39079ea..000000000000 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskScopeProvider.kt +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.task - -import io.airbyte.cdk.load.command.DestinationConfiguration -import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Secondary -import jakarta.inject.Singleton -import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicReference -import kotlin.system.measureTimeMillis -import kotlinx.coroutines.CompletableJob -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.launch -import kotlinx.coroutines.withTimeoutOrNull - -/** - * The scope in which a task should run - * - [InternalScope]: - * ``` - * - internal to the task launcher - * - should not be blockable by implementor errors - * - killable w/o side effects - * ``` - * - [ImplementorScope]: implemented by the destination - * ``` - * - calls implementor interface - * - should not block internal tasks (esp reading from stdin) - * - should complete if possible even when failing the sync - * ``` - */ -sealed interface ScopedTask : Task - -interface InternalScope : ScopedTask - -interface ImplementorScope : ScopedTask - -/** - * Some tasks should be immediately cancelled upon any failure (for example, reading from stdin, the - * every-15-minutes flush). Those tasks should be placed into the fail-fast scope. - */ -interface KillableScope : ScopedTask - -interface WrappedTask : Task { - val innerTask: T -} - -@Singleton -@Secondary -class DestinationTaskScopeProvider(config: DestinationConfiguration) : - TaskScopeProvider> { - private val log = KotlinLogging.logger {} - - private val timeoutMs = config.gracefulCancellationTimeoutMs - - data class ControlScope( - val name: String, - val job: CompletableJob, - val dispatcher: CoroutineDispatcher - ) { - val scope: CoroutineScope = CoroutineScope(dispatcher + job) - val runningJobs: AtomicLong = AtomicLong(0) - } - - private val internalScope = ControlScope("internal", Job(), Dispatchers.IO) - - private val implementorScope = - ControlScope( - "implementor", - Job(), - Executors.newFixedThreadPool(config.maxNumImplementorTaskThreads) - .asCoroutineDispatcher() - ) - - private val failFastScope = ControlScope("input", Job(), Dispatchers.IO) - - override suspend fun launch(task: WrappedTask) { - val scope = - when (task.innerTask) { - is InternalScope -> internalScope - is ImplementorScope -> implementorScope - is KillableScope -> failFastScope - } - scope.scope.launch { - var nJobs = scope.runningJobs.incrementAndGet() - log.info { "Launching task $task in scope ${scope.name} ($nJobs now running)" } - val elapsed = measureTimeMillis { task.execute() } - nJobs = scope.runningJobs.decrementAndGet() - log.info { "Task $task completed in $elapsed ms ($nJobs now running)" } - } - } - - override suspend fun close() { - // Under normal operation, all tasks should be complete - // (except things like force flush, which loop). So - // - it's safe to force cancel the internal tasks - // - implementor scope should join immediately - log.info { "Closing task scopes (${implementorScope.runningJobs.get()} remaining)" } - val uncaughtExceptions = AtomicReference() - implementorScope.job.children.forEach { - it.invokeOnCompletion { cause -> - if (cause != null) { - log.error { "Uncaught exception in implementor task: $cause" } - uncaughtExceptions.set(cause) - } - } - } - implementorScope.job.complete() - implementorScope.job.join() - if (uncaughtExceptions.get() != null) { - throw IllegalStateException( - "Uncaught exceptions in implementor tasks", - uncaughtExceptions.get() - ) - } - log.info { - "Implementor tasks completed, cancelling internal tasks (${internalScope.runningJobs.get()} remaining)." - } - internalScope.job.cancel() - } - - override suspend fun kill() { - log.info { "Killing task scopes" } - // Terminate tasks which should be immediately terminated - failFastScope.job.cancel() - - // Give the implementor tasks a chance to fail gracefully - withTimeoutOrNull(timeoutMs) { - log.info { - "Cancelled internal tasks, waiting ${timeoutMs}ms for implementor tasks to complete" - } - implementorScope.job.complete() - implementorScope.job.join() - log.info { "Implementor tasks completed" } - } - ?: run { - log.error { "Implementor tasks did not complete within ${timeoutMs}ms, cancelling" } - implementorScope.job.cancel() - } - - log.info { "Cancelling internal tasks" } - internalScope.job.cancel() - } -} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt index 60a03a3d9418..3ec5b530d702 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt @@ -21,24 +21,3 @@ interface TaskLauncher { */ suspend fun run() } - -/** - * Wraps tasks with exception handling. It should perform all necessary exception handling, then - * execute the provided callback. - */ -interface TaskExceptionHandler { - // Wrap a task with exception handling. - suspend fun withExceptionHandling(task: T): U - - // Set a callback that will be invoked when any exception handling is done. - suspend fun setCallback(callback: suspend () -> Unit) -} - -/** Provides the scope(s) in which tasks run. */ -interface TaskScopeProvider : CloseableCoroutine { - /** Launch a task in the correct scope. */ - suspend fun launch(task: T) - - /** Unliked [close], may attempt to fail gracefully, but should guarantee return. */ - suspend fun kill() -} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt index 14a1688e7ad9..17e8ea7891f2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt @@ -7,12 +7,12 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.StreamLoader import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface CloseStreamTask : ImplementorScope +interface CloseStreamTask : Task /** * Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt index 9959a3286ab0..a34574387328 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt @@ -9,12 +9,12 @@ import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.StreamProcessingSucceeded import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FailStreamTask : ImplementorScope +interface FailStreamTask : Task /** * FailStreamTask is a task that is executed when the processing of a stream fails in the diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt index 10f64ab9de0f..1eebc4f2eab9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FailSyncTask : ImplementorScope +interface FailSyncTask : Task /** * FailSyncTask is a task that is executed only when the destination itself fails during a sync. If diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt index 80f8e99024c7..0234f02b6bcf 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface OpenStreamTask : ImplementorScope +interface OpenStreamTask : Task /** * Wraps @[StreamLoader.start] and starts the spill-to-disk tasks. diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt index 1d0e43d86242..a06f52151cbc 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt @@ -8,14 +8,14 @@ import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.StreamLoader import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton -interface ProcessBatchTask : KillableScope +interface ProcessBatchTask : Task /** Wraps @[StreamLoader.processBatch] and handles the resulting batch. */ class DefaultProcessBatchTask( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 0f2dcb0c3cf7..44ec4bc8bdcb 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.util.use import io.airbyte.cdk.load.write.FileBatchAccumulator import io.github.oshai.kotlinlogging.KotlinLogging @@ -20,7 +20,7 @@ import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap -interface ProcessFileTask : ImplementorScope +interface ProcessFileTask : Task class DefaultProcessFileTask( private val syncManager: SyncManager, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt index 7cc62d5c57a6..b513ca70cccb 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt @@ -20,7 +20,7 @@ import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.internal.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.util.lineSequence import io.airbyte.cdk.load.util.use @@ -34,7 +34,7 @@ import java.io.InputStream import java.util.concurrent.ConcurrentHashMap import kotlin.io.path.inputStream -interface ProcessRecordsTask : KillableScope +interface ProcessRecordsTask : Task /** * Wraps @[StreamLoader.processRecords] and feeds it a lazy iterator over the last batch of spooled @@ -131,7 +131,9 @@ class DefaultProcessRecordsTask( .takeWhile { it !is DestinationRecordStreamComplete && it !is DestinationRecordStreamIncomplete } - .map { (it as DestinationRecord).asRecordMarshaledToAirbyteValue() } + .map { + (it as DestinationRecord).asRecordMarshaledToAirbyteValue() + } .iterator() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt index 1bf807973d13..d5beca9f875e 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt @@ -5,12 +5,12 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface SetupTask : ImplementorScope +interface SetupTask : Task /** * Wraps @[DestinationWriter.setup] and starts the open stream tasks. diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt index 64dada897c6b..f4502c633a9a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface TeardownTask : ImplementorScope +interface TeardownTask : Task /** * Wraps @[DestinationWriter.teardown] and stops the task launcher. diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt index 37901ecb9fe6..1f0616db1f74 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt @@ -5,11 +5,11 @@ package io.airbyte.cdk.load.task.internal import io.airbyte.cdk.load.state.CheckpointManager -import io.airbyte.cdk.load.task.InternalScope +import io.airbyte.cdk.load.task.Task import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FlushCheckpointsTask : InternalScope +interface FlushCheckpointsTask : Task class DefaultFlushCheckpointsTask( private val checkpointManager: CheckpointManager<*, *>, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt index 0e69940b4c17..5d083477e3e7 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.state.Reserved -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Value @@ -29,7 +29,7 @@ class FlushTickTask( private val catalog: DestinationCatalog, private val recordQueueSupplier: MessageQueueSupplier>, -) : KillableScope { +) : Task { private val log = KotlinLogging.logger {} override suspend fun execute() { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index e084bcc4fe41..47b0839e07d9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -33,7 +33,7 @@ import io.airbyte.cdk.load.message.Undefined import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging @@ -41,7 +41,7 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton -interface InputConsumerTask : KillableScope +interface InputConsumerTask : Task /** * Routes @[DestinationStreamAffinedMessage]s by stream to the appropriate channel and @ diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt index 182bbe3d9fba..d00d7edc6255 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt @@ -24,7 +24,7 @@ import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.TimeWindowTrigger import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.implementor.FileAggregateMessage import io.airbyte.cdk.load.util.use import io.airbyte.cdk.load.util.withNextAdjacentValue @@ -40,7 +40,7 @@ import kotlin.io.path.deleteExisting import kotlin.io.path.outputStream import kotlinx.coroutines.flow.fold -interface SpillToDiskTask : KillableScope +interface SpillToDiskTask : Task /** * Reads records from the message queue and writes them to disk. Completes once the upstream diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt index 92aaf8b12beb..378f52002673 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt @@ -10,13 +10,13 @@ import io.airbyte.cdk.load.file.TimeProvider import io.airbyte.cdk.load.message.ChannelMessageQueue import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.CheckpointManager -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface TimedForcedCheckpointFlushTask : KillableScope +interface TimedForcedCheckpointFlushTask : Task @Singleton @Secondary diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt index c73a8ffd6376..bd7f61442ce4 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt @@ -13,12 +13,12 @@ import io.airbyte.cdk.load.message.StreamCheckpointWrapped import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.task.InternalScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface UpdateCheckpointsTask : InternalScope +interface UpdateCheckpointsTask : Task @Singleton @Secondary diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 3c9671d5deaf..5261517f4afd 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -76,8 +76,7 @@ import org.junit.jupiter.api.Test "MockScopeProvider", ] ) -class DestinationTaskLauncherTest { - @Inject lateinit var mockScopeProvider: MockScopeProvider +class DestinationTaskLauncherTest { @Inject lateinit var taskLauncher: DestinationTaskLauncher @Inject lateinit var syncManager: SyncManager @@ -447,32 +446,11 @@ class DestinationTaskLauncherTest { teardownTaskFactory.hasRun.receive() } - @Test - fun testHandleTeardownComplete() = runTest { - // This should close the scope provider. - launch { - taskLauncher.run() - Assertions.assertTrue(mockScopeProvider.didClose) - } - taskLauncher.handleTeardownComplete() - } - - @Test - fun testHandleCallbackWithFailure() = runTest { - launch { - taskLauncher.run() - Assertions.assertTrue(mockScopeProvider.didKill) - } - taskLauncher.handleTeardownComplete(success = false) - } - @Test fun `test exceptions in tasks throw`(catalog: DestinationCatalog) = runTest { mockSpillToDiskTaskFactory.forceFailure.getAndSet(true) - val job = launch { taskLauncher.run() } - taskLauncher.handleTeardownComplete() - job.join() + launch { taskLauncher.run() } mockFailStreamTaskFactory.didRunFor.close() diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index f62914848a68..126d1d87f84a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -44,8 +44,6 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test class DestinationTaskLauncherUTest { - private val taskScopeProvider: TaskScopeProvider> = - mockk(relaxed = true) private val catalog: DestinationCatalog = mockk(relaxed = true) private val syncManager: SyncManager = mockk(relaxed = true) @@ -85,7 +83,6 @@ class DestinationTaskLauncherUTest { useFileTranfer: Boolean ): DefaultDestinationTaskLauncher { return DefaultDestinationTaskLauncher( - taskScopeProvider, catalog, config, syncManager, @@ -114,8 +111,6 @@ class DestinationTaskLauncherUTest { @BeforeEach fun init() { - coEvery { taskScopeProvider.launch(any()) } returns Unit - val stream = mockk(relaxed = true) val streamDescriptor = DestinationStream.Descriptor("namespace", "name") every { stream.descriptor } returns streamDescriptor @@ -155,6 +150,5 @@ class DestinationTaskLauncherUTest { destinationTaskLauncher.handleTeardownComplete() coVerify { failStreamTaskFactory.make(any(), e, any()) } - coVerify { taskScopeProvider.launch(match { it.innerTask is FailStreamTask }) } } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt deleted file mode 100644 index 62ed56dfbd2c..000000000000 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockScopeProvider.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.task - -import io.micronaut.context.annotation.Primary -import io.micronaut.context.annotation.Requires -import jakarta.inject.Singleton -import java.util.concurrent.atomic.AtomicBoolean - -@Singleton -@Primary -@Requires(env = ["MockScopeProvider"]) -class MockScopeProvider : TaskScopeProvider> { - private val didCloseAB = AtomicBoolean(false) - private val didKillAB = AtomicBoolean(false) - - val didClose - get() = didCloseAB.get() - val didKill - get() = didKillAB.get() - - override suspend fun launch(task: WrappedTask) { - task.execute() - } - - override suspend fun close() { - didCloseAB.set(true) - } - - override suspend fun kill() { - didKillAB.set(true) - } -} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 118e4b7e0c38..0f1c026f9b75 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -76,6 +76,7 @@ class S3V2WriteTestJsonUncompressed : } @Test + @Disabled("not thread-safe") override fun testBasicWriteFile() { super.testBasicWriteFile() } @@ -128,6 +129,7 @@ class S3V2WriteTestCsvUncompressed : allTypesBehavior = Untyped, ) { @Test + @Disabled("Not thread-safe") override fun testBasicWriteFile() { super.testBasicWriteFile() }