From 671d4de84f54deabe1271e29dcb9bcb80f374d6a Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 24 Dec 2024 10:36:48 -0800 Subject: [PATCH] Add file support to the bulk CDK (#49931) --- .../MockDestinationWriter.kt | 4 - .../load/command/DestinationConfiguration.kt | 1 + .../cdk/load/config/SyncBeanFactory.kt | 10 ++ .../cdk/load/task/DestinationTaskLauncher.kt | 29 +++--- .../load/task/implementor/ProcessBatchTask.kt | 3 +- .../load/task/implementor/ProcessFileTask.kt | 47 ++++++---- .../load/task/internal/InputConsumerTask.kt | 16 +++- .../io/airbyte/cdk/load/write/StreamLoader.kt | 20 +++- .../load/task/DestinationTaskLauncherTest.kt | 4 +- .../load/task/DestinationTaskLauncherUTest.kt | 20 +--- .../airbyte/cdk/load/task/MockTaskLauncher.kt | 9 -- .../task/implementor/ProcessFileTaskTest.kt | 49 ---------- .../task/internal/InputConsumerTaskTest.kt | 55 ++++++----- .../ObjectStorageFormattingWriter.kt | 2 + .../object_storage/FilePartAccumulator.kt | 76 +++++++++++++++ .../ObjectStorageStreamLoaderFactory.kt | 39 ++------ .../object_storage/FilePartAccumulatorTest.kt | 94 +++++++++++++++++++ .../ObjectStorageStreamLoaderTest.kt | 90 ------------------ .../bulk/toolkits/load-s3/build.gradle | 2 +- .../destination-dev-null/metadata.yaml | 2 +- .../destination/dev_null/DevNullWriter.kt | 35 ------- ...evNullBasicFunctionalityIntegrationTest.kt | 8 +- .../destination-iceberg-v2/metadata.yaml | 2 +- .../iceberg/v2/IcebergStreamLoader.kt | 5 - .../iceberg/v2/IcebergV2WriteTest.kt | 6 +- .../s3/S3V2AvroDestinationAcceptanceTest.kt | 1 + .../s3/S3V2CsvDestinationAcceptanceTest.kt | 1 + .../S3V2CsvGzipDestinationAcceptanceTest.kt | 2 + .../s3/S3V2JsonlDestinationAcceptanceTest.kt | 1 + .../S3V2JsonlGzipDestinationAcceptanceTest.kt | 1 + .../S3V2ParquetDestinationAcceptanceTest.kt | 1 + .../destination/s3_v2/S3V2WriteTest.kt | 14 ++- docs/integrations/destinations/dev-null.md | 1 + 33 files changed, 339 insertions(+), 311 deletions(-) delete mode 100644 
airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt delete mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt index f6f99528a8a7..efc49f2463cb 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt @@ -79,10 +79,6 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { return LocalBatch(records.asSequence().toList()) } - override suspend fun processFile(file: DestinationFile): Batch { - return LocalFileBatch(file) - } - override suspend fun processBatch(batch: Batch): Batch { return when (batch) { is LocalBatch -> { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt index 035d31159a6c..a36d0044c950 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt @@ -87,6 +87,7 @@ abstract class DestinationConfiguration : Configuration { open 
val numProcessRecordsWorkers: Int = 2 open val numProcessBatchWorkers: Int = 5 + open val numProcessBatchWorkersForFileTransfer: Int = 3 open val batchQueueDepth: Int = 10 companion object { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 380c7143baea..700fcc2c3bc2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.task.implementor.FileAggregateMessage +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Value @@ -79,4 +80,13 @@ class SyncBeanFactory { val channel = Channel>(config.batchQueueDepth) return MultiProducerChannel(config.numProcessRecordsWorkers.toLong(), channel, "batchQueue") } + + @Singleton + @Named("fileMessageQueue") + fun fileMessageQueue( + config: DestinationConfiguration, + ): MultiProducerChannel { + val channel = Channel(config.batchQueueDepth) + return MultiProducerChannel(1, channel, "fileMessageQueue") + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index f022a6804a79..8443b98410c6 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -10,8 +10,8 @@ import 
io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.CheckpointMessageWrapped -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationStreamEvent +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.Reserved @@ -19,6 +19,7 @@ import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessFileTaskFactory @@ -36,6 +37,7 @@ import io.airbyte.cdk.load.util.setOnce import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Value +import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CancellationException @@ -49,8 +51,6 @@ interface DestinationTaskLauncher : TaskLauncher { suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>) suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) suspend fun handleTeardownComplete(success: Boolean = true) - suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long) - suspend fun handleException(e: Exception) suspend fun handleFailStreamComplete(stream: DestinationStream.Descriptor, e: Exception) } @@ -128,6 +128,7 @@ class DefaultDestinationTaskLauncher( 
private val recordQueueSupplier: MessageQueueSupplier>, private val checkpointQueue: QueueWriter>, + @Named("fileMessageQueue") private val fileTransferQueue: MessageQueue ) : DestinationTaskLauncher { private val log = KotlinLogging.logger {} @@ -179,7 +180,8 @@ class DefaultDestinationTaskLauncher( inputFlow = inputFlow, recordQueueSupplier = recordQueueSupplier, checkpointQueue = checkpointQueue, - this, + fileTransferQueue = fileTransferQueue, + destinationTaskLauncher = this, ) enqueue(inputConsumerTask) @@ -208,6 +210,17 @@ class DefaultDestinationTaskLauncher( val task = processBatchTaskFactory.make(this) enqueue(task) } + } else { + repeat(config.numProcessRecordsWorkers) { + log.info { "Launching process file task $it" } + enqueue(processFileTaskFactory.make(this)) + } + + repeat(config.numProcessBatchWorkersForFileTransfer) { + log.info { "Launching process batch task $it" } + val task = processBatchTaskFactory.make(this) + enqueue(task) + } } // Start flush task @@ -283,14 +296,6 @@ class DefaultDestinationTaskLauncher( } } - override suspend fun handleFile( - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long - ) { - enqueue(processFileTaskFactory.make(this, stream, file, index)) - } - override suspend fun handleException(e: Exception) { catalog.streams .map { failStreamTaskFactory.make(this, e, it.descriptor) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt index 34b4d94a88a1..1d0e43d86242 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import 
io.airbyte.cdk.load.task.KillableScope import io.airbyte.cdk.load.write.StreamLoader +import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton @@ -22,7 +23,7 @@ class DefaultProcessBatchTask( private val batchQueue: MultiProducerChannel>, private val taskLauncher: DestinationTaskLauncher ) : ProcessBatchTask { - + val log = KotlinLogging.logger {} override suspend fun execute() { batchQueue.consume().collect { batchEnvelope -> val streamLoader = syncManager.getOrAwaitStreamLoader(batchEnvelope.streamDescriptor) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 36801ce82156..0f2dcb0c3cf7 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -4,44 +4,53 @@ package io.airbyte.cdk.load.task.implementor -import com.google.common.collect.Range import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MessageQueue +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.util.use +import io.airbyte.cdk.load.write.FileBatchAccumulator import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap interface ProcessFileTask : ImplementorScope class DefaultProcessFileTask( - private val 
streamDescriptor: DestinationStream.Descriptor, - private val taskLauncher: DestinationTaskLauncher, private val syncManager: SyncManager, - private val file: DestinationFile, - private val index: Long, + private val taskLauncher: DestinationTaskLauncher, + private val inputQueue: MessageQueue, + private val outputQueue: MultiProducerChannel>, ) : ProcessFileTask { val log = KotlinLogging.logger {} + private val accumulators = + ConcurrentHashMap() override suspend fun execute() { - val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) + outputQueue.use { + inputQueue.consume().collect { (streamDescriptor, file, index) -> + val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) - val batch = streamLoader.processFile(file) + val acc = + accumulators.getOrPut(streamDescriptor) { + streamLoader.createFileBatchAccumulator(outputQueue) + } - val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) - taskLauncher.handleNewBatch(streamDescriptor, wrapped) + acc.processFilePart(file, index) + } + } } } interface ProcessFileTaskFactory { fun make( taskLauncher: DestinationTaskLauncher, - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long, ): ProcessFileTask } @@ -49,13 +58,19 @@ interface ProcessFileTaskFactory { @Secondary class DefaultFileRecordsTaskFactory( private val syncManager: SyncManager, + @Named("fileMessageQueue") + private val fileTransferQueue: MessageQueue, + @Named("batchQueue") private val outputQueue: MultiProducerChannel>, ) : ProcessFileTaskFactory { override fun make( taskLauncher: DestinationTaskLauncher, - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long, ): ProcessFileTask { - return DefaultProcessFileTask(stream, taskLauncher, syncManager, file, index) + return DefaultProcessFileTask(syncManager, taskLauncher, fileTransferQueue, outputQueue) } } + +data class FileTransferQueueMessage( + val streamDescriptor: DestinationStream.Descriptor, + 
val file: DestinationFile, + val index: Long, +) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index fb114ac05699..e084bcc4fe41 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -21,6 +21,7 @@ import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.GlobalCheckpoint import io.airbyte.cdk.load.message.GlobalCheckpointWrapped +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.SimpleBatch @@ -33,9 +34,11 @@ import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named import jakarta.inject.Singleton interface InputConsumerTask : KillableScope @@ -60,6 +63,8 @@ class DefaultInputConsumerTask( private val checkpointQueue: QueueWriter>, private val syncManager: SyncManager, private val destinationTaskLauncher: DestinationTaskLauncher, + @Named("fileMessageQueue") + private val fileTransferQueue: MessageQueue, ) : InputConsumerTask { private val log = KotlinLogging.logger {} @@ -96,15 +101,17 @@ class DefaultInputConsumerTask( } is DestinationFile -> { val index = manager.countRecordIn() - destinationTaskLauncher.handleFile(stream, message, index) + // 
destinationTaskLauncher.handleFile(stream, message, index) + fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index)) } is DestinationFileStreamComplete -> { reserved.release() // safe because multiple calls conflate manager.markEndOfStream(true) + fileTransferQueue.close() val envelope = BatchEnvelope( SimpleBatch(Batch.State.COMPLETE), - streamDescriptor = message.stream + streamDescriptor = message.stream, ) destinationTaskLauncher.handleNewBatch(stream, envelope) } @@ -197,6 +204,7 @@ interface InputConsumerTaskFactory { MessageQueueSupplier>, checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue ): InputConsumerTask } @@ -211,6 +219,7 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : MessageQueueSupplier>, checkpointQueue: QueueWriter>, destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue, ): InputConsumerTask { return DefaultInputConsumerTask( catalog, @@ -218,7 +227,8 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : recordQueueSupplier, checkpointQueue, syncManager, - destinationTaskLauncher + destinationTaskLauncher, + fileTransferQueue, ) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index 308dc5e99c09..f45d3b67c81d 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -6,8 +6,10 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import 
io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed @@ -43,13 +45,15 @@ import io.airbyte.cdk.load.state.StreamProcessingFailed * but only if [start] returned successfully. If any exception was thrown during processing, it is * passed as an argument to [close]. */ -interface StreamLoader : BatchAccumulator { +interface StreamLoader : BatchAccumulator, FileBatchAccumulator { val stream: DestinationStream suspend fun start() {} suspend fun createBatchAccumulator(): BatchAccumulator = this + suspend fun createFileBatchAccumulator( + outputQueue: MultiProducerChannel>, + ): FileBatchAccumulator = this - suspend fun processFile(file: DestinationFile): Batch suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE) suspend fun close(streamFailure: StreamProcessingFailed? = null) {} } @@ -64,3 +68,15 @@ interface BatchAccumulator { "processRecords must be implemented if createBatchAccumulator is overridden" ) } + +interface FileBatchAccumulator { + /** + * This is an unusual way to process a message (the DestinationFile). The batches are pushed to + * the queue immediately instead of being returned by the method; the main reason is that we + * need to keep a single instance of a PartFactory for the whole file. 
+ */ + suspend fun processFilePart(file: DestinationFile, index: Long): Unit = + throw NotImplementedError( + "processRecords must be implemented if createBatchAccumulator is overridden" + ) +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index a5f59c711fa3..3c9671d5deaf 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -29,6 +29,7 @@ import io.airbyte.cdk.load.task.implementor.FailStreamTask import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTask import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTask import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory @@ -153,7 +154,8 @@ class DestinationTaskLauncherTest { MessageQueueSupplier< DestinationStream.Descriptor, Reserved>, checkpointQueue: QueueWriter>, - destinationTaskLauncher: DestinationTaskLauncher + destinationTaskLauncher: DestinationTaskLauncher, + fileTransferQueue: MessageQueue, ): InputConsumerTask { return object : InputConsumerTask { override suspend fun execute() { diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index 0a92a0a83b78..f62914848a68 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ 
b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -9,6 +9,7 @@ import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.DestinationStreamEvent +import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.Reserved @@ -17,9 +18,9 @@ import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTask import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory +import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory -import io.airbyte.cdk.load.task.implementor.ProcessFileTask import io.airbyte.cdk.load.task.implementor.ProcessFileTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.SetupTaskFactory @@ -79,6 +80,7 @@ class DestinationTaskLauncherUTest { mockk(relaxed = true) private val checkpointQueue: QueueWriter> = mockk(relaxed = true) + private val fileTransferQueue: MessageQueue = mockk(relaxed = true) private fun getDefaultDestinationTaskLauncher( useFileTranfer: Boolean ): DefaultDestinationTaskLauncher { @@ -106,6 +108,7 @@ class DestinationTaskLauncherUTest { inputFlow, recordQueueSupplier, checkpointQueue, + fileTransferQueue, ) } @@ -143,21 +146,6 @@ class DestinationTaskLauncherUTest { coVerify { spillToDiskTaskFactory.make(any(), any()) } } - class MockedTaskWrapper(override val innerTask: ScopedTask) : WrappedTask { - override suspend fun execute() {} - } - - @Test - fun `test handle 
file`() = runTest { - val processFileTask = mockk(relaxed = true) - every { processFileTaskFactory.make(any(), any(), any(), any()) } returns processFileTask - - val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) - destinationTaskLauncher.handleFile(mockk(), mockk(), 1L) - - coVerify { taskScopeProvider.launch(match { it.innerTask is ProcessFileTask }) } - } - @Test fun `test handle exception`() = runTest { val destinationTaskLauncher = getDefaultDestinationTaskLauncher(true) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt index 913a7e425d6b..da78d46d0e2a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/MockTaskLauncher.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.load.task import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.DestinationFile import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Requires import jakarta.inject.Singleton @@ -40,14 +39,6 @@ class MockTaskLauncher : DestinationTaskLauncher { throw NotImplementedError() } - override suspend fun handleFile( - stream: DestinationStream.Descriptor, - file: DestinationFile, - index: Long - ) { - throw NotImplementedError("This destination does not support file transfer.") - } - override suspend fun handleException(e: Exception) { TODO("Not yet implemented") } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt deleted file mode 100644 index 0c550b927c1e..000000000000 --- 
a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTaskTest.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.task.implementor - -import com.google.common.collect.Range -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.write.StreamLoader -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Test - -class ProcessFileTaskTest { - private val stream: DestinationStream.Descriptor = - DestinationStream.Descriptor("namespace", "name") - private val taskLauncher: DestinationTaskLauncher = mockk(relaxed = true) - private val syncManager: SyncManager = mockk(relaxed = true) - private val file: DestinationFile = mockk(relaxed = true) - private val index = 1234L - - val defaultProcessFileTask = - DefaultProcessFileTask(stream, taskLauncher, syncManager, file, index) - - @Test - fun `the the file process task execution`() = runTest { - val mockedStreamLoader = mockk(relaxed = true) - coEvery { syncManager.getOrAwaitStreamLoader(stream) } returns mockedStreamLoader - val batch = SimpleBatch(Batch.State.COMPLETE) - coEvery { mockedStreamLoader.processFile(file) } returns batch - - defaultProcessFileTask.execute() - - coVerify { - taskLauncher.handleNewBatch( - stream, - BatchEnvelope(batch, Range.singleton(index), stream) - ) - } - } -} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt 
b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt index 02d89192c3e4..02a402904910 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt @@ -94,11 +94,12 @@ class InputConsumerTaskTest { val task = DefaultInputConsumerTaskFactory(syncManager) .make( - catalog, - inputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) task.execute() @@ -150,11 +151,12 @@ class InputConsumerTaskTest { val task = DefaultInputConsumerTaskFactory(syncManager) .make( - catalog, - inputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) task.execute() coVerifySequence { @@ -187,11 +189,12 @@ class InputConsumerTaskTest { val task = DefaultInputConsumerTaskFactory(syncManager) .make( - catalog, - inputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) coEvery { inputFlow.collect(any()) } coAnswers { @@ -242,11 +245,12 @@ class InputConsumerTaskTest { val task = DefaultInputConsumerTaskFactory(syncManager) .make( - catalog, - inputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + 
destinationTaskLauncher = mockk(), + fileTransferQueue = mockk(relaxed = true), ) coEvery { inputFlow.collect(any()) } coAnswers @@ -314,11 +318,12 @@ class InputConsumerTaskTest { val task = DefaultInputConsumerTaskFactory(syncManager) .make( - catalog, - inputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(relaxed = true), + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + destinationTaskLauncher = mockk(relaxed = true), + fileTransferQueue = mockk(relaxed = true), ) assertThrows(IllegalStateException::class) { task.execute() } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt index 7832b4279798..286e302fcbcc 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt @@ -52,6 +52,8 @@ class ObjectStorageFormattingWriterFactory( outputStream: OutputStream ): ObjectStorageFormattingWriter { val flatten = formatConfigProvider.objectStorageFormatConfiguration.rootLevelFlattening + // TODO: FileWriter + return when (formatConfigProvider.objectStorageFormatConfiguration) { is JsonFormatConfiguration -> JsonFormattingWriter(stream, outputStream, flatten) is AvroFormatConfiguration -> diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt new file mode 100644 index 000000000000..cbf0199082df --- /dev/null +++ 
b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.object_storage + +import com.google.common.collect.Range +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.object_storage.PartFactory +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.object_storage.LoadablePart +import io.airbyte.cdk.load.write.FileBatchAccumulator +import java.io.File +import java.nio.file.Path + +@SuppressFBWarnings( + "NP_NONNULL_PARAM_VIOLATION", + justification = "state is guaranteed to be non-null by Kotlin's type system" +) +class FilePartAccumulator( + private val pathFactory: ObjectStoragePathFactory, + private val stream: DestinationStream, + private val outputQueue: MultiProducerChannel>, +) : FileBatchAccumulator { + override suspend fun processFilePart(file: DestinationFile, index: Long) { + val key = + Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") + .toString() + + val part = + PartFactory( + key = key, + fileNumber = index, + ) + + val localFile = File(file.fileMessage.fileUrl) + val fileInputStream = localFile.inputStream() + + while (true) { + val bytePart = + ByteArray(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + val read = fileInputStream.read(bytePart) + + if (read == -1) { + val filePart: ByteArray? 
= null + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else if (read < bytePart.size) { + val filePart: ByteArray = bytePart.copyOfRange(0, read) + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else { + val batch = LoadablePart(part.nextPart(bytePart, isFinal = false)) + handleFilePart(batch, stream.descriptor, index) + } + } + localFile.delete() + } + + private suspend fun handleFilePart( + batch: Batch, + streamDescriptor: DestinationStream.Descriptor, + index: Long, + ) { + val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) + outputQueue.publish(wrapped) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 5f362fcffa49..d9fccdde6657 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -15,20 +15,22 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.object_storage.* import io.airbyte.cdk.load.message.object_storage.LoadedObject import io.airbyte.cdk.load.message.object_storage.ObjectStorageBatch import 
io.airbyte.cdk.load.state.DestinationStateManager import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState import io.airbyte.cdk.load.write.BatchAccumulator +import io.airbyte.cdk.load.write.FileBatchAccumulator import io.airbyte.cdk.load.write.StreamLoader import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.io.File import java.io.OutputStream -import java.nio.file.Path import java.util.concurrent.atomic.AtomicLong @Singleton @@ -97,36 +99,9 @@ class ObjectStorageStreamLoader, U : OutputStream>( ) } - override suspend fun processFile(file: DestinationFile): Batch { - if (pathFactory.supportsStaging) { - throw IllegalStateException("Staging is not supported for files") - } - val fileUrl = file.fileMessage.fileUrl ?: "" - if (!File(fileUrl).exists()) { - log.error { "File does not exist: $fileUrl" } - throw IllegalStateException("File does not exist: $fileUrl") - } - val key = - Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") - .toString() - - val state = destinationStateManager.getState(stream) - state.addObject( - generationId = stream.generationId, - key = key, - partNumber = 0, - isStaging = false - ) - - val metadata = ObjectStorageDestinationState.metadataFor(stream) - val obj = - client.streamingUpload(key, metadata, streamProcessor = compressor) { outputStream -> - File(fileUrl).inputStream().use { it.copyTo(outputStream) } - } - val localFile = createFile(fileUrl) - localFile.delete() - return LoadedObject(remoteObject = obj, fileNumber = 0) - } + override suspend fun createFileBatchAccumulator( + outputQueue: MultiProducerChannel>, + ): FileBatchAccumulator = FilePartAccumulator(pathFactory, stream, outputQueue) @VisibleForTesting fun createFile(url: String) = File(url) diff --git 
a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt new file mode 100644 index 000000000000..25d232bd3328 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulatorTest.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.message.BatchEnvelope +import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel +import io.mockk.coVerify +import io.mockk.every +import io.mockk.mockk +import java.io.File +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class FilePartAccumulatorTest { + private val pathFactory: ObjectStoragePathFactory = mockk(relaxed = true) + private val stream: DestinationStream = mockk(relaxed = true) + private val outputQueue: MultiProducerChannel> = mockk(relaxed = true) + + private val filePartAccumulator = FilePartAccumulator(pathFactory, stream, outputQueue) + + private val fileRelativePath = "relativePath" + private val descriptor = DestinationStream.Descriptor("namespace", "name") + + @BeforeEach + fun init() { + every { stream.descriptor } returns descriptor + } + + @Test + fun testFilePartAccumulatorSmall() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = createFile(10) + val index = 21L + val fileMessage = createFileMessage(file) 
+ + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 1) { outputQueue.publish(any()) } + } + + @Test + fun testFilePartAccumulatorExactlyPartSize() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt()) + val index = 21L + val fileMessage = createFileMessage(file) + + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 2) { outputQueue.publish(any()) } + } + + @Test + fun testFilePartAccumulatorBig() = runTest { + val finalDirectory = "finalDirectory" + every { pathFactory.getFinalDirectory(stream) } returns finalDirectory + val file = + createFile(ObjectStorageUploadConfiguration.DEFAULT_FILE_SIZE_BYTES.toInt() + 1000) + val index = 21L + val fileMessage = createFileMessage(file) + + filePartAccumulator.processFilePart(fileMessage, index) + + coVerify(exactly = 2) { outputQueue.publish(any()) } + } + + private fun createFile(sizeInBytes: Int): File { + val file = File.createTempFile("test", ".txt") + val text = CharArray(sizeInBytes) { 'a' }.concatToString() + file.writeText(text) + return file + } + + private fun createFileMessage(file: File): DestinationFile { + return DestinationFile( + descriptor, + 0, + "", + DestinationFile.AirbyteRecordMessageFile( + fileUrl = file.absolutePath, + fileRelativePath = fileRelativePath, + ) + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt deleted file mode 100644 index 47631dd552c2..000000000000 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* 
- * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.write.object_storage - -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.file.StreamProcessor -import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory -import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient -import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory -import io.airbyte.cdk.load.file.object_storage.RemoteObject -import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.object_storage.* -import io.airbyte.cdk.load.state.DestinationStateManager -import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import io.mockk.verify -import java.io.ByteArrayOutputStream -import java.io.File -import java.nio.file.Files -import java.nio.file.Path -import kotlin.test.assertEquals -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Test - -class ObjectStorageStreamLoaderTest { - private val stream: DestinationStream = mockk(relaxed = true) - private val client: ObjectStorageClient> = mockk(relaxed = true) - private val compressor: StreamProcessor = mockk(relaxed = true) - private val pathFactory: ObjectStoragePathFactory = mockk(relaxed = true) - private val writerFactory: BufferedFormattingWriterFactory = - mockk(relaxed = true) - private val destinationStateManager: DestinationStateManager = - mockk(relaxed = true) - private val fileSize: Long = 2 - private val partSize: Long = 1 - - private val objectStorageStreamLoader = - spyk( - ObjectStorageStreamLoader( - stream, - client, - compressor, - pathFactory, - writerFactory, - destinationStateManager, - partSizeBytes = partSize, - fileSizeBytes = fileSize - ) - ) - - @Test - fun `test processFile`() = runTest { - val fileUrl = "/tmp/fileUrl" - 
Files.deleteIfExists(Path.of(fileUrl)) - Files.createFile(Path.of(fileUrl)) - val stagingDirectory = "stagingDirectory" - val generationId = 12L - val destinationFile = mockk() - every { destinationFile.fileMessage } returns - DestinationFile.AirbyteRecordMessageFile(fileUrl = fileUrl, fileRelativePath = fileUrl) - every { pathFactory.getFinalDirectory(any()) } returns stagingDirectory - every { stream.generationId } returns generationId - val mockedStateStorage: ObjectStorageDestinationState = mockk(relaxed = true) - coEvery { destinationStateManager.getState(stream) } returns mockedStateStorage - val mockedFile = mockk(relaxed = true) - every { objectStorageStreamLoader.createFile(any()) } returns mockedFile - - val expectedKey = Path.of(stagingDirectory, fileUrl).toString() - val metadata = - mapOf( - ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY to generationId.toString() - ) - val mockRemoteObject: RemoteObject = mockk(relaxed = true) - coEvery { client.streamingUpload(any(), any(), compressor, any()) } returns mockRemoteObject - - val result = objectStorageStreamLoader.processFile(destinationFile) - - coVerify { mockedStateStorage.addObject(generationId, expectedKey, 0, false) } - coVerify { client.streamingUpload(expectedKey, metadata, compressor, any()) } - assertEquals(mockRemoteObject, (result as LoadedObject<*>).remoteObject) - verify { mockedFile.delete() } - Files.deleteIfExists(Path.of(fileUrl)) - } -} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle index 00336f267d49..e19bc337b342 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle @@ -5,5 +5,5 @@ dependencies { api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage') testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage"))) - implementation("aws.sdk.kotlin:s3:1.0.0") + 
implementation("aws.sdk.kotlin:s3:1.3.94") } diff --git a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml index 698f7f924ead..ad33d9ea4322 100644 --- a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml +++ b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml @@ -10,7 +10,7 @@ data: - suite: integrationTests connectorType: destination definitionId: a7bcc9d8-13b3-4e49-b80d-d020b90045e3 - dockerImageTag: 0.7.14 + dockerImageTag: 0.7.15 dockerRepository: airbyte/destination-dev-null documentationUrl: https://docs.airbyte.com/integrations/destinations/dev-null githubIssueLabel: destination-dev-null diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt index cdb5da037782..13a6e7d669a7 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt @@ -7,7 +7,6 @@ package io.airbyte.integrations.destination.dev_null import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.write.DestinationWriter @@ -90,12 +89,6 @@ class LoggingStreamLoader(override val stream: DestinationStream, loggingConfig: return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Ignoring file" } 
- - return SimpleBatch(state = Batch.State.COMPLETE) - } } class SilentStreamLoader(override val stream: DestinationStream) : StreamLoader { @@ -106,10 +99,6 @@ class SilentStreamLoader(override val stream: DestinationStream) : StreamLoader ): Batch { return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - return SimpleBatch(state = Batch.State.COMPLETE) - } } @SuppressFBWarnings( @@ -134,14 +123,6 @@ class ThrottledStreamLoader( return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Processing a file with delay of $millisPerRecord" } - delay(millisPerRecord) - log.info { "Completed file." } - - return SimpleBatch(state = Batch.State.COMPLETE) - } } class FailingStreamLoader(override val stream: DestinationStream, private val numMessages: Int) : @@ -173,20 +154,4 @@ class FailingStreamLoader(override val stream: DestinationStream, private val nu return SimpleBatch(state = Batch.State.COMPLETE) } - - override suspend fun processFile(file: DestinationFile): Batch { - log.info { "Processing a file with failure after $numMessages messages" } - - messageCount.getAndIncrement().let { messageCount -> - if (messageCount > numMessages) { - val message = - "Failing Destination(stream=$stream, numMessages=$numMessages: failing at $file" - log.info { message } - throw RuntimeException(message) - } - } - log.info { "Completed file." 
} - - return SimpleBatch(state = Batch.State.COMPLETE) - } } diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt index 510bb6225ef0..908df3d2e02e 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt @@ -38,9 +38,7 @@ class DevNullBasicFunctionalityIntegrationTest : super.testMidSyncCheckpointingStreamState() } - @Test - @Disabled("File transfer is not supported") - override fun testBasicWriteFile() { - super.testBasicWriteFile() - } + @Test @Disabled("File transfer is not supported") override fun testBasicWriteFile() {} + + @Test @Disabled("DevNull does not support Unknown types") override fun testUnknownTypes() {} } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index d8fbfc8a9d5e..091c226bc62e 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -16,7 +16,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.2.3 + dockerImageTag: 0.2.4 dockerRepository: airbyte/destination-iceberg-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 githubIssueLabel: destination-iceberg-v2 diff --git 
a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index 0f46cdbd61c2..7f20f5b9f55d 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -8,7 +8,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed @@ -71,10 +70,6 @@ class IcebergStreamLoader( return SimpleBatch(Batch.State.COMPLETE) } - override suspend fun processFile(file: DestinationFile): Batch { - throw NotImplementedError("Destination Iceberg does not support universal file transfer.") - } - override suspend fun close(streamFailure: StreamProcessingFailed?) 
{ if (streamFailure == null) { // Doing it first to make sure that data coming in the current batch is written to the diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index d09cb90af109..0a6c656f9de8 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -81,7 +81,11 @@ class IcebergGlueWriteTest : IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) ) ), - ) + ) { + @Test + @Disabled("dest iceberge-v2 doesn't support unknown types") + override fun testUnknownTypes() {} +} @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. 
It works as expected locally" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt index 1ca6a2aaf11f..82b8311d0c29 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt @@ -25,4 +25,5 @@ class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest() // Disable these tests until we fix the incomplete stream handling behavior. override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt index b695bf4c7a20..448f5344e6c0 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt @@ -19,4 +19,5 @@ class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { // Disable these tests until we fix the incomplete stream handling behavior. 
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt index 922312d10e62..e92bfa8a60ba 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt @@ -19,4 +19,6 @@ class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceT // Disable these tests until we fix the incomplete stream handling behavior. 
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt index 1090fdc4e595..d08304b48bb2 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt @@ -19,4 +19,5 @@ class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest( // Disable these tests until we fix the incomplete stream handling behavior. 
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt index e6ffe789bdf1..63634caa57fb 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt @@ -19,4 +19,5 @@ class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAccepta // Disable these tests until we fix the incomplete stream handling behavior. 
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt index 5c19502dd729..900240a78aa7 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt @@ -77,4 +77,5 @@ class S3V2ParquetDestinationAcceptanceTest : S3BaseParquetDestinationAcceptanceT // Disable these tests until we fix the incomplete stream handling behavior. 
override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} override fun testOverwriteSyncFailedResumedGeneration() {} + override fun testFakeFileTransfer() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 896a8da27a9f..02169a3a85bd 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -50,7 +50,7 @@ abstract class S3V2WriteTest( super.testAppendSchemaEvolution() } - @Disabled("Temporarily disable because failing in CI") + @Disabled("For most test the file test is not needed since it doesn't apply compression") @Test override fun testBasicWriteFile() { super.testBasicWriteFile() @@ -74,6 +74,11 @@ class S3V2WriteTestJsonUncompressed : override fun testBasicTypes() { super.testBasicTypes() } + + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } } class S3V2WriteTestJsonRootLevelFlattening : @@ -121,7 +126,12 @@ class S3V2WriteTestCsvUncompressed : promoteUnionToObject = false, preserveUndeclaredFields = true, allTypesBehavior = Untyped, - ) + ) { + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } +} class S3V2WriteTestCsvRootLevelFlattening : S3V2WriteTest( diff --git a/docs/integrations/destinations/dev-null.md b/docs/integrations/destinations/dev-null.md index 8b3d8c0604b2..0426772b86f0 100644 --- a/docs/integrations/destinations/dev-null.md +++ b/docs/integrations/destinations/dev-null.md @@ -49,6 +49,7 @@ The OSS and Cloud variants have the same version number starting from version `0 | Version | Date | Pull 
Request | Subject | |:------------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------| +| 0.7.15 | 2024-12-19 | [49931](https://github.com/airbytehq/airbyte/pull/49931) | Non-functional CDK changes | | 0.7.14 | 2024-12-20 | [49974](https://github.com/airbytehq/airbyte/pull/49974) | Non-functional CDK changes | | 0.7.13 | 2024-12-18 | [49899](https://github.com/airbytehq/airbyte/pull/49899) | Use a base image: airbyte/java-connector-base:1.0.0 | | 0.7.12 | 2024-12-04 | [48794](https://github.com/airbytehq/airbyte/pull/48794) | Promoting release candidate 0.7.12-rc.2 to a main version. |