diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt index c1e7db92c6b7..04ea591d91b1 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.state.StreamIncompleteResult +import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader @@ -42,7 +42,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { override val state = Batch.State.PERSISTED } - override suspend fun close(streamFailure: StreamIncompleteResult?) { + override suspend fun close(streamFailure: StreamProcessingFailed?) { if (streamFailure == null) { when (val importType = stream.importType) { is Append -> { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt index 4e9a3b1b94ef..2f84cdf19c86 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt @@ -37,6 +37,8 @@ data class DestinationCatalog(val streams: List = emptyList() fun asProtocolObject(): ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() }) + + fun size(): Int = streams.size } interface DestinationCatalogFactory { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 6f6f1f1c2389..488682f6b24c 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.config +import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.ReservationManager @@ -51,7 +52,9 @@ class SyncBeanFactory { fun fileAggregateQueue( @Value("\${airbyte.resources.disk.bytes}") availableBytes: Long, config: DestinationConfiguration, + catalog: DestinationCatalog ): MultiProducerChannel { + val streamCount = catalog.size() // total batches by disk capacity val maxBatchesThatFitOnDisk = (availableBytes / config.recordBatchSizeBytes).toInt() // account for batches in flight processing by the workers @@ -64,6 +67,6 @@ class SyncBeanFactory { val capacity = min(maxBatchesMinusUploadOverhead, idealDepth) log.info { "Creating file aggregate queue with limit $capacity" } val channel = Channel(capacity) - return MultiProducerChannel(channel) + return MultiProducerChannel(streamCount.toLong(), channel) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt index 457799c1c9d0..7b68e69de3b9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt @@ -26,18 +26,26 @@ interface Sized { */ sealed class DestinationStreamEvent : Sized +/** Contains a record to be aggregated and processed. */ data class StreamRecordEvent( val index: Long, override val sizeBytes: Long, val record: DestinationRecord ) : DestinationStreamEvent() -data class StreamCompleteEvent( +/** + * Indicates the stream is in a terminal (complete or incomplete) state as signalled by upstream. + */ +data class StreamEndEvent( val index: Long, ) : DestinationStreamEvent() { override val sizeBytes: Long = 0L } +/** + * Emitted to trigger evaluation of the conditional flush logic of a stream. The consumer may or may + * not decide to flush. + */ data class StreamFlushEvent( val tickedAtMs: Long, ) : DestinationStreamEvent() { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt index 734a032a507a..db46835ab87b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt @@ -5,37 +5,29 @@ package io.airbyte.cdk.load.message import io.github.oshai.kotlinlogging.KotlinLogging -import java.lang.IllegalStateException -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.channels.Channel /** - * A channel designed for use with a dynamic amount of producers. Close will only close the + * A channel designed for use with a fixed amount of producers. Close will be called on the * underlying channel, when there are no remaining registered producers. */ -class MultiProducerChannel(override val channel: Channel) : ChannelMessageQueue() { +class MultiProducerChannel( + producerCount: Long, + override val channel: Channel, +) : ChannelMessageQueue() { private val log = KotlinLogging.logger {} - private val producerCount = AtomicLong(0) - private val closed = AtomicBoolean(false) - - fun registerProducer(): MultiProducerChannel { - if (closed.get()) { - throw IllegalStateException("Attempted to register producer for closed channel.") - } - - val count = producerCount.incrementAndGet() - log.info { "Registering producer (count=$count)" } - return this - } + private val initializedProducerCount = producerCount + private val producerCount = AtomicLong(producerCount) override suspend fun close() { val count = producerCount.decrementAndGet() - log.info { "Closing producer (count=$count)" } + log.info { + "Closing producer (active count=$count, initialized count: $initializedProducerCount)" + } if (count == 0L) { - log.info { "Closing queue" } + log.info { "Closing underlying queue" } channel.close() - closed.getAndSet(true) } } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt index f83254ef4308..491b86fa808a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt @@ -20,13 +20,9 @@ import kotlinx.coroutines.CompletableDeferred sealed interface StreamResult -sealed interface StreamIncompleteResult : StreamResult +data class StreamProcessingFailed(val streamException: Exception) : StreamResult -data class StreamFailed(val streamException: Exception) : StreamIncompleteResult - -data class StreamKilled(val syncException: Exception) : StreamIncompleteResult - -data object StreamSucceeded : StreamResult +data object StreamProcessingSucceeded : StreamResult /** Manages the state of a single stream. */ interface StreamManager { @@ -38,13 +34,17 @@ interface StreamManager { fun recordCount(): Long /** - * Mark the end-of-stream and return the record count. Expect this exactly once. Expect no - * further `countRecordIn`, and expect that [markSucceeded] or [markFailed] or [markKilled] will - * alway occur after this. + * Mark the end-of-stream, set the end of stream variant (complete or incomplete) and return the + * record count. Expect this exactly once. Expect no further `countRecordIn`, and expect that + * [markProcessingSucceeded] will always occur after this, while [markProcessingFailed] can + * occur before or after. */ - fun markEndOfStream(): Long + fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long fun endOfStreamRead(): Boolean + /** Whether we received a stream complete message for the managed stream. */ + fun isComplete(): Boolean + /** * Mark a checkpoint in the stream and return the current index and the number of records since * the last one. @@ -72,22 +72,23 @@ interface StreamManager { */ fun areRecordsPersistedUntil(index: Long): Boolean - /** Mark the stream as closed. This should only be called after all records have been read. */ - fun markSucceeded() - /** - * Mark that the stream was killed due to failure elsewhere. Returns false if task was already - * complete. + * Indicates destination processing of the stream succeeded, regardless of complete/incomplete + * status. This should only be called after all records and end of stream messages have been + * read. */ - fun markKilled(causedBy: Exception): Boolean + fun markProcessingSucceeded() - /** Mark that the stream itself failed. Return false if task was already complete */ - fun markFailed(causedBy: Exception): Boolean + /** + * Indicates destination processing of the stream failed. Returns false if task was already + * complete + */ + fun markProcessingFailed(causedBy: Exception): Boolean /** Suspend until the stream completes, returning the result. */ suspend fun awaitStreamResult(): StreamResult - /** True if the stream has not yet been marked successful, failed, or killed. */ + /** True if the stream processing has not yet been marked as successful or failed. */ fun isActive(): Boolean } @@ -105,6 +106,7 @@ class DefaultStreamManager( private val lastCheckpoint = AtomicLong(0L) private val markedEndOfStream = AtomicBoolean(false) + private val receivedComplete = AtomicBoolean(false) private val rangesState: ConcurrentHashMap> = ConcurrentHashMap() @@ -124,10 +126,11 @@ class DefaultStreamManager( return recordCount.get() } - override fun markEndOfStream(): Long { + override fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long { if (markedEndOfStream.getAndSet(true)) { throw IllegalStateException("Stream is closed for reading") } + receivedComplete.getAndSet(receivedStreamCompleteMessage) return recordCount.get() } @@ -136,6 +139,10 @@ class DefaultStreamManager( return markedEndOfStream.get() } + override fun isComplete(): Boolean { + return receivedComplete.get() + } + override fun markCheckpoint(): Pair { val index = recordCount.get() val lastCheckpoint = lastCheckpoint.getAndSet(index) @@ -220,19 +227,15 @@ class DefaultStreamManager( return isProcessingCompleteForState(index, Batch.State.PERSISTED) } - override fun markSucceeded() { + override fun markProcessingSucceeded() { if (!markedEndOfStream.get()) { throw IllegalStateException("Stream is not closed for reading") } - streamResult.complete(StreamSucceeded) - } - - override fun markKilled(causedBy: Exception): Boolean { - return streamResult.complete(StreamKilled(causedBy)) + streamResult.complete(StreamProcessingSucceeded) } - override fun markFailed(causedBy: Exception): Boolean { - return streamResult.complete(StreamFailed(causedBy)) + override fun markProcessingFailed(causedBy: Exception): Boolean { + return streamResult.complete(StreamProcessingFailed(causedBy)) } override suspend fun awaitStreamResult(): StreamResult { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt index 3a54d8898cae..eeefe4d21aad 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt @@ -14,14 +14,14 @@ import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.CompletableDeferred -sealed interface SyncResult +sealed interface DestinationResult -data object SyncSuccess : SyncResult +data object DestinationSuccess : DestinationResult -data class SyncFailure( - val syncFailure: Exception, +data class DestinationFailure( + val cause: Exception, val streamResults: Map -) : SyncResult +) : DestinationResult /** Manages the state of all streams in the destination. */ interface SyncManager { @@ -35,18 +35,26 @@ interface SyncManager { suspend fun getOrAwaitStreamLoader(stream: DestinationStream.Descriptor): StreamLoader suspend fun getStreamLoaderOrNull(stream: DestinationStream.Descriptor): StreamLoader? - /** Suspend until all streams are complete. Returns false if any stream was failed/killed. */ - suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean + /** + * Suspend until all streams are processed successfully. Returns false if processing failed for + * any stream. + */ + suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean suspend fun markInputConsumed() suspend fun markCheckpointsProcessed() - suspend fun markFailed(causedBy: Exception): SyncFailure - suspend fun markSucceeded() + suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure + suspend fun markDestinationSucceeded() + + /** + * Whether we received stream complete messages for all streams in the catalog from upstream. + */ + suspend fun allStreamsComplete(): Boolean fun isActive(): Boolean - suspend fun awaitInputProcessingComplete(): Unit - suspend fun awaitSyncResult(): SyncResult + suspend fun awaitInputProcessingComplete() + suspend fun awaitDestinationResult(): DestinationResult } @SuppressFBWarnings( @@ -56,7 +64,7 @@ interface SyncManager { class DefaultSyncManager( private val streamManagers: ConcurrentHashMap ) : SyncManager { - private val syncResult = CompletableDeferred() + private val destinationResult = CompletableDeferred() private val streamLoaders = ConcurrentHashMap>>() private val inputConsumed = CompletableDeferred() @@ -87,32 +95,38 @@ class DefaultSyncManager( return streamLoaders[stream]?.await()?.getOrNull() } - override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean { - return streamManagers.all { (_, manager) -> manager.awaitStreamResult() is StreamSucceeded } + override suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean { + return streamManagers.all { (_, manager) -> + manager.awaitStreamResult() is StreamProcessingSucceeded + } } - override suspend fun markFailed(causedBy: Exception): SyncFailure { + override suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure { val result = - SyncFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() }) - syncResult.complete(result) + DestinationFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() }) + destinationResult.complete(result) return result } - override suspend fun markSucceeded() { + override suspend fun markDestinationSucceeded() { if (streamManagers.values.any { it.isActive() }) { throw IllegalStateException( "Cannot mark sync as succeeded until all streams are complete" ) } - syncResult.complete(SyncSuccess) + destinationResult.complete(DestinationSuccess) + } + + override suspend fun allStreamsComplete(): Boolean { + return streamManagers.all { it.value.isComplete() } } override fun isActive(): Boolean { - return syncResult.isActive + return destinationResult.isActive } - override suspend fun awaitSyncResult(): SyncResult { - return syncResult.await() + override suspend fun awaitDestinationResult(): DestinationResult { + return destinationResult.await() } override suspend fun awaitInputProcessingComplete() { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index af35955cb748..3b0da20da844 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -125,7 +125,7 @@ class DefaultDestinationTaskLauncher( // File transfer @Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean, - // Input Comsumer requirements + // Input Consumer requirements private val inputFlow: SizedInputFlow>, private val recordQueueSupplier: MessageQueueSupplier>, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt index a2a7d4f00831..14a1688e7ad9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt @@ -16,7 +16,8 @@ interface CloseStreamTask : ImplementorScope /** * Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the - * teardown task. + * teardown task. Called after the end of stream message (complete OR incomplete) has been received + * and all record messages have been processed. */ class DefaultCloseStreamTask( private val syncManager: SyncManager, @@ -27,7 +28,7 @@ class DefaultCloseStreamTask( override suspend fun execute() { val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) streamLoader.close() - syncManager.getStreamManager(streamDescriptor).markSucceeded() + syncManager.getStreamManager(streamDescriptor).markProcessingSucceeded() taskLauncher.handleStreamClosed(streamLoader.stream.descriptor) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt index 4b810cd4e30b..9959a3286ab0 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt @@ -5,8 +5,8 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.state.StreamIncompleteResult -import io.airbyte.cdk.load.state.StreamSucceeded +import io.airbyte.cdk.load.state.StreamProcessingFailed +import io.airbyte.cdk.load.state.StreamProcessingSucceeded import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.ImplementorScope @@ -17,8 +17,8 @@ import jakarta.inject.Singleton interface FailStreamTask : ImplementorScope /** - * FailStreamTask is a task that is executed when a stream fails. It is responsible for cleaning up - * resources and reporting the failure. + * FailStreamTask is a task that is executed when the processing of a stream fails in the + * destination. It is responsible for cleaning up resources and reporting the failure. */ class DefaultFailStreamTask( private val taskLauncher: DestinationTaskLauncher, @@ -30,12 +30,12 @@ class DefaultFailStreamTask( override suspend fun execute() { val streamManager = syncManager.getStreamManager(stream) - streamManager.markFailed(exception) + streamManager.markProcessingFailed(exception) when (val streamResult = streamManager.awaitStreamResult()) { - is StreamSucceeded -> { + is StreamProcessingSucceeded -> { log.info { "Cannot fail stream $stream, which is already complete, doing nothing." } } - is StreamIncompleteResult -> { + is StreamProcessingFailed -> { syncManager.getStreamLoaderOrNull(stream)?.close(streamResult) ?: log.warn { "StreamLoader not found for stream $stream, cannot call close." } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt index ff1d905bfb7d..10f64ab9de0f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt @@ -16,8 +16,9 @@ import jakarta.inject.Singleton interface FailSyncTask : ImplementorScope /** - * FailSyncTask is a task that is executed when a sync fails. It is responsible for cleaning up - * resources and reporting the failure. + * FailSyncTask is a task that is executed only when the destination itself fails during a sync. If + * the sync is failed by upstream (e.g. an incomplete stream message is received), we do not call + * this task. It is responsible for cleaning up resources and reporting the failure. */ class DefaultFailSyncTask( private val taskLauncher: DestinationTaskLauncher, @@ -31,7 +32,7 @@ class DefaultFailSyncTask( override suspend fun execute() { // Ensure any remaining ready state gets captured: don't waste work! checkpointManager.flushReadyCheckpointMessages() - val result = syncManager.markFailed(exception) // awaits stream completion + val result = syncManager.markDestinationFailed(exception) // awaits stream completion log.info { "Calling teardown with failure result $result" } destinationWriter.teardown(result) taskLauncher.handleTeardownComplete(success = false) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt index 324564c39a05..64dada897c6b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt @@ -31,9 +31,9 @@ class DefaultTeardownTask( override suspend fun execute() { syncManager.awaitInputProcessingComplete() - log.info { "Teardown task awaiting stream completion" } - if (!syncManager.awaitAllStreamsCompletedSuccessfully()) { - log.info { "Streams failed to complete successfully, doing nothing." } + log.info { "Teardown task awaiting stream processing completion" } + if (!syncManager.awaitAllStreamsProcessedSuccessfully()) { + log.info { "Streams failed to be processed successfully, doing nothing." } return } @@ -41,7 +41,7 @@ class DefaultTeardownTask( log.info { "Starting teardown task" } destination.teardown() log.info { "Teardown task complete, marking sync succeeded." } - syncManager.markSucceeded() + syncManager.markDestinationSucceeded() taskLauncher.handleTeardownComplete() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index db0c47d07da6..3bb0bb716545 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -27,7 +27,7 @@ import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.message.StreamCheckpointWrapped -import io.airbyte.cdk.load.message.StreamCompleteEvent +import io.airbyte.cdk.load.message.StreamEndEvent import io.airbyte.cdk.load.message.StreamRecordEvent import io.airbyte.cdk.load.message.Undefined import io.airbyte.cdk.load.state.Reserved @@ -83,19 +83,23 @@ class DefaultInputConsumerTask( } is DestinationRecordStreamComplete -> { reserved.release() // safe because multiple calls conflate - val wrapped = StreamCompleteEvent(index = manager.markEndOfStream()) + val wrapped = StreamEndEvent(index = manager.markEndOfStream(true)) + recordQueue.publish(reserved.replace(wrapped)) + recordQueue.close() + } + is DestinationRecordStreamIncomplete -> { + reserved.release() // safe because multiple calls conflate + val wrapped = StreamEndEvent(index = manager.markEndOfStream(false)) recordQueue.publish(reserved.replace(wrapped)) recordQueue.close() } - is DestinationRecordStreamIncomplete -> - throw IllegalStateException("Stream $stream failed upstream, cannot continue.") is DestinationFile -> { val index = manager.countRecordIn() destinationTaskLauncher.handleFile(stream, message, index) } is DestinationFileStreamComplete -> { reserved.release() // safe because multiple calls conflate - manager.markEndOfStream() + manager.markEndOfStream(true) val envelope = BatchEnvelope(SimpleBatch(Batch.State.COMPLETE)) destinationTaskLauncher.handleNewBatch(stream, envelope) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt index 1760dd5b6c53..ea8965002ea8 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt @@ -15,7 +15,7 @@ import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.QueueReader import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.message.StreamCompleteEvent +import io.airbyte.cdk.load.message.StreamEndEvent import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.message.StreamRecordEvent import io.airbyte.cdk.load.state.FlushStrategy @@ -61,13 +61,12 @@ class DefaultSpillToDiskTask( override suspend fun execute() { val initialAccumulator = fileAccFactory.make() - val registration = outputQueue.registerProducer() - registration.use { + outputQueue.use { inputQueue.consume().fold(initialAccumulator) { acc, reserved -> reserved.use { when (val event = it.value) { is StreamRecordEvent -> accRecordEvent(acc, event) - is StreamCompleteEvent -> accStreamCompleteEvent(acc, event) + is StreamEndEvent -> accStreamEndEvent(acc, event) is StreamFlushEvent -> accFlushEvent(acc) } } @@ -117,12 +116,12 @@ class DefaultSpillToDiskTask( } /** - * Handles accumulation of stream completion events, triggering a final flush if the aggregate - * isn't empty. + * Handles accumulation of stream end events (complete or incomplete), triggering a final flush + * if the aggregate isn't empty. */ - private suspend fun accStreamCompleteEvent( + private suspend fun accStreamEndEvent( acc: FileAccumulator, - event: StreamCompleteEvent, + event: StreamEndEvent, ): FileAccumulator { val (spillFile, outputStream, timeWindow, range, sizeBytes) = acc if (sizeBytes == 0L) { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt index 7ad369a3623d..665dd5abaae7 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt @@ -5,7 +5,7 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.state.SyncFailure +import io.airbyte.cdk.load.state.DestinationFailure import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton @@ -22,7 +22,7 @@ interface DestinationWriter { // Called once at the end of the job, unconditionally. // NOTE: we don't pass Success here, because it depends on this completing successfully. - suspend fun teardown(syncFailure: SyncFailure? = null) {} + suspend fun teardown(destinationFailure: DestinationFailure? = null) {} } @Singleton diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index cf6070a56927..ce0a21404e3b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.state.StreamIncompleteResult +import io.airbyte.cdk.load.state.StreamProcessingFailed /** * Implementor interface. The framework calls open and close once per stream at the beginning and @@ -39,5 +39,5 @@ interface StreamLoader { suspend fun processRecords(records: Iterator, totalSizeBytes: Long): Batch suspend fun processFile(file: DestinationFile): Batch suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE) - suspend fun close(streamFailure: StreamIncompleteResult? = null) {} + suspend fun close(streamFailure: StreamProcessingFailed? = null) {} } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt new file mode 100644 index 000000000000..a4b52f166147 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write + +/** + * Thrown when the destination completes successfully, but some streams were indicated as incomplete + * by upstream. Without throwing an exception the sync will not be marked as succeed by the + * platform. + * + * TODO: Once the API with platform is updated to not require an exceptional exit code, remove this. + */ +class StreamsIncompleteException : Exception() { + override val message = "Some streams were indicated as incomplete by upstream." +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt index 357d08b17e52..5d673a5b26f6 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt @@ -5,9 +5,9 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.Operation -import io.airbyte.cdk.load.state.SyncFailure +import io.airbyte.cdk.load.state.DestinationFailure +import io.airbyte.cdk.load.state.DestinationSuccess import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.state.SyncSuccess import io.airbyte.cdk.load.task.TaskLauncher import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory @@ -32,13 +32,19 @@ class WriteOperation( override fun execute() = runBlocking { taskLauncher.run() - when (val result = syncManager.awaitSyncResult()) { - is SyncSuccess -> { - log.info { "Sync completed successfully" } + when (val result = syncManager.awaitDestinationResult()) { + is DestinationSuccess -> { + if (!syncManager.allStreamsComplete()) { + log.info { + "Destination completed successfully but some streams were incomplete. Throwing to exit non-zero..." + } + throw StreamsIncompleteException() + } + log.info { "Destination completed successfully and all streams were complete." } } - is SyncFailure -> { - log.info { "Sync failed with stream results ${result.streamResults}" } - throw result.syncFailure + is DestinationFailure -> { + log.info { "Destination failed with stream results ${result.streamResults}" } + throw result.cause } } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt index e40e1c90a7c9..f301d585fe1a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt @@ -7,12 +7,10 @@ package io.airbyte.cdk.load.message import io.mockk.coVerify import io.mockk.impl.annotations.MockK import io.mockk.junit5.MockKExtension -import java.lang.IllegalStateException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(MockKExtension::class) @@ -21,33 +19,23 @@ class MultiProducerChannelTest { private lateinit var channel: MultiProducerChannel + val size = 3L + @BeforeEach fun setup() { - channel = MultiProducerChannel(wrapped) + channel = MultiProducerChannel(size, wrapped) } @Test - fun `cannot register a producer if channel already closed`() = runTest { - channel.registerProducer() + fun `does not close until the expected number of producers have closed`() = runTest { channel.close() - assertThrows { channel.registerProducer() } - } - - @Test - fun `does not close underlying channel while registered producers exist`() = runTest { - channel.registerProducer() - channel.registerProducer() - channel.close() + coVerify(exactly = 0) { wrapped.close() } } @Test fun `closes underlying channel when no producers are registered`() = runTest { - channel.registerProducer() - channel.registerProducer() - channel.registerProducer() - channel.close() channel.close() channel.close() @@ -56,9 +44,7 @@ class MultiProducerChannelTest { @Test fun `subsequent calls to to close are idempotent`() = runTest { - channel.registerProducer() - channel.registerProducer() - + channel.close() channel.close() channel.close() channel.close() diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt index c6dc30ad799a..1997473e4aed 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt @@ -62,17 +62,19 @@ class StreamManagerTest { val manager = DefaultStreamManager(stream1) val channel = Channel(Channel.UNLIMITED) - launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) } + launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) } delay(500) Assertions.assertTrue(channel.tryReceive().isFailure) - Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() } - manager.markEndOfStream() + Assertions.assertThrows(IllegalStateException::class.java) { + manager.markProcessingSucceeded() + } + manager.markEndOfStream(true) - manager.markSucceeded() + manager.markProcessingSucceeded() Assertions.assertTrue(channel.receive()) - Assertions.assertEquals(StreamSucceeded, manager.awaitStreamResult()) + Assertions.assertEquals(StreamProcessingSucceeded, manager.awaitStreamResult()) } @Test @@ -80,29 +82,14 @@ class StreamManagerTest { val manager = DefaultStreamManager(stream1) val channel = Channel(Channel.UNLIMITED) - launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) } + launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) } delay(500) Assertions.assertTrue(channel.tryReceive().isFailure) - manager.markFailed(Exception("test")) + manager.markProcessingFailed(Exception("test")) Assertions.assertFalse(channel.receive()) - Assertions.assertTrue(manager.awaitStreamResult() is StreamFailed) - } - - @Test - fun testMarkKilled() = runTest { - val manager = DefaultStreamManager(stream1) - val channel = Channel(Channel.UNLIMITED) - - launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) } - - delay(500) - Assertions.assertTrue(channel.tryReceive().isFailure) - manager.markKilled(Exception("test")) - Assertions.assertFalse(channel.receive()) - - Assertions.assertTrue(manager.awaitStreamResult() is StreamKilled) + Assertions.assertTrue(manager.awaitStreamResult() is StreamProcessingFailed) } class TestUpdateBatchStateProvider : ArgumentsProvider { @@ -274,7 +261,7 @@ class StreamManagerTest { val manager = managers[stream.descriptor]!! when (event) { is SetRecordCount -> repeat(event.count.toInt()) { manager.countRecordIn() } - is SetEndOfStream -> manager.markEndOfStream() + is SetEndOfStream -> manager.markEndOfStream(true) is AddPersisted -> manager.updateBatchState( BatchEnvelope( @@ -310,23 +297,25 @@ class StreamManagerTest { val manager = DefaultStreamManager(stream1) // Can't mark success before end-of-stream - Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() } + Assertions.assertThrows(IllegalStateException::class.java) { + manager.markProcessingSucceeded() + } manager.countRecordIn() - manager.markEndOfStream() + manager.markEndOfStream(true) // Can't update after end-of-stream Assertions.assertThrows(IllegalStateException::class.java) { manager.countRecordIn() } - Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream() } + Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream(true) } // Can close now - Assertions.assertDoesNotThrow(manager::markSucceeded) + Assertions.assertDoesNotThrow(manager::markProcessingSucceeded) } @Test fun testEmptyCompletedStreamYieldsBatchProcessingComplete() { val manager = DefaultStreamManager(stream1) - manager.markEndOfStream() + manager.markEndOfStream(true) Assertions.assertTrue(manager.isBatchProcessingComplete()) } @@ -407,7 +396,7 @@ class StreamManagerTest { val range2 = Range.closed(10, 19L) val batch2 = BatchEnvelope(SimpleBatch(Batch.State.PERSISTED, groupId = "foo"), range2) - manager.markEndOfStream() + manager.markEndOfStream(true) manager.updateBatchState(batch2) manager.updateBatchState(batch1) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt index 6589794d1bec..29d1f0a8049d 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt @@ -41,43 +41,43 @@ class SyncManagerTest { // deferred; B) It should probably move into a writer wrapper. @Test - fun testAwaitAllStreamsCompletedSuccessfully() = runTest { + fun testAwaitAllStreamsProcessedSuccessfully() = runTest { val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager2 = syncManager.getStreamManager(stream2.descriptor) val completionChannel = Channel(Channel.UNLIMITED) - manager1.markEndOfStream() - manager2.markEndOfStream() + manager1.markEndOfStream(true) + manager2.markEndOfStream(true) - launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) } + launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) } delay(500) Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager1.markSucceeded() + manager1.markProcessingSucceeded() delay(500) Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager2.markSucceeded() + manager2.markProcessingSucceeded() Assertions.assertTrue(completionChannel.receive()) } @Test - fun testAwaitAllStreamsCompletedSuccessfullyWithFailure() = runTest { + fun testAwaitAllStreamsProcessedSuccessfullyWithFailure() = runTest { val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager2 = syncManager.getStreamManager(stream2.descriptor) val completionChannel = Channel(Channel.UNLIMITED) - launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) } + launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) } - manager1.markEndOfStream() - manager2.markEndOfStream() + manager1.markEndOfStream(true) + manager2.markEndOfStream(true) delay(500) Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager1.markSucceeded() + manager1.markProcessingSucceeded() delay(500) Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager2.markFailed(RuntimeException()) + manager2.markProcessingFailed(RuntimeException()) Assertions.assertFalse(completionChannel.receive()) } @@ -86,15 +86,15 @@ class SyncManagerTest { val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager2 = syncManager.getStreamManager(stream2.descriptor) - manager1.markEndOfStream() - manager2.markEndOfStream() + manager1.markEndOfStream(true) + manager2.markEndOfStream(true) Assertions.assertTrue(syncManager.isActive()) - manager1.markSucceeded() + manager1.markProcessingSucceeded() Assertions.assertTrue(syncManager.isActive()) - manager2.markSucceeded() + manager2.markProcessingSucceeded() Assertions.assertTrue(syncManager.isActive()) - syncManager.markSucceeded() + syncManager.markDestinationSucceeded() Assertions.assertFalse(syncManager.isActive()) } @@ -103,35 +103,35 @@ class SyncManagerTest { val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager2 = syncManager.getStreamManager(stream2.descriptor) - manager1.markEndOfStream() - manager2.markEndOfStream() + manager1.markEndOfStream(true) + manager2.markEndOfStream(true) - val completionChannel = Channel(Channel.UNLIMITED) + val completionChannel = Channel(Channel.UNLIMITED) - launch { completionChannel.send(syncManager.awaitSyncResult()) } + launch { completionChannel.send(syncManager.awaitDestinationResult()) } CoroutineTestUtils.assertThrows(IllegalStateException::class) { - syncManager.markSucceeded() + syncManager.markDestinationSucceeded() } Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager1.markSucceeded() + manager1.markProcessingSucceeded() CoroutineTestUtils.assertThrows(IllegalStateException::class) { - syncManager.markSucceeded() + syncManager.markDestinationSucceeded() } Assertions.assertTrue(completionChannel.tryReceive().isFailure) - manager2.markSucceeded() + manager2.markProcessingSucceeded() Assertions.assertTrue(completionChannel.tryReceive().isFailure) - syncManager.markSucceeded() - Assertions.assertEquals(SyncSuccess, completionChannel.receive()) + syncManager.markDestinationSucceeded() + Assertions.assertEquals(DestinationSuccess, completionChannel.receive()) } @Test fun testCrashOnNoEndOfStream() = runTest { val manager1 = syncManager.getStreamManager(stream1.descriptor) - manager1.markEndOfStream() + manager1.markEndOfStream(true) // This should fail, because stream2 was not marked with end of stream val e = assertThrows { syncManager.markInputConsumed() } assertEquals( diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index a103ed9c7eab..ebdead74a42f 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -419,7 +419,7 @@ class DestinationTaskLauncherTest { syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor) repeat(100) { streamManager.countRecordIn() } - streamManager.markEndOfStream() + streamManager.markEndOfStream(true) // Verify incomplete batch triggers process batch val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range) @@ -468,7 +468,7 @@ class DestinationTaskLauncherTest { val range = TreeRangeSet.create(listOf(Range.closed(0L, 0L))) val streamManager = syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor) - streamManager.markEndOfStream() + streamManager.markEndOfStream(true) val emptyBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), range) taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1.descriptor, emptyBatch) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt index 0c056ee8f5ab..2623715bf598 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt @@ -14,7 +14,7 @@ import io.airbyte.cdk.load.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamCheckpointWrapped -import io.airbyte.cdk.load.message.StreamCompleteEvent +import io.airbyte.cdk.load.message.StreamEndEvent import io.airbyte.cdk.load.message.StreamRecordEvent import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.Reserved @@ -144,11 +144,11 @@ class InputConsumerTaskTest { Assertions.assertEquals(expectedRecords, messages1.map { it.value }) Assertions.assertEquals(expectedRecords.map { _ -> 1L }, messages1.map { it.bytesReserved }) - Assertions.assertEquals(StreamCompleteEvent(10), streamComplete1.value) + Assertions.assertEquals(StreamEndEvent(10), streamComplete1.value) Assertions.assertEquals(1, streamComplete1.bytesReserved) Assertions.assertEquals(10L, manager1.recordCount()) Assertions.assertEquals(emptyList(), queue1.consume().toList()) - Assertions.assertEquals(StreamCompleteEvent(0), streamComplete2.value) + Assertions.assertEquals(StreamEndEvent(0), streamComplete2.value) Assertions.assertEquals(emptyList(), queue2.consume().toList()) Assertions.assertEquals(0L, manager2.recordCount()) mockInputFlow.stop() @@ -208,7 +208,7 @@ class InputConsumerTaskTest { "test" ) ), - StreamCompleteEvent(1) + StreamEndEvent(1) ), queue2.consume().toList().map { it.value } ) @@ -220,7 +220,7 @@ class InputConsumerTaskTest { queue1.close() val messages1 = queue1.consume().toList() Assertions.assertEquals(11, messages1.size) - Assertions.assertEquals(messages1[10].value, StreamCompleteEvent(10)) + Assertions.assertEquals(messages1[10].value, StreamEndEvent(10)) Assertions.assertEquals( mockInputFlow.initialMemory - 11, mockInputFlow.memoryManager.remainingCapacityBytes, @@ -353,30 +353,6 @@ class InputConsumerTaskTest { mockInputFlow.stop() } - @Test - fun testStreamIncompleteThrows() = runTest { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord(MockDestinationCatalogFactory.stream1, "test"), - 1L - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamIncomplete( - MockDestinationCatalogFactory.stream1 - ), - 0L - ) - val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), - ) - CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() } - mockInputFlow.stop() - } - @Test fun testFileStreamIncompleteThrows() = runTest { mockInputFlow.addMessage( diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt index 333671c05819..8dec6cfc48c4 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt @@ -17,7 +17,7 @@ import io.airbyte.cdk.load.message.DestinationStreamEventQueue import io.airbyte.cdk.load.message.DestinationStreamQueueSupplier import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.MultiProducerChannel -import io.airbyte.cdk.load.message.StreamCompleteEvent +import io.airbyte.cdk.load.message.StreamEndEvent import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.message.StreamRecordEvent import io.airbyte.cdk.load.state.FlushStrategy @@ -111,7 +111,7 @@ class SpillToDiskTaskTest { @Test fun `publishes 'spilled file' aggregates on stream complete event`() = runTest { - val completeMsg = StreamCompleteEvent(0L) + val completeMsg = StreamEndEvent(0L) inputQueue.publish(Reserved(value = completeMsg)) val job = launch { @@ -267,7 +267,7 @@ class SpillToDiskTaskTest { queue.publish( memoryManager.reserve( 0L, - StreamCompleteEvent(index = maxRecords), + StreamEndEvent(index = maxRecords), ), ) return recordsWritten diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index a183268db380..c58193d311dc 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -18,7 +18,7 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.state.DestinationStateManager -import io.airbyte.cdk.load.state.StreamIncompleteResult +import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState import io.airbyte.cdk.load.write.StreamLoader import io.github.oshai.kotlinlogging.KotlinLogging @@ -160,7 +160,7 @@ class ObjectStorageStreamLoader, U : OutputStream>( ) } - override suspend fun close(streamFailure: StreamIncompleteResult?) { + override suspend fun close(streamFailure: StreamProcessingFailed?) { if (streamFailure != null) { log.info { "Sync failed, persisting destination state for next run" } destinationStateManager.persistState(stream) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 04bec228cc53..cfc19c2fb5b2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -491,7 +491,7 @@ protected constructor( * both syncs are preserved. */ @Test - open fun testOverwriteSyncFailedResumedGeneration() { + fun testOverwriteSyncFailedResumedGeneration() { assumeTrue( implementsOverwrite(), "Destination's spec.json does not support overwrite sync mode." @@ -525,7 +525,7 @@ protected constructor( /** Test runs 2 failed syncs and verifies the previous sync objects are not cleaned up. */ @Test - open fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() { + fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() { assumeTrue( implementsOverwrite(), "Destination's spec.json does not support overwrite sync mode." diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index ac113231876e..7f67f917ea30 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.13 dockerRepository: airbyte/destination-iceberg-v2 githubIssueLabel: destination-iceberg-v2 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index d61db456764a..28695edcd9a8 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.SimpleBatch -import io.airbyte.cdk.load.state.StreamIncompleteResult +import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.write.StreamLoader import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory @@ -74,7 +74,7 @@ class IcebergStreamLoader( throw NotImplementedError("Destination Iceberg does not support universal file transfer.") } - override suspend fun close(streamFailure: StreamIncompleteResult?) { + override suspend fun close(streamFailure: StreamProcessingFailed?) { if (streamFailure == null) { // Doing it first to make sure that data coming in the current batch is written to the // main branch diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 539593629daf..947599e42402 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.1 + dockerImageTag: 0.3.2 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt index 1ca6a2aaf11f..13165999d5be 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt @@ -21,8 +21,4 @@ class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest() override val baseConfigJson: JsonNode get() = S3V2DestinationTestUtils.baseConfigJsonFilePath - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt index f46ff5513fce..b7e8700c2aed 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt @@ -22,8 +22,4 @@ class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanc override fun testFakeFileTransfer() { super.testFakeFileTransfer() } - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt index b695bf4c7a20..9c106d38588c 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt @@ -15,8 +15,4 @@ class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override val baseConfigJson: JsonNode get() = S3V2DestinationTestUtils.baseConfigJsonFilePath - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt index 922312d10e62..880315a616ef 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt @@ -15,8 +15,4 @@ class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceT override val baseConfigJson: JsonNode get() = S3V2DestinationTestUtils.baseConfigJsonFilePath - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt index 1090fdc4e595..b6c68c8c1009 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt @@ -15,8 +15,4 @@ class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest( override val baseConfigJson: JsonNode get() = S3V2DestinationTestUtils.baseConfigJsonFilePath - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt index e6ffe789bdf1..7798966caf3e 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt @@ -15,8 +15,4 @@ class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAccepta override val baseConfigJson: JsonNode get() = S3V2DestinationTestUtils.baseConfigJsonFilePath - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt index 5c19502dd729..c5c02597e7a2 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt @@ -73,8 +73,4 @@ class S3V2ParquetDestinationAcceptanceTest : S3BaseParquetDestinationAcceptanceT runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false) } - - // Disable these tests until we fix the incomplete stream handling behavior. - override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {} - override fun testOverwriteSyncFailedResumedGeneration() {} }