From bdaae730ab9bedf6b1554cb3a79fe020aa2eeedd Mon Sep 17 00:00:00 2001
From: "Ryan Br..." <ryan.broughan@gmail.com>
Date: Fri, 13 Dec 2024 15:37:05 -0800
Subject: [PATCH] Rbroughan/dont fail fast on stream incomplete (#49455)

---
 .../MockDestinationWriter.kt                  |  4 +-
 .../cdk/load/command/DestinationCatalog.kt    |  2 +
 .../cdk/load/config/SyncBeanFactory.kt        |  5 +-
 .../load/message/DestinationMessageQueues.kt  | 10 +++-
 .../cdk/load/message/MultiProducerChannel.kt  | 30 ++++------
 .../airbyte/cdk/load/state/StreamManager.kt   | 59 ++++++++++---------
 .../io/airbyte/cdk/load/state/SyncManager.kt  | 58 +++++++++++-------
 .../cdk/load/task/DestinationTaskLauncher.kt  |  2 +-
 .../load/task/implementor/CloseStreamTask.kt  |  5 +-
 .../load/task/implementor/FailStreamTask.kt   | 14 ++---
 .../cdk/load/task/implementor/FailSyncTask.kt |  7 ++-
 .../cdk/load/task/implementor/TeardownTask.kt |  8 +--
 .../load/task/internal/InputConsumerTask.kt   | 14 +++--
 .../cdk/load/task/internal/SpillToDiskTask.kt | 15 +++--
 .../cdk/load/write/DestinationWriter.kt       |  4 +-
 .../io/airbyte/cdk/load/write/StreamLoader.kt |  4 +-
 .../load/write/StreamsIncompleteException.kt  | 16 +++++
 .../airbyte/cdk/load/write/WriteOperation.kt  | 22 ++++---
 .../load/message/MultiProducerChannelTest.kt  | 26 ++------
 .../cdk/load/state/StreamManagerTest.kt       | 49 ++++++---------
 .../airbyte/cdk/load/state/SyncManagerTest.kt | 56 +++++++++---------
 .../load/task/DestinationTaskLauncherTest.kt  |  4 +-
 .../task/internal/InputConsumerTaskTest.kt    | 34 ++---------
 .../load/task/internal/SpillToDiskTaskTest.kt |  6 +-
 .../ObjectStorageStreamLoaderFactory.kt       |  4 +-
 .../s3/S3DestinationAcceptanceTest.kt         |  4 +-
 .../destination-iceberg-v2/metadata.yaml      |  2 +-
 .../iceberg/v2/IcebergStreamLoader.kt         |  4 +-
 .../destination-s3-v2/metadata.yaml           |  2 +-
 .../s3/S3V2AvroDestinationAcceptanceTest.kt   |  4 --
 ...2CsvAssumeRoleDestinationAcceptanceTest.kt |  4 --
 .../s3/S3V2CsvDestinationAcceptanceTest.kt    |  4 --
 .../S3V2CsvGzipDestinationAcceptanceTest.kt   |  4 --
 .../s3/S3V2JsonlDestinationAcceptanceTest.kt  |  4 --
 .../S3V2JsonlGzipDestinationAcceptanceTest.kt |  4 --
 .../S3V2ParquetDestinationAcceptanceTest.kt   |  4 --
 36 files changed, 235 insertions(+), 263 deletions(-)
 create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt

diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt
index c1e7db92c6b7..04ea591d91b1 100644
--- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt
+++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt
@@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.message.SimpleBatch
-import io.airbyte.cdk.load.state.StreamIncompleteResult
+import io.airbyte.cdk.load.state.StreamProcessingFailed
 import io.airbyte.cdk.load.test.util.OutputRecord
 import io.airbyte.cdk.load.write.DestinationWriter
 import io.airbyte.cdk.load.write.StreamLoader
@@ -42,7 +42,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
         override val state = Batch.State.PERSISTED
     }
 
