Skip to content

Commit

Permalink
WIP fix tiny parts
Browse files — browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Dec 17, 2024
1 parent 0cd8afe commit 729aa87
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>, U : OutputStream>(
pathFactory,
bufferedWriterFactory,
destinationStateManager,
uploadConfigurationProvider.objectStorageUploadConfiguration.uploadPartSizeBytes,
uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes
)
}
Expand All @@ -67,7 +68,8 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
private val pathFactory: ObjectStoragePathFactory,
private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
private val recordBatchSizeBytes: Long,
private val partSizeBytes: Long,
private val fileSizeBytes: Long,
) : StreamLoader {
private val log = KotlinLogging.logger {}

Expand All @@ -88,7 +90,8 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
return RecordToPartAccumulator(
pathFactory,
bufferedWriterFactory,
recordBatchSizeBytes,
partSizeBytes = partSizeBytes,
fileSizeBytes = fileSizeBytes,
stream,
fileNumber
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ data class ObjectInProgress<T : OutputStream>(
class RecordToPartAccumulator<U : OutputStream>(
private val pathFactory: ObjectStoragePathFactory,
private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
private val recordBatchSizeBytes: Long,
private val partSizeBytes: Long,
private val fileSizeBytes: Long,
private val stream: DestinationStream,
private val fileNumber: AtomicLong,
) : BatchAccumulator {
Expand Down Expand Up @@ -66,26 +67,37 @@ class RecordToPartAccumulator<U : OutputStream>(
partialUpload.writer.flush()

// Check if we have reached the target size.
val newSize = partialUpload.partFactory.totalSize + partialUpload.writer.bufferSize
if (newSize >= recordBatchSizeBytes || endOfStream) {
val bufferSize = partialUpload.writer.bufferSize
val newSize = partialUpload.partFactory.totalSize + bufferSize
if (newSize >= fileSizeBytes || endOfStream) {

// If we have reached target size, clear the object and yield a final part.
// If we have reached target file size, clear the object and yield a final part.
val bytes = partialUpload.writer.finish()
partialUpload.writer.close()
val part = partialUpload.partFactory.nextPart(bytes, isFinal = true)

log.info {
"Size $newSize/${recordBatchSizeBytes}b reached (endOfStream=$endOfStream), yielding final part ${part.partIndex} (empty=${part.isEmpty})"
val reason = if (endOfStream) "end of stream" else "file size ${fileSizeBytes}b"
"Buffered: ${bufferSize}b; total: ${newSize}b; $reason reached, yielding final part ${part.partIndex}"
}

currentObject.remove(key)
return LoadablePart(part)
} else {
// If we have not reached target size, just yield the next part.
} else if (bufferSize >= partSizeBytes) {
// If we have not reached file size, but have reached part size, yield a non-final part.
val bytes = partialUpload.writer.takeBytes()
val part = partialUpload.partFactory.nextPart(bytes)
log.info {
"Size $newSize/${recordBatchSizeBytes}b not reached, yielding part ${part.partIndex} (empty=${part.isEmpty})"
"Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b reached, yielding part ${part.partIndex}"
}

return LoadablePart(part)
} else {
// If we have not reached either the file or part size, yield a null part.
// TODO: Change this to a generator interface so we never have to do this.
val part = partialUpload.partFactory.nextPart(null)
log.info {
"Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b not reached, yielding null part ${part.partIndex}"
}

return LoadablePart(part)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class RecordToPartAccumulatorTest {
RecordToPartAccumulator(
pathFactory = pathFactory,
bufferedWriterFactory = bufferedWriterFactory,
recordBatchSizeBytes = recordBatchSizeBytes,
fileSizeBytes = recordBatchSizeBytes,
stream = stream,
fileNumber = fileNumber
)
Expand Down

0 comments on commit 729aa87

Please sign in to comment.