WIP fix tiny parts
johnny-schmidt committed Dec 17, 2024
1 parent b28c4a8 · commit eea648b
Showing 9 changed files with 76 additions and 37 deletions.
@@ -8,3 +8,5 @@ airbyte:
flush:
rate-ms: 900000 # 15 minutes
window-ms: 900000 # 15 minutes
destination:
record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null}
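
The `:null` default is meant to leave the property unset when the AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE environment variable is absent, so consumers bind it as a nullable value and fall back to a default. A minimal sketch of that consuming side (class and function names here are illustrative; the real binding is the DevNullConfigurationFactory change later in this commit):

import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

// Sketch only: bind the override as a nullable Long and fall back when it is absent.
@Singleton
class ExampleBatchSizeConfig(
    @Value("\${airbyte.destination.record-batch-size-override}")
    private val recordBatchSizeBytesOverride: Long?, // null when the env var is unset
) {
    fun effectiveBatchSizeBytes(defaultBytes: Long): Long =
        recordBatchSizeBytesOverride ?: defaultBytes
}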
@@ -116,7 +116,7 @@ class DockerizedDestination(
"-v",
"$fileTransferMountSource:/tmp",
"-e",
"AIRBYTE_DESTINATION_RECORD_BATCH_SIZE=1",
"AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE=1",
"-e",
"USE_FILE_TRANSFER=$useFileTransfer",
) +
@@ -84,9 +84,8 @@ class ObjectStorageDestinationState(
val partNumber: Long,
)

@get:JsonIgnore
val generations: Sequence<Generation>
get() =
private suspend fun getGenerations(): Sequence<Generation> =
accessLock.withLock {
generationMap.entries
.asSequence()
.map { (state, gens) ->
@@ -100,23 +99,25 @@
}
}
.flatten()
}

@get:JsonIgnore
val nextPartNumber: Long
get() = generations.flatMap { it.objects }.map { it.partNumber }.maxOrNull()?.plus(1) ?: 0L
suspend fun getNextPartNumber(): Long =
getGenerations().flatMap { it.objects }.map { it.partNumber }.maxOrNull()?.plus(1) ?: 0L

/** Returns generationId -> objectAndPart for all staged objects that should be kept. */
fun getStagedObjectsToFinalize(minimumGenerationId: Long): Sequence<Pair<Long, ObjectAndPart>> =
generations
suspend fun getStagedObjectsToFinalize(
minimumGenerationId: Long
): Sequence<Pair<Long, ObjectAndPart>> =
getGenerations()
.filter { it.isStaging && it.generationId >= minimumGenerationId }
.flatMap { it.objects.map { obj -> it.generationId to obj } }

/**
* Returns generationId -> objectAndPart for all objects (staged and unstaged) that should be
* cleaned up.
*/
fun getObjectsToDelete(minimumGenerationId: Long): Sequence<Pair<Long, ObjectAndPart>> {
val (toKeep, toDrop) = generations.partition { it.generationId >= minimumGenerationId }
suspend fun getObjectsToDelete(minimumGenerationId: Long): Sequence<Pair<Long, ObjectAndPart>> {
val (toKeep, toDrop) = getGenerations().partition { it.generationId >= minimumGenerationId }
val keepKeys = toKeep.flatMap { it.objects.map { obj -> obj.key } }.toSet()
return toDrop.asSequence().flatMap {
it.objects.filter { obj -> obj.key !in keepKeys }.map { obj -> it.generationId to obj }
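
Since generations and nextPartNumber are now suspend functions guarded by accessLock rather than properties, call sites have to run inside a coroutine. A minimal usage sketch (the wrapping function is illustrative, not part of this commit):

// Sketch only: calling the new suspend accessors from a coroutine.
suspend fun planObjectCleanup(
    state: ObjectStorageDestinationState,
    minimumGenerationId: Long,
) {
    val nextFileNumber = state.getNextPartNumber()
    val toFinalize = state.getStagedObjectsToFinalize(minimumGenerationId).toList()
    val toDelete = state.getObjectsToDelete(minimumGenerationId).toList()
    // ... move staged objects into place, delete superseded keys, start new files at nextFileNumber
}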
@@ -51,6 +51,7 @@ class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>, U : OutputStream>(
pathFactory,
bufferedWriterFactory,
destinationStateManager,
uploadConfigurationProvider.objectStorageUploadConfiguration.uploadPartSizeBytes,
uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes
)
}
@@ -67,7 +68,8 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
private val pathFactory: ObjectStoragePathFactory,
private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
private val recordBatchSizeBytes: Long,
private val partSizeBytes: Long,
private val fileSizeBytes: Long,
) : StreamLoader {
private val log = KotlinLogging.logger {}

@@ -79,7 +81,7 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
val state = destinationStateManager.getState(stream)
// This is the number used to populate {part_number} on the object path.
// We'll call it file number here to avoid confusion with the part index used for uploads.
val fileNumber = state.nextPartNumber
val fileNumber = state.getNextPartNumber()
log.info { "Got next file number from destination state: $fileNumber" }
this.fileNumber.set(fileNumber)
}
@@ -88,7 +90,8 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
return RecordToPartAccumulator(
pathFactory,
bufferedWriterFactory,
recordBatchSizeBytes,
partSizeBytes = partSizeBytes,
fileSizeBytes = fileSizeBytes,
stream,
fileNumber
)
@@ -26,7 +26,8 @@ data class ObjectInProgress<T : OutputStream>(
class RecordToPartAccumulator<U : OutputStream>(
private val pathFactory: ObjectStoragePathFactory,
private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
private val recordBatchSizeBytes: Long,
private val partSizeBytes: Long,
private val fileSizeBytes: Long,
private val stream: DestinationStream,
private val fileNumber: AtomicLong,
) : BatchAccumulator {
@@ -66,26 +67,37 @@ class RecordToPartAccumulator<U : OutputStream>(
partialUpload.writer.flush()

// Check if we have reached the target size.
val newSize = partialUpload.partFactory.totalSize + partialUpload.writer.bufferSize
if (newSize >= recordBatchSizeBytes || endOfStream) {
val bufferSize = partialUpload.writer.bufferSize
val newSize = partialUpload.partFactory.totalSize + bufferSize
if (newSize >= fileSizeBytes || endOfStream) {

// If we have reached target size, clear the object and yield a final part.
// If we have reached target file size, clear the object and yield a final part.
val bytes = partialUpload.writer.finish()
partialUpload.writer.close()
val part = partialUpload.partFactory.nextPart(bytes, isFinal = true)

log.info {
"Size $newSize/${recordBatchSizeBytes}b reached (endOfStream=$endOfStream), yielding final part ${part.partIndex} (empty=${part.isEmpty})"
val reason = if (endOfStream) "end of stream" else "file size ${fileSizeBytes}b"
"Buffered: ${bufferSize}b; total: ${newSize}b; $reason reached, yielding final part ${part.partIndex}"
}

currentObject.remove(key)
return LoadablePart(part)
} else {
// If we have not reached target size, just yield the next part.
} else if (bufferSize >= partSizeBytes) {
// If we have not reached file size, but have reached part size, yield a non-final part.
val bytes = partialUpload.writer.takeBytes()
val part = partialUpload.partFactory.nextPart(bytes)
log.info {
"Size $newSize/${recordBatchSizeBytes}b not reached, yielding part ${part.partIndex} (empty=${part.isEmpty})"
"Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b reached, yielding part ${part.partIndex}"
}

return LoadablePart(part)
} else {
// If we have not reached either the file or part size, yield a null part.
// TODO: Change this to a generator interface so we never have to do this.
val part = partialUpload.partFactory.nextPart(null)
log.info {
"Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b not reached, yielding null part ${part.partIndex}"
}

return LoadablePart(part)
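
The accumulator now tracks two thresholds instead of one: reaching partSizeBytes in the buffer yields a non-final part, reaching fileSizeBytes in total (or end of stream) closes the object with a final part, and anything smaller yields an empty placeholder part. A worked trace with the values used by the updated test below (partSizeBytes = 2, fileSizeBytes = 4), written as a sketch rather than connector code:

// processRecords(1 byte)  -> buffered 1b, total 1b -> neither threshold met -> empty part, index 0
// processRecords(0 bytes) -> buffered 1b, total 1b -> neither threshold met -> empty part, index 0
// processRecords(2 bytes) -> buffered 3b, total 3b -> part size reached     -> non-final part, index 1
// processRecords(1 byte)  -> buffered 1b, total 4b -> file size reached     -> final part, index 2; object closed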
@@ -37,6 +37,7 @@ class ObjectStorageStreamLoaderTest {
mockk(relaxed = true)
private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState> =
mockk(relaxed = true)
private val fileSize: Long = 2
private val partSize: Long = 1

private val objectStorageStreamLoader =
@@ -48,7 +49,8 @@
pathFactory,
writerFactory,
destinationStateManager,
partSize
partSizeBytes = partSize,
fileSizeBytes = fileSize
)
)

@@ -21,7 +21,8 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class RecordToPartAccumulatorTest {
private val recordBatchSizeBytes: Long = 10L
private val partSizeBytes: Long = 2L
private val fileSizeBytes: Long = 4L

private lateinit var pathFactory: ObjectStoragePathFactory
private lateinit var bufferedWriterFactory: BufferedFormattingWriterFactory<OutputStream>
@@ -63,7 +64,8 @@ class RecordToPartAccumulatorTest {
RecordToPartAccumulator(
pathFactory = pathFactory,
bufferedWriterFactory = bufferedWriterFactory,
recordBatchSizeBytes = recordBatchSizeBytes,
partSizeBytes = partSizeBytes,
fileSizeBytes = fileSizeBytes,
stream = stream,
fileNumber = fileNumber
)
@@ -93,11 +95,11 @@

// Object 1

// part 6/10b => not data sufficient, should be first and nonfinal
when (val batch = acc.processRecords(makeRecords(6), 0L, false) as ObjectStorageBatch) {
// part 0->1/2b of 4b total => not data sufficient, should be first and empty
when (val batch = acc.processRecords(makeRecords(1), 0L, false) as ObjectStorageBatch) {
is LoadablePart -> {
assert(batch.part.bytes.contentEquals(makeBytes(6)))
assert(batch.part.partIndex == 1)
assert(batch.part.isEmpty)
assert(batch.part.partIndex == 0)
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(!batch.part.isFinal)
@@ -110,6 +112,19 @@
when (val batch = acc.processRecords(makeRecords(0), 0L, false) as ObjectStorageBatch) {
is LoadablePart -> {
assert(batch.part.isEmpty)
assert(batch.part.partIndex == 0)
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(!batch.part.isFinal)
assert(batch.part.key == "path.111")
}
else -> assert(false)
}

// part 1->3/2b of 4b total => data sufficient for part, should be first part and nonfinal
when (val batch = acc.processRecords(makeRecords(2), 0L, false) as ObjectStorageBatch) {
is LoadablePart -> {
assert(batch.part.bytes.contentEquals(makeBytes(3)))
assert(batch.part.partIndex == 1)
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
@@ -119,10 +134,12 @@
else -> assert(false)
}

// part 11/10b => data sufficient, should be second now and final
when (val batch = acc.processRecords(makeRecords(5), 0L, false) as ObjectStorageBatch) {
// part 3->4/2b of 4b total => data sufficient for file (but not part! this is expected!),
// should be second part and final (and not empty)
when (val batch = acc.processRecords(makeRecords(1), 0L, false) as ObjectStorageBatch) {
is LoadablePart -> {
assert(batch.part.bytes.contentEquals(makeBytes(5)))
println(batch.part.bytes.contentToString())
assert(batch.part.bytes.contentEquals(makeBytes(1)))
assert(batch.part.partIndex == 2)
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
@@ -134,7 +151,7 @@

// Object 2

// Next part 10/10b => data sufficient, should be first and final
// Next part 10/4b => data sufficient, should be first and final
when (val batch = acc.processRecords(makeRecords(10), 0L, false) as ObjectStorageBatch) {
is LoadablePart -> {
assert(batch.part.bytes.contentEquals(makeBytes(10)))
@@ -163,7 +180,7 @@

// One flush per call, one create/close per finished object
coVerify(exactly = 3) { bufferedWriterFactory.create(any()) }
coVerify(exactly = 5) { bufferedWriter.flush() }
coVerify(exactly = 6) { bufferedWriter.flush() }
coVerify(exactly = 3) { bufferedWriter.close() }
}
}
@@ -42,12 +42,12 @@ data class DevNullConfiguration(
@Singleton
class DevNullConfigurationFactory(
@Value("\${airbyte.destination.record-batch-size-override}")
private val recordBatchSizeBytes: Long
private val recordBatchSizeBytesOverride: Long?
) : DestinationConfigurationFactory<DevNullSpecification, DevNullConfiguration> {
private val log = KotlinLogging.logger {}

override fun makeWithoutExceptionHandling(pojo: DevNullSpecification): DevNullConfiguration {
log.info { "Record batch size from environment: $recordBatchSizeBytes" }
log.info { "Record batch size from environment: $recordBatchSizeBytesOverride" }
return when (pojo) {
is DevNullSpecificationOss -> {
when (pojo.testDestination) {
@@ -108,7 +108,7 @@ class DevNullConfigurationFactory(
}
}
}
}.copy(recordBatchSizeBytes = recordBatchSizeBytes)
}.copy(recordBatchSizeBytes = recordBatchSizeBytesOverride ?: DestinationConfiguration.DEFAULT_RECORD_BATCH_SIZE_BYTES)
}
}

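
Net effect: the override only changes behavior when the placeholder resolves to a value. A short summary of the intended behavior (a sketch, not code from this commit):

// AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE unset
//   -> recordBatchSizeBytesOverride == null -> DestinationConfiguration.DEFAULT_RECORD_BATCH_SIZE_BYTES
// AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE=1 (as in the dockerized tests above)
//   -> every record is flushed in its own batch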
@@ -11,3 +11,5 @@ airbyte:
flush:
rate-ms: 900000 # 15 minutes
window-ms: 900000 # 15 minutes
destination:
record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null}