-    override suspend fun close(streamFailure: StreamIncompleteResult?) {
+    override suspend fun close(streamFailure: StreamProcessingFailed?) {
         if (streamFailure == null) {
             when (val importType = stream.importType) {
                 is Append -> {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt
index 4e9a3b1b94ef..2f84cdf19c86 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt
@@ -37,6 +37,8 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
 
     fun asProtocolObject(): ConfiguredAirbyteCatalog =
         ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })
+
+    fun size(): Int = streams.size
 }
 
 interface DestinationCatalogFactory {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
index 6f6f1f1c2389..488682f6b24c 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
@@ -4,6 +4,7 @@
 
 package io.airbyte.cdk.load.config
 
+import io.airbyte.cdk.load.command.DestinationCatalog
 import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.state.ReservationManager
@@ -51,7 +52,9 @@ class SyncBeanFactory {
     fun fileAggregateQueue(
         @Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
         config: DestinationConfiguration,
+        catalog: DestinationCatalog
     ): MultiProducerChannel<FileAggregateMessage> {
+        val streamCount = catalog.size()
         // total batches by disk capacity
         val maxBatchesThatFitOnDisk = (availableBytes / config.recordBatchSizeBytes).toInt()
         // account for batches in flight processing by the workers
@@ -64,6 +67,6 @@ class SyncBeanFactory {
         val capacity = min(maxBatchesMinusUploadOverhead, idealDepth)
         log.info { "Creating file aggregate queue with limit $capacity" }
         val channel = Channel<FileAggregateMessage>(capacity)
-        return MultiProducerChannel(channel)
+        return MultiProducerChannel(streamCount.toLong(), channel)
     }
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt
index 457799c1c9d0..7b68e69de3b9 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt
@@ -26,18 +26,26 @@ interface Sized {
  */
 sealed class DestinationStreamEvent : Sized
 
+/** Contains a record to be aggregated and processed. */
 data class StreamRecordEvent(
     val index: Long,
     override val sizeBytes: Long,
     val record: DestinationRecord
 ) : DestinationStreamEvent()
 
-data class StreamCompleteEvent(
+/**
+ * Indicates the stream is in a terminal (complete or incomplete) state as signalled by upstream.
+ */
+data class StreamEndEvent(
     val index: Long,
 ) : DestinationStreamEvent() {
     override val sizeBytes: Long = 0L
 }
 
+/**
+ * Emitted to trigger evaluation of the conditional flush logic of a stream. The consumer may or may
+ * not decide to flush.
+ */
 data class StreamFlushEvent(
     val tickedAtMs: Long,
 ) : DestinationStreamEvent() {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt
index 734a032a507a..db46835ab87b 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MultiProducerChannel.kt
@@ -5,37 +5,29 @@
 package io.airbyte.cdk.load.message
 
 import io.github.oshai.kotlinlogging.KotlinLogging
-import java.lang.IllegalStateException
-import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicLong
 import kotlinx.coroutines.channels.Channel
 
 /**
- * A channel designed for use with a dynamic amount of producers. Close will only close the
+ * A channel designed for use with a fixed amount of producers. Close will be called on the
  * underlying channel, when there are no remaining registered producers.
  */
-class MultiProducerChannel<T>(override val channel: Channel<T>) : ChannelMessageQueue<T>() {
+class MultiProducerChannel<T>(
+    producerCount: Long,
+    override val channel: Channel<T>,
+) : ChannelMessageQueue<T>() {
     private val log = KotlinLogging.logger {}
-    private val producerCount = AtomicLong(0)
-    private val closed = AtomicBoolean(false)
-
-    fun registerProducer(): MultiProducerChannel<T> {
-        if (closed.get()) {
-            throw IllegalStateException("Attempted to register producer for closed channel.")
-        }
-
-        val count = producerCount.incrementAndGet()
-        log.info { "Registering producer (count=$count)" }
-        return this
-    }
+    private val initializedProducerCount = producerCount
+    private val producerCount = AtomicLong(producerCount)
 
     override suspend fun close() {
         val count = producerCount.decrementAndGet()
-        log.info { "Closing producer (count=$count)" }
+        log.info {
+            "Closing producer (active count=$count, initialized count: $initializedProducerCount)"
+        }
         if (count == 0L) {
-            log.info { "Closing queue" }
+            log.info { "Closing underlying queue" }
             channel.close()
-            closed.getAndSet(true)
         }
     }
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
index f83254ef4308..491b86fa808a 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
@@ -20,13 +20,9 @@ import kotlinx.coroutines.CompletableDeferred
 
 sealed interface StreamResult
 
-sealed interface StreamIncompleteResult : StreamResult
+data class StreamProcessingFailed(val streamException: Exception) : StreamResult
 
-data class StreamFailed(val streamException: Exception) : StreamIncompleteResult
-
-data class StreamKilled(val syncException: Exception) : StreamIncompleteResult
-
-data object StreamSucceeded : StreamResult
+data object StreamProcessingSucceeded : StreamResult
 
 /** Manages the state of a single stream. */
 interface StreamManager {
@@ -38,13 +34,17 @@ interface StreamManager {
     fun recordCount(): Long
 
     /**
-     * Mark the end-of-stream and return the record count. Expect this exactly once. Expect no
-     * further `countRecordIn`, and expect that [markSucceeded] or [markFailed] or [markKilled] will
-     * alway occur after this.
+     * Mark the end-of-stream, set the end of stream variant (complete or incomplete) and return the
+     * record count. Expect this exactly once. Expect no further `countRecordIn`, and expect that
+     * [markProcessingSucceeded] will always occur after this, while [markProcessingFailed] can
+     * occur before or after.
      */
-    fun markEndOfStream(): Long
+    fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long
     fun endOfStreamRead(): Boolean
 
+    /** Whether we received a stream complete message for the managed stream. */
+    fun isComplete(): Boolean
+
     /**
      * Mark a checkpoint in the stream and return the current index and the number of records since
      * the last one.
@@ -72,22 +72,23 @@ interface StreamManager {
      */
     fun areRecordsPersistedUntil(index: Long): Boolean
 
-    /** Mark the stream as closed. This should only be called after all records have been read. */
-    fun markSucceeded()
-
     /**
-     * Mark that the stream was killed due to failure elsewhere. Returns false if task was already
-     * complete.
+     * Indicates destination processing of the stream succeeded, regardless of complete/incomplete
+     * status. This should only be called after all records and end of stream messages have been
+     * read.
      */
-    fun markKilled(causedBy: Exception): Boolean
+    fun markProcessingSucceeded()
 
-    /** Mark that the stream itself failed. Return false if task was already complete */
-    fun markFailed(causedBy: Exception): Boolean
+    /**
+     * Indicates destination processing of the stream failed. Returns false if task was already
+     * complete
+     */
+    fun markProcessingFailed(causedBy: Exception): Boolean
 
     /** Suspend until the stream completes, returning the result. */
     suspend fun awaitStreamResult(): StreamResult
 
-    /** True if the stream has not yet been marked successful, failed, or killed. */
+    /** True if the stream processing has not yet been marked as successful or failed. */
     fun isActive(): Boolean
 }
 
@@ -105,6 +106,7 @@ class DefaultStreamManager(
     private val lastCheckpoint = AtomicLong(0L)
 
     private val markedEndOfStream = AtomicBoolean(false)
+    private val receivedComplete = AtomicBoolean(false)
 
     private val rangesState: ConcurrentHashMap<Batch.State, RangeSet<Long>> = ConcurrentHashMap()
 
@@ -124,10 +126,11 @@ class DefaultStreamManager(
         return recordCount.get()
     }
 
-    override fun markEndOfStream(): Long {
+    override fun markEndOfStream(receivedStreamCompleteMessage: Boolean): Long {
         if (markedEndOfStream.getAndSet(true)) {
             throw IllegalStateException("Stream is closed for reading")
         }
+        receivedComplete.getAndSet(receivedStreamCompleteMessage)
 
         return recordCount.get()
     }
@@ -136,6 +139,10 @@ class DefaultStreamManager(
         return markedEndOfStream.get()
     }
 
+    override fun isComplete(): Boolean {
+        return receivedComplete.get()
+    }
+
     override fun markCheckpoint(): Pair<Long, Long> {
         val index = recordCount.get()
         val lastCheckpoint = lastCheckpoint.getAndSet(index)
@@ -220,19 +227,15 @@ class DefaultStreamManager(
         return isProcessingCompleteForState(index, Batch.State.PERSISTED)
     }
 
-    override fun markSucceeded() {
+    override fun markProcessingSucceeded() {
         if (!markedEndOfStream.get()) {
             throw IllegalStateException("Stream is not closed for reading")
         }
-        streamResult.complete(StreamSucceeded)
-    }
-
-    override fun markKilled(causedBy: Exception): Boolean {
-        return streamResult.complete(StreamKilled(causedBy))
+        streamResult.complete(StreamProcessingSucceeded)
     }
 
-    override fun markFailed(causedBy: Exception): Boolean {
-        return streamResult.complete(StreamFailed(causedBy))
+    override fun markProcessingFailed(causedBy: Exception): Boolean {
+        return streamResult.complete(StreamProcessingFailed(causedBy))
     }
 
     override suspend fun awaitStreamResult(): StreamResult {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt
index 3a54d8898cae..eeefe4d21aad 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/SyncManager.kt
@@ -14,14 +14,14 @@ import jakarta.inject.Singleton
 import java.util.concurrent.ConcurrentHashMap
 import kotlinx.coroutines.CompletableDeferred
 
-sealed interface SyncResult
+sealed interface DestinationResult
 
-data object SyncSuccess : SyncResult
+data object DestinationSuccess : DestinationResult
 
-data class SyncFailure(
-    val syncFailure: Exception,
+data class DestinationFailure(
+    val cause: Exception,
     val streamResults: Map<DestinationStream.Descriptor, StreamResult>
-) : SyncResult
+) : DestinationResult
 
 /** Manages the state of all streams in the destination. */
 interface SyncManager {
@@ -35,18 +35,26 @@ interface SyncManager {
     suspend fun getOrAwaitStreamLoader(stream: DestinationStream.Descriptor): StreamLoader
     suspend fun getStreamLoaderOrNull(stream: DestinationStream.Descriptor): StreamLoader?
 
-    /** Suspend until all streams are complete. Returns false if any stream was failed/killed. */
-    suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean
+    /**
+     * Suspend until all streams are processed successfully. Returns false if processing failed for
+     * any stream.
+     */
+    suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean
 
     suspend fun markInputConsumed()
     suspend fun markCheckpointsProcessed()
-    suspend fun markFailed(causedBy: Exception): SyncFailure
-    suspend fun markSucceeded()
+    suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure
+    suspend fun markDestinationSucceeded()
+
+    /**
+     * Whether we received stream complete messages for all streams in the catalog from upstream.
+     */
+    suspend fun allStreamsComplete(): Boolean
 
     fun isActive(): Boolean
 
-    suspend fun awaitInputProcessingComplete(): Unit
-    suspend fun awaitSyncResult(): SyncResult
+    suspend fun awaitInputProcessingComplete()
+    suspend fun awaitDestinationResult(): DestinationResult
 }
 
 @SuppressFBWarnings(
@@ -56,7 +64,7 @@ interface SyncManager {
 class DefaultSyncManager(
     private val streamManagers: ConcurrentHashMap<DestinationStream.Descriptor, StreamManager>
 ) : SyncManager {
-    private val syncResult = CompletableDeferred<SyncResult>()
+    private val destinationResult = CompletableDeferred<DestinationResult>()
     private val streamLoaders =
         ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<Result<StreamLoader>>>()
     private val inputConsumed = CompletableDeferred<Boolean>()
@@ -87,32 +95,38 @@ class DefaultSyncManager(
         return streamLoaders[stream]?.await()?.getOrNull()
     }
 
-    override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean {
-        return streamManagers.all { (_, manager) -> manager.awaitStreamResult() is StreamSucceeded }
+    override suspend fun awaitAllStreamsProcessedSuccessfully(): Boolean {
+        return streamManagers.all { (_, manager) ->
+            manager.awaitStreamResult() is StreamProcessingSucceeded
+        }
     }
 
-    override suspend fun markFailed(causedBy: Exception): SyncFailure {
+    override suspend fun markDestinationFailed(causedBy: Exception): DestinationFailure {
         val result =
-            SyncFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() })
-        syncResult.complete(result)
+            DestinationFailure(causedBy, streamManagers.mapValues { it.value.awaitStreamResult() })
+        destinationResult.complete(result)
         return result
     }
 
-    override suspend fun markSucceeded() {
+    override suspend fun markDestinationSucceeded() {
         if (streamManagers.values.any { it.isActive() }) {
             throw IllegalStateException(
                 "Cannot mark sync as succeeded until all streams are complete"
             )
         }
-        syncResult.complete(SyncSuccess)
+        destinationResult.complete(DestinationSuccess)
+    }
+
+    override suspend fun allStreamsComplete(): Boolean {
+        return streamManagers.all { it.value.isComplete() }
     }
 
     override fun isActive(): Boolean {
-        return syncResult.isActive
+        return destinationResult.isActive
     }
 
-    override suspend fun awaitSyncResult(): SyncResult {
-        return syncResult.await()
+    override suspend fun awaitDestinationResult(): DestinationResult {
+        return destinationResult.await()
     }
 
     override suspend fun awaitInputProcessingComplete() {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
index af35955cb748..3b0da20da844 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
@@ -125,7 +125,7 @@ class DefaultDestinationTaskLauncher(
     // File transfer
     @Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,
 
-    // Input Comsumer requirements
+    // Input Consumer requirements
     private val inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
     private val recordQueueSupplier:
         MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationStreamEvent>>,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt
index a2a7d4f00831..14a1688e7ad9 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt
@@ -16,7 +16,8 @@ interface CloseStreamTask : ImplementorScope
 
 /**
  * Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the
- * teardown task.
+ * teardown task. Called after the end of stream message (complete OR incomplete) has been received
+ * and all record messages have been processed.
  */
 class DefaultCloseStreamTask(
     private val syncManager: SyncManager,
@@ -27,7 +28,7 @@ class DefaultCloseStreamTask(
     override suspend fun execute() {
         val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor)
         streamLoader.close()
-        syncManager.getStreamManager(streamDescriptor).markSucceeded()
+        syncManager.getStreamManager(streamDescriptor).markProcessingSucceeded()
         taskLauncher.handleStreamClosed(streamLoader.stream.descriptor)
     }
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt
index 4b810cd4e30b..9959a3286ab0 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt
@@ -5,8 +5,8 @@
 package io.airbyte.cdk.load.task.implementor
 
 import io.airbyte.cdk.load.command.DestinationStream
-import io.airbyte.cdk.load.state.StreamIncompleteResult
-import io.airbyte.cdk.load.state.StreamSucceeded
+import io.airbyte.cdk.load.state.StreamProcessingFailed
+import io.airbyte.cdk.load.state.StreamProcessingSucceeded
 import io.airbyte.cdk.load.state.SyncManager
 import io.airbyte.cdk.load.task.DestinationTaskLauncher
 import io.airbyte.cdk.load.task.ImplementorScope
@@ -17,8 +17,8 @@ import jakarta.inject.Singleton
 interface FailStreamTask : ImplementorScope
 
 /**
- * FailStreamTask is a task that is executed when a stream fails. It is responsible for cleaning up
- * resources and reporting the failure.
+ * FailStreamTask is a task that is executed when the processing of a stream fails in the
+ * destination. It is responsible for cleaning up resources and reporting the failure.
  */
 class DefaultFailStreamTask(
     private val taskLauncher: DestinationTaskLauncher,
@@ -30,12 +30,12 @@ class DefaultFailStreamTask(
 
     override suspend fun execute() {
         val streamManager = syncManager.getStreamManager(stream)
-        streamManager.markFailed(exception)
+        streamManager.markProcessingFailed(exception)
         when (val streamResult = streamManager.awaitStreamResult()) {
-            is StreamSucceeded -> {
+            is StreamProcessingSucceeded -> {
                 log.info { "Cannot fail stream $stream, which is already complete, doing nothing." }
             }
-            is StreamIncompleteResult -> {
+            is StreamProcessingFailed -> {
                 syncManager.getStreamLoaderOrNull(stream)?.close(streamResult)
                     ?: log.warn { "StreamLoader not found for stream $stream, cannot call close." }
             }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt
index ff1d905bfb7d..10f64ab9de0f 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt
@@ -16,8 +16,9 @@ import jakarta.inject.Singleton
 interface FailSyncTask : ImplementorScope
 
 /**
- * FailSyncTask is a task that is executed when a sync fails. It is responsible for cleaning up
- * resources and reporting the failure.
+ * FailSyncTask is a task that is executed only when the destination itself fails during a sync. If
+ * the sync is failed by upstream (e.g. an incomplete stream message is received), we do not call
+ * this task. It is responsible for cleaning up resources and reporting the failure.
  */
 class DefaultFailSyncTask(
     private val taskLauncher: DestinationTaskLauncher,
@@ -31,7 +32,7 @@ class DefaultFailSyncTask(
     override suspend fun execute() {
         // Ensure any remaining ready state gets captured: don't waste work!
         checkpointManager.flushReadyCheckpointMessages()
-        val result = syncManager.markFailed(exception) // awaits stream completion
+        val result = syncManager.markDestinationFailed(exception) // awaits stream completion
         log.info { "Calling teardown with failure result $result" }
         destinationWriter.teardown(result)
         taskLauncher.handleTeardownComplete(success = false)
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt
index 324564c39a05..64dada897c6b 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt
@@ -31,9 +31,9 @@ class DefaultTeardownTask(
     override suspend fun execute() {
         syncManager.awaitInputProcessingComplete()
 
-        log.info { "Teardown task awaiting stream completion" }
-        if (!syncManager.awaitAllStreamsCompletedSuccessfully()) {
-            log.info { "Streams failed to complete successfully, doing nothing." }
+        log.info { "Teardown task awaiting stream processing completion" }
+        if (!syncManager.awaitAllStreamsProcessedSuccessfully()) {
+            log.info { "Streams failed to be processed successfully, doing nothing." }
             return
         }
 
@@ -41,7 +41,7 @@ class DefaultTeardownTask(
         log.info { "Starting teardown task" }
         destination.teardown()
         log.info { "Teardown task complete, marking sync succeeded." }
-        syncManager.markSucceeded()
+        syncManager.markDestinationSucceeded()
         taskLauncher.handleTeardownComplete()
     }
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
index db0c47d07da6..3bb0bb716545 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
@@ -27,7 +27,7 @@ import io.airbyte.cdk.load.message.QueueWriter
 import io.airbyte.cdk.load.message.SimpleBatch
 import io.airbyte.cdk.load.message.StreamCheckpoint
 import io.airbyte.cdk.load.message.StreamCheckpointWrapped
-import io.airbyte.cdk.load.message.StreamCompleteEvent
+import io.airbyte.cdk.load.message.StreamEndEvent
 import io.airbyte.cdk.load.message.StreamRecordEvent
 import io.airbyte.cdk.load.message.Undefined
 import io.airbyte.cdk.load.state.Reserved
@@ -83,19 +83,23 @@ class DefaultInputConsumerTask(
             }
             is DestinationRecordStreamComplete -> {
                 reserved.release() // safe because multiple calls conflate
-                val wrapped = StreamCompleteEvent(index = manager.markEndOfStream())
+                val wrapped = StreamEndEvent(index = manager.markEndOfStream(true))
+                recordQueue.publish(reserved.replace(wrapped))
+                recordQueue.close()
+            }
+            is DestinationRecordStreamIncomplete -> {
+                reserved.release() // safe because multiple calls conflate
+                val wrapped = StreamEndEvent(index = manager.markEndOfStream(false))
                 recordQueue.publish(reserved.replace(wrapped))
                 recordQueue.close()
             }
-            is DestinationRecordStreamIncomplete ->
-                throw IllegalStateException("Stream $stream failed upstream, cannot continue.")
             is DestinationFile -> {
                 val index = manager.countRecordIn()
                 destinationTaskLauncher.handleFile(stream, message, index)
             }
             is DestinationFileStreamComplete -> {
                 reserved.release() // safe because multiple calls conflate
-                manager.markEndOfStream()
+                manager.markEndOfStream(true)
                 val envelope = BatchEnvelope(SimpleBatch(Batch.State.COMPLETE))
                 destinationTaskLauncher.handleNewBatch(stream, envelope)
             }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
index 1760dd5b6c53..ea8965002ea8 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
@@ -15,7 +15,7 @@ import io.airbyte.cdk.load.message.MessageQueueSupplier
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.QueueReader
 import io.airbyte.cdk.load.message.SimpleBatch
-import io.airbyte.cdk.load.message.StreamCompleteEvent
+import io.airbyte.cdk.load.message.StreamEndEvent
 import io.airbyte.cdk.load.message.StreamFlushEvent
 import io.airbyte.cdk.load.message.StreamRecordEvent
 import io.airbyte.cdk.load.state.FlushStrategy
@@ -61,13 +61,12 @@ class DefaultSpillToDiskTask(
     override suspend fun execute() {
         val initialAccumulator = fileAccFactory.make()
 
-        val registration = outputQueue.registerProducer()
-        registration.use {
+        outputQueue.use {
             inputQueue.consume().fold(initialAccumulator) { acc, reserved ->
                 reserved.use {
                     when (val event = it.value) {
                         is StreamRecordEvent -> accRecordEvent(acc, event)
-                        is StreamCompleteEvent -> accStreamCompleteEvent(acc, event)
+                        is StreamEndEvent -> accStreamEndEvent(acc, event)
                         is StreamFlushEvent -> accFlushEvent(acc)
                     }
                 }
@@ -117,12 +116,12 @@ class DefaultSpillToDiskTask(
     }
 
     /**
-     * Handles accumulation of stream completion events, triggering a final flush if the aggregate
-     * isn't empty.
+     * Handles accumulation of stream end events (complete or incomplete), triggering a final flush
+     * if the aggregate isn't empty.
      */
-    private suspend fun accStreamCompleteEvent(
+    private suspend fun accStreamEndEvent(
         acc: FileAccumulator,
-        event: StreamCompleteEvent,
+        event: StreamEndEvent,
     ): FileAccumulator {
         val (spillFile, outputStream, timeWindow, range, sizeBytes) = acc
         if (sizeBytes == 0L) {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt
index 7ad369a3623d..665dd5abaae7 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DestinationWriter.kt
@@ -5,7 +5,7 @@
 package io.airbyte.cdk.load.write
 
 import io.airbyte.cdk.load.command.DestinationStream
-import io.airbyte.cdk.load.state.SyncFailure
+import io.airbyte.cdk.load.state.DestinationFailure
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 
@@ -22,7 +22,7 @@ interface DestinationWriter {
 
     // Called once at the end of the job, unconditionally.
     // NOTE: we don't pass Success here, because it depends on this completing successfully.
-    suspend fun teardown(syncFailure: SyncFailure? = null) {}
+    suspend fun teardown(destinationFailure: DestinationFailure? = null) {}
 }
 
 @Singleton
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt
index cf6070a56927..ce0a21404e3b 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt
@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.message.SimpleBatch
-import io.airbyte.cdk.load.state.StreamIncompleteResult
+import io.airbyte.cdk.load.state.StreamProcessingFailed
 
 /**
  * Implementor interface. The framework calls open and close once per stream at the beginning and
@@ -39,5 +39,5 @@ interface StreamLoader {
     suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
     suspend fun processFile(file: DestinationFile): Batch
     suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)
-    suspend fun close(streamFailure: StreamIncompleteResult? = null) {}
+    suspend fun close(streamFailure: StreamProcessingFailed? = null) {}
 }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt
new file mode 100644
index 000000000000..a4b52f166147
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamsIncompleteException.kt
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.write
+
+/**
+ * Thrown when the destination completes successfully, but some streams were indicated as incomplete
+ * by upstream. Without throwing an exception the sync will not be marked as succeed by the
+ * platform.
+ *
+ * TODO: Once the API with platform is updated to not require an exceptional exit code, remove this.
+ */
+class StreamsIncompleteException : Exception() {
+    override val message = "Some streams were indicated as incomplete by upstream."
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt
index 357d08b17e52..5d673a5b26f6 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt
@@ -5,9 +5,9 @@
 package io.airbyte.cdk.load.write
 
 import io.airbyte.cdk.Operation
-import io.airbyte.cdk.load.state.SyncFailure
+import io.airbyte.cdk.load.state.DestinationFailure
+import io.airbyte.cdk.load.state.DestinationSuccess
 import io.airbyte.cdk.load.state.SyncManager
-import io.airbyte.cdk.load.state.SyncSuccess
 import io.airbyte.cdk.load.task.TaskLauncher
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Factory
@@ -32,13 +32,19 @@ class WriteOperation(
     override fun execute() = runBlocking {
         taskLauncher.run()
 
-        when (val result = syncManager.awaitSyncResult()) {
-            is SyncSuccess -> {
-                log.info { "Sync completed successfully" }
+        when (val result = syncManager.awaitDestinationResult()) {
+            is DestinationSuccess -> {
+                if (!syncManager.allStreamsComplete()) {
+                    log.info {
+                        "Destination completed successfully but some streams were incomplete. Throwing to exit non-zero..."
+                    }
+                    throw StreamsIncompleteException()
+                }
+                log.info { "Destination completed successfully and all streams were complete." }
             }
-            is SyncFailure -> {
-                log.info { "Sync failed with stream results ${result.streamResults}" }
-                throw result.syncFailure
+            is DestinationFailure -> {
+                log.info { "Destination failed with stream results ${result.streamResults}" }
+                throw result.cause
             }
         }
     }
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt
index e40e1c90a7c9..f301d585fe1a 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/MultiProducerChannelTest.kt
@@ -7,12 +7,10 @@ package io.airbyte.cdk.load.message
 import io.mockk.coVerify
 import io.mockk.impl.annotations.MockK
 import io.mockk.junit5.MockKExtension
-import java.lang.IllegalStateException
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.test.runTest
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
 import org.junit.jupiter.api.extension.ExtendWith
 
 @ExtendWith(MockKExtension::class)
@@ -21,33 +19,23 @@ class MultiProducerChannelTest {
 
     private lateinit var channel: MultiProducerChannel<String>
 
+    val size = 3L
+
     @BeforeEach
     fun setup() {
-        channel = MultiProducerChannel(wrapped)
+        channel = MultiProducerChannel(size, wrapped)
     }
 
     @Test
-    fun `cannot register a producer if channel already closed`() = runTest {
-        channel.registerProducer()
+    fun `does not close until the expected number of producers have closed`() = runTest {
         channel.close()
-        assertThrows<IllegalStateException> { channel.registerProducer() }
-    }
-
-    @Test
-    fun `does not close underlying channel while registered producers exist`() = runTest {
-        channel.registerProducer()
-        channel.registerProducer()
-
         channel.close()
+
         coVerify(exactly = 0) { wrapped.close() }
     }
 
     @Test
     fun `closes underlying channel when no producers are registered`() = runTest {
-        channel.registerProducer()
-        channel.registerProducer()
-        channel.registerProducer()
-
         channel.close()
         channel.close()
         channel.close()
@@ -56,9 +44,7 @@ class MultiProducerChannelTest {
 
     @Test
     fun `subsequent calls to to close are idempotent`() = runTest {
-        channel.registerProducer()
-        channel.registerProducer()
-
+        channel.close()
         channel.close()
         channel.close()
         channel.close()
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt
index c6dc30ad799a..1997473e4aed 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/StreamManagerTest.kt
@@ -62,17 +62,19 @@ class StreamManagerTest {
         val manager = DefaultStreamManager(stream1)
         val channel = Channel<Boolean>(Channel.UNLIMITED)
 
-        launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
+        launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) }
 
         delay(500)
         Assertions.assertTrue(channel.tryReceive().isFailure)
-        Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() }
-        manager.markEndOfStream()
+        Assertions.assertThrows(IllegalStateException::class.java) {
+            manager.markProcessingSucceeded()
+        }
+        manager.markEndOfStream(true)
 
-        manager.markSucceeded()
+        manager.markProcessingSucceeded()
         Assertions.assertTrue(channel.receive())
 
-        Assertions.assertEquals(StreamSucceeded, manager.awaitStreamResult())
+        Assertions.assertEquals(StreamProcessingSucceeded, manager.awaitStreamResult())
     }
 
     @Test
@@ -80,29 +82,14 @@ class StreamManagerTest {
         val manager = DefaultStreamManager(stream1)
         val channel = Channel<Boolean>(Channel.UNLIMITED)
 
-        launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
+        launch { channel.send(manager.awaitStreamResult() is StreamProcessingSucceeded) }
 
         delay(500)
         Assertions.assertTrue(channel.tryReceive().isFailure)
-        manager.markFailed(Exception("test"))
+        manager.markProcessingFailed(Exception("test"))
         Assertions.assertFalse(channel.receive())
 
-        Assertions.assertTrue(manager.awaitStreamResult() is StreamFailed)
-    }
-
-    @Test
-    fun testMarkKilled() = runTest {
-        val manager = DefaultStreamManager(stream1)
-        val channel = Channel<Boolean>(Channel.UNLIMITED)
-
-        launch { channel.send(manager.awaitStreamResult() is StreamSucceeded) }
-
-        delay(500)
-        Assertions.assertTrue(channel.tryReceive().isFailure)
-        manager.markKilled(Exception("test"))
-        Assertions.assertFalse(channel.receive())
-
-        Assertions.assertTrue(manager.awaitStreamResult() is StreamKilled)
+        Assertions.assertTrue(manager.awaitStreamResult() is StreamProcessingFailed)
     }
 
     class TestUpdateBatchStateProvider : ArgumentsProvider {
@@ -274,7 +261,7 @@ class StreamManagerTest {
             val manager = managers[stream.descriptor]!!
             when (event) {
                 is SetRecordCount -> repeat(event.count.toInt()) { manager.countRecordIn() }
-                is SetEndOfStream -> manager.markEndOfStream()
+                is SetEndOfStream -> manager.markEndOfStream(true)
                 is AddPersisted ->
                     manager.updateBatchState(
                         BatchEnvelope(
@@ -310,23 +297,25 @@ class StreamManagerTest {
         val manager = DefaultStreamManager(stream1)
 
         // Can't mark success before end-of-stream
-        Assertions.assertThrows(IllegalStateException::class.java) { manager.markSucceeded() }
+        Assertions.assertThrows(IllegalStateException::class.java) {
+            manager.markProcessingSucceeded()
+        }
 
         manager.countRecordIn()
-        manager.markEndOfStream()
+        manager.markEndOfStream(true)
 
         // Can't update after end-of-stream
         Assertions.assertThrows(IllegalStateException::class.java) { manager.countRecordIn() }
-        Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream() }
+        Assertions.assertThrows(IllegalStateException::class.java) { manager.markEndOfStream(true) }
 
         // Can close now
-        Assertions.assertDoesNotThrow(manager::markSucceeded)
+        Assertions.assertDoesNotThrow(manager::markProcessingSucceeded)
     }
 
     @Test
     fun testEmptyCompletedStreamYieldsBatchProcessingComplete() {
         val manager = DefaultStreamManager(stream1)
-        manager.markEndOfStream()
+        manager.markEndOfStream(true)
         Assertions.assertTrue(manager.isBatchProcessingComplete())
     }
 
@@ -407,7 +396,7 @@ class StreamManagerTest {
 
         val range2 = Range.closed(10, 19L)
         val batch2 = BatchEnvelope(SimpleBatch(Batch.State.PERSISTED, groupId = "foo"), range2)
-        manager.markEndOfStream()
+        manager.markEndOfStream(true)
 
         manager.updateBatchState(batch2)
         manager.updateBatchState(batch1)
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt
index 6589794d1bec..29d1f0a8049d 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/state/SyncManagerTest.kt
@@ -41,43 +41,43 @@ class SyncManagerTest {
     //  deferred; B) It should probably move into a writer wrapper.
 
     @Test
-    fun testAwaitAllStreamsCompletedSuccessfully() = runTest {
+    fun testAwaitAllStreamsProcessedSuccessfully() = runTest {
         val manager1 = syncManager.getStreamManager(stream1.descriptor)
         val manager2 = syncManager.getStreamManager(stream2.descriptor)
         val completionChannel = Channel<Boolean>(Channel.UNLIMITED)
 
-        manager1.markEndOfStream()
-        manager2.markEndOfStream()
+        manager1.markEndOfStream(true)
+        manager2.markEndOfStream(true)
 
-        launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) }
+        launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) }
 
         delay(500)
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
-        manager1.markSucceeded()
+        manager1.markProcessingSucceeded()
         delay(500)
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
-        manager2.markSucceeded()
+        manager2.markProcessingSucceeded()
         Assertions.assertTrue(completionChannel.receive())
     }
 
     @Test
-    fun testAwaitAllStreamsCompletedSuccessfullyWithFailure() = runTest {
+    fun testAwaitAllStreamsProcessedSuccessfullyWithFailure() = runTest {
         val manager1 = syncManager.getStreamManager(stream1.descriptor)
         val manager2 = syncManager.getStreamManager(stream2.descriptor)
 
         val completionChannel = Channel<Boolean>(Channel.UNLIMITED)
 
-        launch { completionChannel.send(syncManager.awaitAllStreamsCompletedSuccessfully()) }
+        launch { completionChannel.send(syncManager.awaitAllStreamsProcessedSuccessfully()) }
 
-        manager1.markEndOfStream()
-        manager2.markEndOfStream()
+        manager1.markEndOfStream(true)
+        manager2.markEndOfStream(true)
 
         delay(500)
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
-        manager1.markSucceeded()
+        manager1.markProcessingSucceeded()
         delay(500)
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
-        manager2.markFailed(RuntimeException())
+        manager2.markProcessingFailed(RuntimeException())
         Assertions.assertFalse(completionChannel.receive())
     }
 
@@ -86,15 +86,15 @@ class SyncManagerTest {
         val manager1 = syncManager.getStreamManager(stream1.descriptor)
         val manager2 = syncManager.getStreamManager(stream2.descriptor)
 
-        manager1.markEndOfStream()
-        manager2.markEndOfStream()
+        manager1.markEndOfStream(true)
+        manager2.markEndOfStream(true)
 
         Assertions.assertTrue(syncManager.isActive())
-        manager1.markSucceeded()
+        manager1.markProcessingSucceeded()
         Assertions.assertTrue(syncManager.isActive())
-        manager2.markSucceeded()
+        manager2.markProcessingSucceeded()
         Assertions.assertTrue(syncManager.isActive())
-        syncManager.markSucceeded()
+        syncManager.markDestinationSucceeded()
         Assertions.assertFalse(syncManager.isActive())
     }
 
@@ -103,35 +103,35 @@ class SyncManagerTest {
         val manager1 = syncManager.getStreamManager(stream1.descriptor)
         val manager2 = syncManager.getStreamManager(stream2.descriptor)
 
-        manager1.markEndOfStream()
-        manager2.markEndOfStream()
+        manager1.markEndOfStream(true)
+        manager2.markEndOfStream(true)
 
-        val completionChannel = Channel<SyncResult>(Channel.UNLIMITED)
+        val completionChannel = Channel<DestinationResult>(Channel.UNLIMITED)
 
-        launch { completionChannel.send(syncManager.awaitSyncResult()) }
+        launch { completionChannel.send(syncManager.awaitDestinationResult()) }
 
         CoroutineTestUtils.assertThrows(IllegalStateException::class) {
-            syncManager.markSucceeded()
+            syncManager.markDestinationSucceeded()
         }
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
 
-        manager1.markSucceeded()
+        manager1.markProcessingSucceeded()
         CoroutineTestUtils.assertThrows(IllegalStateException::class) {
-            syncManager.markSucceeded()
+            syncManager.markDestinationSucceeded()
         }
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
 
-        manager2.markSucceeded()
+        manager2.markProcessingSucceeded()
         Assertions.assertTrue(completionChannel.tryReceive().isFailure)
 
-        syncManager.markSucceeded()
-        Assertions.assertEquals(SyncSuccess, completionChannel.receive())
+        syncManager.markDestinationSucceeded()
+        Assertions.assertEquals(DestinationSuccess, completionChannel.receive())
     }
 
     @Test
     fun testCrashOnNoEndOfStream() = runTest {
         val manager1 = syncManager.getStreamManager(stream1.descriptor)
-        manager1.markEndOfStream()
+        manager1.markEndOfStream(true)
         // This should fail, because stream2 was not marked with end of stream
         val e = assertThrows<IllegalStateException> { syncManager.markInputConsumed() }
         assertEquals(
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
index a103ed9c7eab..ebdead74a42f 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
@@ -419,7 +419,7 @@ class DestinationTaskLauncherTest<T : ScopedTask> {
             syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
         repeat(100) { streamManager.countRecordIn() }
 
-        streamManager.markEndOfStream()
+        streamManager.markEndOfStream(true)
 
         // Verify incomplete batch triggers process batch
         val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range)
@@ -468,7 +468,7 @@ class DestinationTaskLauncherTest<T : ScopedTask> {
         val range = TreeRangeSet.create(listOf(Range.closed(0L, 0L)))
         val streamManager =
             syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
-        streamManager.markEndOfStream()
+        streamManager.markEndOfStream(true)
 
         val emptyBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), range)
         taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1.descriptor, emptyBatch)
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt
index 0c056ee8f5ab..2623715bf598 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt
@@ -14,7 +14,7 @@ import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
 import io.airbyte.cdk.load.message.MessageQueue
 import io.airbyte.cdk.load.message.MessageQueueSupplier
 import io.airbyte.cdk.load.message.StreamCheckpointWrapped
-import io.airbyte.cdk.load.message.StreamCompleteEvent
+import io.airbyte.cdk.load.message.StreamEndEvent
 import io.airbyte.cdk.load.message.StreamRecordEvent
 import io.airbyte.cdk.load.state.ReservationManager
 import io.airbyte.cdk.load.state.Reserved
@@ -144,11 +144,11 @@ class InputConsumerTaskTest {
 
         Assertions.assertEquals(expectedRecords, messages1.map { it.value })
         Assertions.assertEquals(expectedRecords.map { _ -> 1L }, messages1.map { it.bytesReserved })
-        Assertions.assertEquals(StreamCompleteEvent(10), streamComplete1.value)
+        Assertions.assertEquals(StreamEndEvent(10), streamComplete1.value)
         Assertions.assertEquals(1, streamComplete1.bytesReserved)
         Assertions.assertEquals(10L, manager1.recordCount())
         Assertions.assertEquals(emptyList<DestinationStreamEvent>(), queue1.consume().toList())
-        Assertions.assertEquals(StreamCompleteEvent(0), streamComplete2.value)
+        Assertions.assertEquals(StreamEndEvent(0), streamComplete2.value)
         Assertions.assertEquals(emptyList<DestinationStreamEvent>(), queue2.consume().toList())
         Assertions.assertEquals(0L, manager2.recordCount())
         mockInputFlow.stop()
@@ -208,7 +208,7 @@ class InputConsumerTaskTest {
                         "test"
                     )
                 ),
-                StreamCompleteEvent(1)
+                StreamEndEvent(1)
             ),
             queue2.consume().toList().map { it.value }
         )
@@ -220,7 +220,7 @@ class InputConsumerTaskTest {
         queue1.close()
         val messages1 = queue1.consume().toList()
         Assertions.assertEquals(11, messages1.size)
-        Assertions.assertEquals(messages1[10].value, StreamCompleteEvent(10))
+        Assertions.assertEquals(messages1[10].value, StreamEndEvent(10))
         Assertions.assertEquals(
             mockInputFlow.initialMemory - 11,
             mockInputFlow.memoryManager.remainingCapacityBytes,
@@ -353,30 +353,6 @@ class InputConsumerTaskTest {
         mockInputFlow.stop()
     }
 
-    @Test
-    fun testStreamIncompleteThrows() = runTest {
-        mockInputFlow.addMessage(
-            StubDestinationMessageFactory.makeRecord(MockDestinationCatalogFactory.stream1, "test"),
-            1L
-        )
-        mockInputFlow.addMessage(
-            StubDestinationMessageFactory.makeStreamIncomplete(
-                MockDestinationCatalogFactory.stream1
-            ),
-            0L
-        )
-        val task =
-            taskFactory.make(
-                mockCatalogFactory.make(),
-                mockInputFlow,
-                recordQueueSupplier,
-                checkpointQueue,
-                mockk(),
-            )
-        CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() }
-        mockInputFlow.stop()
-    }
-
     @Test
     fun testFileStreamIncompleteThrows() = runTest {
         mockInputFlow.addMessage(
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt
index 333671c05819..8dec6cfc48c4 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt
@@ -17,7 +17,7 @@ import io.airbyte.cdk.load.message.DestinationStreamEventQueue
 import io.airbyte.cdk.load.message.DestinationStreamQueueSupplier
 import io.airbyte.cdk.load.message.MessageQueueSupplier
 import io.airbyte.cdk.load.message.MultiProducerChannel
-import io.airbyte.cdk.load.message.StreamCompleteEvent
+import io.airbyte.cdk.load.message.StreamEndEvent
 import io.airbyte.cdk.load.message.StreamFlushEvent
 import io.airbyte.cdk.load.message.StreamRecordEvent
 import io.airbyte.cdk.load.state.FlushStrategy
@@ -111,7 +111,7 @@ class SpillToDiskTaskTest {
 
         @Test
         fun `publishes 'spilled file' aggregates on stream complete event`() = runTest {
-            val completeMsg = StreamCompleteEvent(0L)
+            val completeMsg = StreamEndEvent(0L)
             inputQueue.publish(Reserved(value = completeMsg))
 
             val job = launch {
@@ -267,7 +267,7 @@ class SpillToDiskTaskTest {
             queue.publish(
                 memoryManager.reserve(
                     0L,
-                    StreamCompleteEvent(index = maxRecords),
+                    StreamEndEvent(index = maxRecords),
                 ),
             )
             return recordsWritten
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
index a183268db380..c58193d311dc 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
@@ -18,7 +18,7 @@ import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.state.DestinationStateManager
-import io.airbyte.cdk.load.state.StreamIncompleteResult
+import io.airbyte.cdk.load.state.StreamProcessingFailed
 import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
 import io.airbyte.cdk.load.write.StreamLoader
 import io.github.oshai.kotlinlogging.KotlinLogging
@@ -160,7 +160,7 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
         )
     }
 
-    override suspend fun close(streamFailure: StreamIncompleteResult?) {
+    override suspend fun close(streamFailure: StreamProcessingFailed?) {
         if (streamFailure != null) {
             log.info { "Sync failed, persisting destination state for next run" }
             destinationStateManager.persistState(stream)
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt
index 04bec228cc53..cfc19c2fb5b2 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt
@@ -491,7 +491,7 @@ protected constructor(
      * both syncs are preserved.
      */
     @Test
-    open fun testOverwriteSyncFailedResumedGeneration() {
+    fun testOverwriteSyncFailedResumedGeneration() {
         assumeTrue(
             implementsOverwrite(),
             "Destination's spec.json does not support overwrite sync mode."
@@ -525,7 +525,7 @@ protected constructor(
 
     /** Test runs 2 failed syncs and verifies the previous sync objects are not cleaned up. */
     @Test
-    open fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {
+    fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {
         assumeTrue(
             implementsOverwrite(),
             "Destination's spec.json does not support overwrite sync mode."
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
index ac113231876e..7f67f917ea30 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
   dockerRepository: airbyte/destination-iceberg-v2
   githubIssueLabel: destination-iceberg-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
index d61db456764a..28695edcd9a8 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt
@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.message.SimpleBatch
-import io.airbyte.cdk.load.state.StreamIncompleteResult
+import io.airbyte.cdk.load.state.StreamProcessingFailed
 import io.airbyte.cdk.load.write.StreamLoader
 import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
 import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
@@ -74,7 +74,7 @@ class IcebergStreamLoader(
         throw NotImplementedError("Destination Iceberg does not support universal file transfer.")
     }
 
-    override suspend fun close(streamFailure: StreamIncompleteResult?) {
+    override suspend fun close(streamFailure: StreamProcessingFailed?) {
         if (streamFailure == null) {
             // Doing it first to make sure that data coming in the current batch is written to the
             // main branch
diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
index 539593629daf..947599e42402 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.3.1
+  dockerImageTag: 0.3.2
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt
index 1ca6a2aaf11f..13165999d5be 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2AvroDestinationAcceptanceTest.kt
@@ -21,8 +21,4 @@ class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest()
 
     override val baseConfigJson: JsonNode
         get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt
index f46ff5513fce..b7e8700c2aed 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt
@@ -22,8 +22,4 @@ class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanc
     override fun testFakeFileTransfer() {
         super.testFakeFileTransfer()
     }
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt
index b695bf4c7a20..9c106d38588c 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvDestinationAcceptanceTest.kt
@@ -15,8 +15,4 @@ class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
 
     override val baseConfigJson: JsonNode
         get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt
index 922312d10e62..880315a616ef 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvGzipDestinationAcceptanceTest.kt
@@ -15,8 +15,4 @@ class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceT
 
     override val baseConfigJson: JsonNode
         get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt
index 1090fdc4e595..b6c68c8c1009 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlDestinationAcceptanceTest.kt
@@ -15,8 +15,4 @@ class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest(
 
     override val baseConfigJson: JsonNode
         get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt
index e6ffe789bdf1..7798966caf3e 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2JsonlGzipDestinationAcceptanceTest.kt
@@ -15,8 +15,4 @@ class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAccepta
 
     override val baseConfigJson: JsonNode
         get() = S3V2DestinationTestUtils.baseConfigJsonFilePath
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt
index 5c19502dd729..c5c02597e7a2 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2ParquetDestinationAcceptanceTest.kt
@@ -73,8 +73,4 @@ class S3V2ParquetDestinationAcceptanceTest : S3BaseParquetDestinationAcceptanceT
 
         runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false)
     }
-
-    // Disable these tests until we fix the incomplete stream handling behavior.
-    override fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {}
-    override fun testOverwriteSyncFailedResumedGeneration() {}
 }