From ccf187b0ce5758ec0576fadc26348356164345c7 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 15 Nov 2024 11:44:35 -0800 Subject: [PATCH] [WIP] Prerelease S3V2 Connector --- .../airbyte/cdk/load/message/MessageQueue.kt | 2 +- .../ObjectStorageStreamLoaderFactory.kt | 15 ++++++++ .../cdk/load/file/s3/S3MultipartUpload.kt | 18 ++++++++-- .../connectors/destination-s3-v2/build.gradle | 16 ++++----- .../destination-s3-v2/metadata.yaml | 35 +++++++++++++------ .../src/main/kotlin/S3V2Configuration.kt | 3 +- .../src/main/kotlin/S3V2Specification.kt | 7 ++++ docs/integrations/destinations/s3.md | 3 +- 8 files changed, 74 insertions(+), 25 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index e9ae6c33e39f..d74a0056a5cf 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -23,7 +23,7 @@ interface QueueWriter : CloseableCoroutine { interface MessageQueue : QueueReader, QueueWriter abstract class ChannelMessageQueue : MessageQueue { - open val channel = Channel(Channel.UNLIMITED) + open val channel: Channel = Channel(Channel.UNLIMITED) override suspend fun publish(message: T) = channel.send(message) override suspend fun consume(): Flow = channel.receiveAsFlow() diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index c58193d311dc..d7061abbf33f 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -28,6 +28,9 @@ import java.io.File import java.io.OutputStream import java.nio.file.Path import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take @Singleton @Secondary @@ -84,6 +87,18 @@ class ObjectStorageStreamLoader, U : OutputStream>( val nextPartNumber = state.nextPartNumber log.info { "Got next part number from destination state: $nextPartNumber" } partNumber.set(nextPartNumber) + if (stream.descriptor.name == "products") { + throw RuntimeException("Synthetic exception (product stream only)") + } + } + + fun test(recordsIn: Flow): Flow> { + // Turn `recordsIn` into a series of (lazily evaluated flows) of + // 100 records each; NOTE: there is no `chunked` function available. + return flow { + val chunk = recordsIn.take(100) + emit(chunk) + } } override suspend fun processRecords( diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index f2e746fa3aa2..6889d060c5c3 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -21,6 +21,9 @@ import java.io.ByteArrayOutputStream import java.io.OutputStream import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.measureTime +import kotlin.time.measureTimedValue import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -51,6 +54,7 @@ class S3MultipartUpload( private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) private val partQueue = Channel(Channel.UNLIMITED) private val isClosed = AtomicBoolean(false) + private val channelSize = AtomicLong(0L) /** * Run the upload using the provided block. This should only be used by the @@ -73,6 +77,7 @@ class S3MultipartUpload( launch { val uploadedParts = mutableListOf() for (bytes in partQueue) { + channelSize.decrementAndGet() val part = uploadPart(bytes, uploadedParts) uploadedParts.add(part) } @@ -117,7 +122,11 @@ class S3MultipartUpload( wrappingBuffer.flush() val bytes = underlyingBuffer.toByteArray() underlyingBuffer.reset() - runBlocking { partQueue.send(bytes) } + channelSize.incrementAndGet() + val duration = measureTime { runBlocking { partQueue.send(bytes) } } + log.info { + "Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } } private suspend fun uploadPart( @@ -132,10 +141,13 @@ class S3MultipartUpload( body = ByteStream.fromBytes(bytes) this.partNumber = partNumber } - val uploadResponse = client.uploadPart(request) + val uploadResponse = measureTimedValue { client.uploadPart(request) } + log.info { + "Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } return CompletedPart { this.partNumber = partNumber - this.eTag = uploadResponse.eTag + this.eTag = uploadResponse.value.eTag } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index d370a2d14c0c..a48674517442 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -17,14 +17,14 @@ application { // Uncomment to run locally: // '--add-opens', 'java.base/java.lang=ALL-UNNAMED' // Uncomment to enable remote profiling: -// '-XX:NativeMemoryTracking=detail', -// '-Djava.rmi.server.hostname=localhost', -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// '-Dcom.sun.management.jmxremote.rmi.port=6000', -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false' + '-XX:NativeMemoryTracking=detail', + '-Djava.rmi.server.hostname=localhost', + '-Dcom.sun.management.jmxremote=true', + '-Dcom.sun.management.jmxremote.port=6000', + '-Dcom.sun.management.jmxremote.rmi.port=6000', + '-Dcom.sun.management.jmxremote.local.only=false', + '-Dcom.sun.management.jmxremote.authenticate=false', + '-Dcom.sun.management.jmxremote.ssl=false' ] } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 947599e42402..68ff9306d770 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -1,27 +1,40 @@ data: connectorSubtype: file connectorType: destination - definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.2 - dockerRepository: airbyte/destination-s3-v2 - githubIssueLabel: destination-s3-v2 + definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 + dockerImageTag: 1.5.0 + dockerRepository: airbyte/destination-s3 + githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 - name: S3 V2 Destination + name: S3 registryOverrides: cloud: - enabled: false + enabled: true oss: - enabled: false - releaseStage: alpha + enabled: true + releaseStage: generally_available + releases: + breakingChanges: + 1.0.0: + message: > + **This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.** + upgradeDeadline: "2024-10-08" + resourceRequirements: + jobSpecific: + - jobType: sync + resourceRequirements: + memory_limit: 2Gi + memory_request: 2Gi documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 tags: - language:java ab_internal: - sl: 100 - ql: 100 - supportLevel: community + sl: 300 + ql: 300 + supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt index d0aee92a431d..426dc2892629 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt @@ -39,7 +39,8 @@ data class S3V2Configuration( override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration = ObjectStorageUploadConfiguration(), override val recordBatchSizeBytes: Long, - override val numProcessRecordsWorkers: Int = 2 + override val numProcessRecordsWorkers: Int = 2, + override val estimatedRecordMemoryOverheadRatio: Double = 5.0 ) : DestinationConfiguration(), AWSAccessKeyConfigurationProvider, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt index 536d5673d22e..8b216bbbfd0d 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.s3_v2 +import com.fasterxml.jackson.annotation.JsonProperty import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.command.ConfigurationSpecification @@ -81,6 +82,12 @@ class S3V2Specification : // // @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""") // override val s3StagingPrefix: String? = null + + @get:JsonProperty("num_process_records_workers") + val numProcessRecordsWorkers: Int? = 2 + + @get:JsonProperty("estimated_record_memory_overhead_ratio") + val estimatedRecordMemoryOverheadRatio: Double? = 5.0 } @Singleton diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index fda8dce24cb4..ebf35fe2dd3a 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,7 +544,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | +| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging | +| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | | 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |