Commit c038327: Fix

benmoriceau committed Dec 17, 2024
1 parent af41f97
Showing 6 changed files with 83 additions and 63 deletions.
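In short: this commit moves the 10 MiB file-chunking loop out of DefaultProcessFileTask and into FilePartAccumulator, which a StreamLoader now supplies through a dedicated createFileBatchAccumulator(taskLauncher, outputQueue) hook instead of a boolean flag on createBatchAccumulator. The accumulator builds, launches, and publishes its own LoadablePart batches; DefaultInputConsumerTask attaches an explicit singleton range to its end-of-stream COMPLETE batch; and testBasicWriteFile is disabled in the S3V2WriteTest base class but re-enabled for the uncompressed JSON and CSV variants.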
@@ -39,45 +39,13 @@ class DefaultProcessFileTask(
             val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor)

             val acc = accumulators.getOrPut(streamDescriptor) {
-                streamLoader.createBatchAccumulator(true)
+                streamLoader.createFileBatchAccumulator(taskLauncher, outputQueue)
             }

-            val localFile = File(file.fileMessage.fileUrl)
-            val fileInputStream = localFile.inputStream()
-
-            while (true) {
-                val bytePart = ByteArray(1024 * 1024 * 10)
-                val read = fileInputStream.read(bytePart)
-
-                if (read == -1) {
-                    handleFilePart(file, ByteArray(0), index, true, acc, streamDescriptor, index)
-                    break
-                } else if (read < bytePart.size) {
-                    handleFilePart(file, bytePart.copyOfRange(0, read), index, true, acc, streamDescriptor, index)
-                    break
-                } else {
-                    handleFilePart(file, bytePart, index, false, acc, streamDescriptor, index)
-                }
-            }
-            localFile.delete()
+            acc.processFilePart(file, index)
            }
        }
    }

-    private suspend fun handleFilePart(file: DestinationFile,
-                                       bytePart: ByteArray,
-                                       partCount: Long,
-                                       endOfStream: Boolean, acc: BatchAccumulator,
-                                       streamDescriptor: DestinationStream.Descriptor,
-                                       index: Long,) {
-        val batch = acc.processFilePart(file, bytePart, partCount, endOfStream)
-        val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor)
-        taskLauncher.handleNewBatch(streamDescriptor, wrapped)
-        if (batch.requiresProcessing) {
-            outputQueue.publish(wrapped)
-        }
-    }
 }


@@ -4,6 +4,7 @@

 package io.airbyte.cdk.load.task.internal

+import com.google.common.collect.Range
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationCatalog
 import io.airbyte.cdk.load.command.DestinationStream

@@ -109,7 +110,8 @@ class DefaultInputConsumerTask(
                 val envelope =
                     BatchEnvelope(
                         SimpleBatch(Batch.State.COMPLETE),
-                        streamDescriptor = message.stream
+                        streamDescriptor = message.stream,
+                        range = Range.singleton(manager.markEndOfStream(true))
                     )
                 destinationTaskLauncher.handleNewBatch(stream, envelope)
             }
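For context, Guava's Range.singleton(n) is the closed range containing exactly n, so the end-of-stream COMPLETE batch now covers precisely the index returned by markEndOfStream. A tiny sketch of those semantics:

import com.google.common.collect.Range

fun main() {
    val endOfStream: Range<Long> = Range.singleton(42L)
    println(endOfStream.contains(42L)) // true: the one covered index
    println(endOfStream.contains(43L)) // false: nothing else is covered
}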
@@ -6,10 +6,13 @@ package io.airbyte.cdk.load.write

 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecord
+import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.SimpleBatch
 import io.airbyte.cdk.load.state.StreamProcessingFailed
+import io.airbyte.cdk.load.task.DestinationTaskLauncher

 /**
  * Implementor interface.

@@ -44,7 +47,9 @@ interface StreamLoader : BatchAccumulator {
     val stream: DestinationStream

     suspend fun start() {}
-    suspend fun createBatchAccumulator(isFile: Boolean = false): BatchAccumulator = this
+    suspend fun createBatchAccumulator(): BatchAccumulator = this
+    suspend fun createFileBatchAccumulator(taskLauncher: DestinationTaskLauncher,
+                                           outputQueue: MultiProducerChannel<BatchEnvelope<*>>,): BatchAccumulator = this

     suspend fun processFile(file: DestinationFile): Batch
     suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)

@@ -61,12 +66,7 @@ interface BatchAccumulator {
         "processRecords must be implemented if createBatchAccumulator is overridden"
     )

-    suspend fun processFilePart(
-        file: DestinationFile,
-        filePart: ByteArray,
-        index: Long,
-        endOfStream: Boolean = false
-    ): Batch =
+    suspend fun processFilePart(file: DestinationFile, index: Long): Unit =
         throw NotImplementedError(
             "processFilePart must be implemented if createFileBatchAccumulator is overridden"
         )
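As an illustration of the new hook from the implementor side, a loader can return a purpose-built accumulator for file transfers while leaving record batching alone. A minimal sketch against the interfaces above (the anonymous accumulator body is a placeholder, not the CDK's implementation):

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.write.BatchAccumulator
import io.airbyte.cdk.load.write.StreamLoader

class MyStreamLoader(override val stream: DestinationStream) : StreamLoader {
    // File parts get a dedicated accumulator wired to the launcher and queue.
    override suspend fun createFileBatchAccumulator(
        taskLauncher: DestinationTaskLauncher,
        outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
    ): BatchAccumulator =
        object : BatchAccumulator {
            override suspend fun processFilePart(file: DestinationFile, index: Long) {
                // Placeholder: read file.fileMessage.fileUrl in parts, then hand the
                // resulting batches to taskLauncher/outputQueue, as FilePartAccumulator does.
            }
        }

    override suspend fun processFile(file: DestinationFile): Batch =
        SimpleBatch(Batch.State.COMPLETE)
}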
@@ -1,19 +1,26 @@
 package io.airbyte.cdk.load.write.object_storage

+import com.google.common.collect.Range
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
 import io.airbyte.cdk.load.file.object_storage.PartFactory
 import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.DestinationFile
+import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.object_storage.LoadablePart
+import io.airbyte.cdk.load.task.DestinationTaskLauncher
 import io.airbyte.cdk.load.write.BatchAccumulator
+import java.io.File
 import java.nio.file.Path

 class FilePartAccumulator(
     private val pathFactory: ObjectStoragePathFactory,
     private val stream: DestinationStream,
+    private val taskLauncher: DestinationTaskLauncher,
+    private val outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
 ): BatchAccumulator {
-    override suspend fun processFilePart(file: DestinationFile, filePart: ByteArray, index: Long, endOfStream: Boolean): Batch {
+    override suspend fun processFilePart(file: DestinationFile, index: Long) {
         val key =
             Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}")
                 .toString()

@@ -23,6 +30,40 @@ class FilePartAccumulator(
             fileNumber = index,
         )

-        return LoadablePart(part.nextPart(filePart, isFinal = endOfStream))
+        val localFile = File(file.fileMessage.fileUrl)
+        val fileInputStream = localFile.inputStream()
+
+        while (true) {
+            val bytePart = ByteArray(1024 * 1024 * 10)
+            val read = fileInputStream.read(bytePart)
+
+            if (read == -1) {
+                val filePart: ByteArray? = null
+                val batch = LoadablePart(part.nextPart(filePart, isFinal = true))
+                handleFilePart(batch, stream.descriptor, index)
+                break
+            } else if (read < bytePart.size) {
+                val filePart: ByteArray = bytePart.copyOfRange(0, read)
+                val batch = LoadablePart(part.nextPart(filePart, isFinal = true))
+                handleFilePart(batch, stream.descriptor, index)
+                break
+            } else {
+                val batch = LoadablePart(part.nextPart(bytePart, isFinal = false))
+                handleFilePart(batch, stream.descriptor, index)
+            }
+        }
+        localFile.delete()
+    }
+
+    private suspend fun handleFilePart(
+        batch: Batch,
+        streamDescriptor: DestinationStream.Descriptor,
+        index: Long,
+    ) {
+        val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor)
+        taskLauncher.handleNewBatch(streamDescriptor, wrapped)
+        if (batch.requiresProcessing) {
+            outputQueue.publish(wrapped)
+        }
+    }
 }
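The new loop streams the staged file in 10 MiB parts and flags the last one as final. A standalone sketch of the same pattern (chunkFile and the sample path are hypothetical; like the code above, it assumes a short read from a local FileInputStream means end of file):

import java.io.File

// Read a local file in fixed-size parts, flagging the final (possibly empty) part.
fun chunkFile(path: String, partSizeBytes: Int = 1024 * 1024 * 10, emit: (ByteArray, Boolean) -> Unit) {
    File(path).inputStream().use { input ->
        while (true) {
            val buffer = ByteArray(partSizeBytes)
            when (val read = input.read(buffer)) {
                -1 -> { // EOF on a part boundary: emit an empty final part
                    emit(ByteArray(0), true)
                    return
                }
                in 0 until partSizeBytes -> { // short read: treat as the final part
                    emit(buffer.copyOfRange(0, read), true)
                    return
                }
                else -> emit(buffer, false) // full part, more to come
            }
        }
    }
}

fun main() {
    chunkFile("/tmp/example.bin") { part, isFinal -> // hypothetical sample file
        println("part of ${part.size} bytes, final=$isFinal")
    }
}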
@@ -17,11 +17,14 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
 import io.airbyte.cdk.load.file.object_storage.PartFactory
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
 import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.DestinationFile
+import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.object_storage.*
 import io.airbyte.cdk.load.state.DestinationStateManager
 import io.airbyte.cdk.load.state.StreamProcessingFailed
 import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
+import io.airbyte.cdk.load.task.DestinationTaskLauncher
 import io.airbyte.cdk.load.write.BatchAccumulator
 import io.airbyte.cdk.load.write.StreamLoader
 import io.github.oshai.kotlinlogging.KotlinLogging

@@ -88,22 +91,18 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
         val writer: BufferedFormattingWriter<T>,
     )

-    override suspend fun createBatchAccumulator(isFile: Boolean): BatchAccumulator {
-        return if (isFile) {
-            FilePartAccumulator(
-                pathFactory,
-                stream
-            )
-        } else {
-            RecordToPartAccumulator(
-                pathFactory,
-                bufferedWriterFactory,
-                recordBatchSizeBytes,
-                stream,
-                fileNumber
-            )
-        }
-    }
+    override suspend fun createBatchAccumulator(): BatchAccumulator =
+        RecordToPartAccumulator(
+            pathFactory,
+            bufferedWriterFactory,
+            recordBatchSizeBytes,
+            stream,
+            fileNumber
+        )
+
+    override suspend fun createFileBatchAccumulator(taskLauncher: DestinationTaskLauncher,
+                                                    outputQueue: MultiProducerChannel<BatchEnvelope<*>>,): BatchAccumulator =
+        FilePartAccumulator(pathFactory, stream, taskLauncher, outputQueue)

     override suspend fun processFile(file: DestinationFile): Batch {
         if (pathFactory.supportsStaging) {
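With the boolean flag gone, call sites pick the factory method by message type. A sketch of that dispatch (accumulatorFor is a hypothetical helper, not part of the CDK):

import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.write.BatchAccumulator
import io.airbyte.cdk.load.write.StreamLoader

// Records use the plain accumulator; files get one wired to the launcher and queue.
suspend fun accumulatorFor(
    loader: StreamLoader,
    isFile: Boolean,
    taskLauncher: DestinationTaskLauncher,
    outputQueue: MultiProducerChannel<BatchEnvelope<*>>,
): BatchAccumulator =
    if (isFile) loader.createFileBatchAccumulator(taskLauncher, outputQueue)
    else loader.createBatchAccumulator()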
@@ -48,11 +48,11 @@ abstract class S3V2WriteTest(
         super.testAppendSchemaEvolution()
     }

-    /*@Disabled("Temporarily disable because failing in CI")
+    @Disabled("For most tests the file test is not needed since it doesn't apply compression")
     @Test
     override fun testBasicWriteFile() {
         super.testBasicWriteFile()
-    }*/
+    }
 }

 class S3V2WriteTestJsonUncompressed :

@@ -67,6 +67,11 @@ class S3V2WriteTestJsonUncompressed :
     override fun testInterruptedTruncateWithPriorData() {
         super.testInterruptedTruncateWithPriorData()
     }
+
+    @Test
+    override fun testBasicWriteFile() {
+        super.testBasicWriteFile()
+    }
 }

 class S3V2WriteTestJsonRootLevelFlattening :

@@ -114,7 +119,12 @@ class S3V2WriteTestCsvUncompressed :
         promoteUnionToObject = false,
         preserveUndeclaredFields = true,
         allTypesBehavior = Untyped,
-    )
+    ) {
+    @Test
+    override fun testBasicWriteFile() {
+        super.testBasicWriteFile()
+    }
+}

 class S3V2WriteTestCsvRootLevelFlattening :
     S3V2WriteTest(
