diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index c3b2db03ba77..771e3b635d1b 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -51,6 +51,7 @@ class ObjectStorageStreamLoaderFactory, U : OutputStream>( pathFactory, bufferedWriterFactory, destinationStateManager, + uploadConfigurationProvider.objectStorageUploadConfiguration.uploadPartSizeBytes, uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes ) } @@ -67,7 +68,8 @@ class ObjectStorageStreamLoader, U : OutputStream>( private val pathFactory: ObjectStoragePathFactory, private val bufferedWriterFactory: BufferedFormattingWriterFactory, private val destinationStateManager: DestinationStateManager, - private val recordBatchSizeBytes: Long, + private val partSizeBytes: Long, + private val fileSizeBytes: Long, ) : StreamLoader { private val log = KotlinLogging.logger {} @@ -88,7 +90,8 @@ class ObjectStorageStreamLoader, U : OutputStream>( return RecordToPartAccumulator( pathFactory, bufferedWriterFactory, - recordBatchSizeBytes, + partSizeBytes = partSizeBytes, + fileSizeBytes = fileSizeBytes, stream, fileNumber ) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt index 4ebfe5a79634..5434e79002d0 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt @@ -26,7 +26,8 @@ data class ObjectInProgress( class RecordToPartAccumulator( private val pathFactory: ObjectStoragePathFactory, private val bufferedWriterFactory: BufferedFormattingWriterFactory, - private val recordBatchSizeBytes: Long, + private val partSizeBytes: Long, + private val fileSizeBytes: Long, private val stream: DestinationStream, private val fileNumber: AtomicLong, ) : BatchAccumulator { @@ -66,26 +67,37 @@ class RecordToPartAccumulator( partialUpload.writer.flush() // Check if we have reached the target size. - val newSize = partialUpload.partFactory.totalSize + partialUpload.writer.bufferSize - if (newSize >= recordBatchSizeBytes || endOfStream) { + val bufferSize = partialUpload.writer.bufferSize + val newSize = partialUpload.partFactory.totalSize + bufferSize + if (newSize >= fileSizeBytes || endOfStream) { - // If we have reached target size, clear the object and yield a final part. + // If we have reached target file size, clear the object and yield a final part. val bytes = partialUpload.writer.finish() partialUpload.writer.close() val part = partialUpload.partFactory.nextPart(bytes, isFinal = true) log.info { - "Size $newSize/${recordBatchSizeBytes}b reached (endOfStream=$endOfStream), yielding final part ${part.partIndex} (empty=${part.isEmpty})" + val reason = if (endOfStream) "end of stream" else "file size ${fileSizeBytes}b" + "Buffered: ${bufferSize}b; total: ${newSize}b; $reason reached, yielding final part ${part.partIndex}" } currentObject.remove(key) return LoadablePart(part) - } else { - // If we have not reached target size, just yield the next part. + } else if (bufferSize >= partSizeBytes) { + // If we have not reached file size, but have reached part size, yield a non-final part. val bytes = partialUpload.writer.takeBytes() val part = partialUpload.partFactory.nextPart(bytes) log.info { - "Size $newSize/${recordBatchSizeBytes}b not reached, yielding part ${part.partIndex} (empty=${part.isEmpty})" + "Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b reached, yielding part ${part.partIndex}" + } + + return LoadablePart(part) + } else { + // If we have not reached either the file or part size, yield a null part. + // TODO: Change this to a generator interface so we never have to do this. + val part = partialUpload.partFactory.nextPart(null) + log.info { + "Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b not reached, yielding null part ${part.partIndex}" } return LoadablePart(part) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 511b2b956928..64d3c1e1227e 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -63,7 +63,7 @@ class RecordToPartAccumulatorTest { RecordToPartAccumulator( pathFactory = pathFactory, bufferedWriterFactory = bufferedWriterFactory, - recordBatchSizeBytes = recordBatchSizeBytes, + fileSizeBytes = recordBatchSizeBytes, stream = stream, fileNumber = fileNumber )