From c0383270ce1f643f913fc5c31fbfbdf88e8df827 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Tue, 17 Dec 2024 07:40:07 -0800 Subject: [PATCH] Fix --- .../load/task/implementor/ProcessFileTask.kt | 36 +-------------- .../load/task/internal/InputConsumerTask.kt | 4 +- .../io/airbyte/cdk/load/write/StreamLoader.kt | 14 +++--- .../object_storage/FilePartAccumulator.kt | 45 ++++++++++++++++++- .../ObjectStorageStreamLoaderFactory.kt | 31 +++++++------ .../destination/s3_v2/S3V2WriteTest.kt | 16 +++++-- 6 files changed, 83 insertions(+), 63 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 6a828afad6f3..f399bbd9af9a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -39,45 +39,13 @@ class DefaultProcessFileTask( val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) val acc = accumulators.getOrPut(streamDescriptor) { - streamLoader.createBatchAccumulator(true) + streamLoader.createFileBatchAccumulator(taskLauncher, outputQueue) } - val localFile = File(file.fileMessage.fileUrl) - val fileInputStream = localFile.inputStream() - - while (true) { - val bytePart = ByteArray(1024 * 1024 * 10) - val read = fileInputStream.read(bytePart) - - if (read == -1) { - handleFilePart(file, ByteArray(0), index, true, acc, streamDescriptor, index) - break - } else if (read < bytePart.size) { - handleFilePart(file, bytePart.copyOfRange(0, read), index, true, acc, streamDescriptor, index) - break - } else { - handleFilePart(file, bytePart, index, false, acc, streamDescriptor, index) - } - } - localFile.delete() + acc.processFilePart(file, index) } } } - - private suspend fun handleFilePart(file: DestinationFile, - bytePart: ByteArray, - partCount: Long, - endOfStream: Boolean, acc: BatchAccumulator, - streamDescriptor: DestinationStream.Descriptor, - index: Long,) { - val batch = acc.processFilePart(file, bytePart, partCount, endOfStream) - val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) - taskLauncher.handleNewBatch(streamDescriptor, wrapped) - if (batch.requiresProcessing) { - outputQueue.publish(wrapped) - } - - } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 11e6df7d035d..538247878b74 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.task.internal +import com.google.common.collect.Range import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream @@ -109,7 +110,8 @@ class DefaultInputConsumerTask( val envelope = BatchEnvelope( SimpleBatch(Batch.State.COMPLETE), - streamDescriptor = message.stream + streamDescriptor = message.stream, + range = Range.singleton(manager.markEndOfStream(true)) ) destinationTaskLauncher.handleNewBatch(stream, envelope) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index 53ee5b583793..e1f5571c64f2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -6,10 +6,13 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed +import io.airbyte.cdk.load.task.DestinationTaskLauncher /** * Implementor interface. @@ -44,7 +47,9 @@ interface StreamLoader : BatchAccumulator { val stream: DestinationStream suspend fun start() {} - suspend fun createBatchAccumulator(isFile: Boolean = false): BatchAccumulator = this + suspend fun createBatchAccumulator(): BatchAccumulator = this + suspend fun createFileBatchAccumulator(taskLauncher: DestinationTaskLauncher, + outputQueue: MultiProducerChannel>,): BatchAccumulator = this suspend fun processFile(file: DestinationFile): Batch suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE) @@ -61,12 +66,7 @@ interface BatchAccumulator { "processRecords must be implemented if createBatchAccumulator is overridden" ) - suspend fun processFilePart( - file: DestinationFile, - filePart: ByteArray, - index: Long, - endOfStream: Boolean = false - ): Batch = + suspend fun processFilePart(file: DestinationFile, index: Long): Unit = throw NotImplementedError( "processRecords must be implemented if createBatchAccumulator is overridden" ) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index b95fe993da58..042b5d1af292 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -1,19 +1,26 @@ package io.airbyte.cdk.load.write.object_storage +import com.google.common.collect.Range import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.PartFactory import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.object_storage.LoadablePart +import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.write.BatchAccumulator +import java.io.File import java.nio.file.Path class FilePartAccumulator( private val pathFactory: ObjectStoragePathFactory, private val stream: DestinationStream, + private val taskLauncher: DestinationTaskLauncher, + private val outputQueue: MultiProducerChannel>, ): BatchAccumulator { - override suspend fun processFilePart(file: DestinationFile, filePart: ByteArray, index: Long, endOfStream: Boolean): Batch { + override suspend fun processFilePart(file: DestinationFile, index: Long) { val key = Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}") .toString() @@ -23,6 +30,40 @@ class FilePartAccumulator( fileNumber = index, ) - return LoadablePart(part.nextPart(filePart, isFinal = endOfStream)) + val localFile = File(file.fileMessage.fileUrl) + val fileInputStream = localFile.inputStream() + + while (true) { + val bytePart = ByteArray(1024 * 1024 * 10) + val read = fileInputStream.read(bytePart) + + if (read == -1) { + val filePart: ByteArray? = null + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else if (read < bytePart.size) { + val filePart: ByteArray = bytePart.copyOfRange(0, read) + val batch = LoadablePart(part.nextPart(filePart, isFinal = true)) + handleFilePart(batch, stream.descriptor, index) + break + } else { + val batch = LoadablePart(part.nextPart(bytePart, isFinal = false)) + handleFilePart(batch, stream.descriptor, index) + } + } + localFile.delete() + } + + private suspend fun handleFilePart(batch: Batch, + streamDescriptor: DestinationStream.Descriptor, + index: Long,) { + + val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) + taskLauncher.handleNewBatch(streamDescriptor, wrapped) + if (batch.requiresProcessing) { + outputQueue.publish(wrapped) + } + } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 4a6de36159cb..fd1962116e52 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -17,11 +17,14 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.PartFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.DestinationFile +import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.object_storage.* import io.airbyte.cdk.load.state.DestinationStateManager import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState +import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.write.BatchAccumulator import io.airbyte.cdk.load.write.StreamLoader import io.github.oshai.kotlinlogging.KotlinLogging @@ -88,22 +91,18 @@ class ObjectStorageStreamLoader, U : OutputStream>( val writer: BufferedFormattingWriter, ) - override suspend fun createBatchAccumulator(isFile: Boolean): BatchAccumulator { - return if (isFile) { - FilePartAccumulator( - pathFactory, - stream - ) - } else { - RecordToPartAccumulator( - pathFactory, - bufferedWriterFactory, - recordBatchSizeBytes, - stream, - fileNumber - ) - } - } + override suspend fun createBatchAccumulator(): BatchAccumulator = + RecordToPartAccumulator( + pathFactory, + bufferedWriterFactory, + recordBatchSizeBytes, + stream, + fileNumber + ) + + override suspend fun createFileBatchAccumulator(taskLauncher: DestinationTaskLauncher, + outputQueue: MultiProducerChannel>,): BatchAccumulator = + FilePartAccumulator(pathFactory, stream, taskLauncher, outputQueue) override suspend fun processFile(file: DestinationFile): Batch { if (pathFactory.supportsStaging) { diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 38489ed53469..c8ffeac6daaa 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -48,11 +48,11 @@ abstract class S3V2WriteTest( super.testAppendSchemaEvolution() } - /*@Disabled("Temporarily disable because failing in CI") + @Disabled("For most test the file test is not needed since it doesn't apply compression") @Test override fun testBasicWriteFile() { super.testBasicWriteFile() - }*/ + } } class S3V2WriteTestJsonUncompressed : @@ -67,6 +67,11 @@ class S3V2WriteTestJsonUncompressed : override fun testInterruptedTruncateWithPriorData() { super.testInterruptedTruncateWithPriorData() } + + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } } class S3V2WriteTestJsonRootLevelFlattening : @@ -114,7 +119,12 @@ class S3V2WriteTestCsvUncompressed : promoteUnionToObject = false, preserveUndeclaredFields = true, allTypesBehavior = Untyped, - ) + ) { + @Test + override fun testBasicWriteFile() { + super.testBasicWriteFile() + } + } class S3V2WriteTestCsvRootLevelFlattening : S3V2WriteTest(