diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt index e7881fc4602a..4b755c858d3d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt @@ -11,7 +11,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableS import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus -import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.util.Optional import java.util.stream.Stream @@ -24,6 +23,7 @@ abstract class AbstractStreamOperation false + stream.generationId -> true + else -> { + // This is technically already handled in CatalogParser. + throw IllegalArgumentException("Hybrid refreshes are not yet supported.") + } + } + + if (isTruncateSync) { + if (initialRawTableStatus.tempRawTableExists) { + val tempStageGeneration = + storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX) + if (tempStageGeneration != null && tempStageGeneration != stream.generationId) { + // The temp stage is from the wrong generation. Nuke it. + storageOperation.prepareStage( + stream.id, + TMP_TABLE_SUFFIX, + replace = true, + ) + } + // (if the existing temp stage is from the correct generation, then we're resuming + // a truncate refresh, and should keep the previous temp stage). + } else { + // We're initiating a new truncate refresh. Create a new temp stage. + storageOperation.prepareStage( + stream.id, + TMP_TABLE_SUFFIX, + ) + } + rawTableSuffix = TMP_TABLE_SUFFIX + } else { + if (initialRawTableStatus.tempRawTableExists) { + // There was a previous truncate refresh attempt, which failed, and left some + // records behind. + // Retrieve those records and put them in the real stage. + storageOperation.transferFromTempStage(stream.id, TMP_TABLE_SUFFIX) + // TODO refetch initial table status? or set initialRawTableStatus.hasUnprocessedRecords=true + } + rawTableSuffix = NO_SUFFIX + storageOperation.prepareStage(stream.id, NO_SUFFIX) + } + if (!disableTypeDedupe) { // Prepare final tables based on sync mode. finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus) @@ -51,7 +93,7 @@ abstract class AbstractStreamOperation return prepareFinalTableForOverwrite(initialStatus) - DestinationSyncMode.APPEND, - DestinationSyncMode.APPEND_DEDUP -> { - if ( - initialStatus.isSchemaMismatch || - initialStatus.destinationState.needsSoftReset() - ) { - // We're loading data directly into the existing table. - // Make sure it has the right schema. - // Also, if a raw table migration wants us to do a soft reset, do that - // here. - log.info { "Executing soft-reset on final table of stream $stream" } - storageOperation.softResetFinalTable(stream) - } - return NO_SUFFIX + if (isTruncateSync) { + // Truncate refresh. Use a temp final table. + return prepareFinalTableForOverwrite(initialStatus) + } else { + if ( + initialStatus.isSchemaMismatch || + initialStatus.destinationState.needsSoftReset() + ) { + // We're loading data directly into the existing table. + // Make sure it has the right schema. + // Also, if a raw table migration wants us to do a soft reset, do that + // here. + log.info { "Executing soft-reset on final table of stream $stream" } + storageOperation.softResetFinalTable(stream) } + return NO_SUFFIX } } @@ -100,14 +137,13 @@ abstract class AbstractStreamOperation 0 || - (isNotOverwriteSync && initialRawTableStatus.hasUnprocessedRecords) - hasRecordsNeedingTd - } else { - false - } + val shouldRunTypingDeduping = + // Normal syncs should T+D regardless of status, so the user sees progress after + // every attempt. And we should T+D records from this sync, _or_ a previous sync. + (!isTruncateSync && (syncSummary.recordsWritten > 0 || initialRawTableStatus.hasUnprocessedRecords)) || + // But truncate syncs should only T+D if the sync was successful, since we're T+Ding + // into a temp final table anyway. And we only need to check if _this_ sync emitted + // records, since we've nuked the old raw data. + (isTruncateSync && syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE && syncSummary.recordsWritten > 0) if (!shouldRunTypingDeduping) { log.info { "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " + "because it had no records during this sync and no unprocessed records from a previous sync." } - } else { - // In overwrite mode, we want to read all the raw records. Typically, this is equivalent - // to filtering on timestamp, but might as well be explicit. - val timestampFilter = - if (isNotOverwriteSync) { - initialRawTableStatus.maxProcessedTimestamp - } else { - Optional.empty() - } - storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix) + return } - if ( - streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE && - finalTmpTableSuffix.isNotBlank() - // We should only overwrite the final table if the stream was successful. - // This prevents data downtime if the stream didn't emit all the data. - && - syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE - ) { + // In truncate mode, we want to read all the raw records. Typically, this is equivalent + // to filtering on timestamp, but might as well be explicit. + val timestampFilter = + if (!isTruncateSync) { + initialRawTableStatus.maxProcessedTimestamp + } else { + Optional.empty() + } + storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix) + + // The `shouldRunTypingDeduping` check means we'll only ever reach this point if stream + // status was COMPLETE, so we don't need to include + // `&& syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE` in this clause. + if (isTruncateSync && finalTmpTableSuffix.isNotBlank()) { storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt index 83072da6f119..84907465b067 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt @@ -40,8 +40,10 @@ interface StorageOperation { * * [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage * always contains exactly one generation. + * + * @return The generation ID of a record in the stage, or `null` if the stage is empty. */ - fun getStageGeneration(streamId: StreamId, suffix: String): Long + fun getStageGeneration(streamId: StreamId, suffix: String): Long? /** Delete previously staged data, using deterministic information from streamId. */ fun cleanupStage(streamId: StreamId) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt index 49df7a54bc49..2f90d5f44202 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt @@ -8,6 +8,7 @@ import java.util.* data class InitialRawTableStatus( val rawTableExists: Boolean, + val tempRawTableExists: Boolean, val hasUnprocessedRecords: Boolean, val maxProcessedTimestamp: Optional ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt index 762d246babd1..68335788e825 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt @@ -75,6 +75,7 @@ class DefaultSyncOperationTest { initialRawTableStatus = InitialRawTableStatus( rawTableExists = true, + tempRawTableExists = false, hasUnprocessedRecords = false, maxProcessedTimestamp = Optional.empty(), ), @@ -172,6 +173,7 @@ class DefaultSyncOperationTest { initialRawTableStatus = InitialRawTableStatus( rawTableExists = true, + tempRawTableExists = false, hasUnprocessedRecords = false, maxProcessedTimestamp = Optional.empty(), ), diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index a9e38fb18d9b..3e10a8a6f700 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -119,7 +119,12 @@ class DefaultTyperDeduperTest { initialStates.forEach( Consumer { initialState: DestinationInitialStatus -> Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, true, Optional.empty())) + .thenReturn(InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.empty() + )) } ) @@ -316,9 +321,10 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.initialRawTableStatus) .thenReturn( InitialRawTableStatus( - true, - true, - Optional.of(Instant.parse("2023-01-01T12:34:56Z")) + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.parse("2023-01-01T12:34:56Z")) ) ) } @@ -413,7 +419,12 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.isFinalTableEmpty).thenReturn(false) Mockito.`when`(initialState.isSchemaMismatch).thenReturn(false) Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, true, Optional.of(Instant.now()))) + .thenReturn(InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.now()) + )) } ) @@ -471,7 +482,12 @@ class DefaultTyperDeduperTest { initialStates.forEach( Consumer { initialState: DestinationInitialStatus -> Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, false, Optional.empty())) + .thenReturn(InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.empty() + )) } ) @@ -521,9 +537,10 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.initialRawTableStatus) .thenReturn( InitialRawTableStatus( - true, - true, - Optional.of(Instant.parse("2023-01-23T12:34:56Z")) + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.parse("2023-01-23T12:34:56Z")) ) ) } @@ -600,7 +617,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), true, false, MockState(false, false, true) @@ -608,7 +630,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), true, false, MockState(false, false, true) @@ -616,7 +643,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), true, false, MockState(false, false, true) @@ -712,7 +744,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(false, false, false) @@ -720,7 +757,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(false, false, false) @@ -728,7 +770,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(false, false, false) @@ -815,7 +862,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(true, false, false) @@ -823,7 +875,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(true, false, false) @@ -831,7 +888,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + tempRawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), false, false, MockState(true, false, false) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 579373ebb5af..04d397455410 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -587,14 +587,24 @@ abstract class BaseSqlGeneratorIntegrationTest