From 0ac4d2728cf2d489a2fbd048083c5d25468180cb Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Mon, 1 Jul 2024 16:54:50 -0700 Subject: [PATCH] Destination Snowflake: Storage ops to support refreshes (#39473) --- airbyte-cdk/java/airbyte-cdk/README.md | 9 +- .../integrations/base/IntegrationRunner.kt | 1 + .../src/main/resources/version.properties | 2 +- .../typing_deduping/CatalogParser.kt | 2 +- .../typing_deduping/BaseTypingDedupingTest.kt | 82 ++++-- .../destination-snowflake/build.gradle | 2 +- .../destination-snowflake/metadata.yaml | 3 +- .../snowflake/SnowflakeDestination.kt | 6 + .../SnowflakeAbMetaAndGenIdMigration.kt | 8 + .../operation/SnowflakeStagingClient.kt | 10 +- .../operation/SnowflakeStorageOperation.kt | 85 ++++++- .../SnowflakeDestinationHandler.kt | 37 ++- .../snowflake/SnowflakeTestUtils.kt | 15 +- ...nowflakeStorageOperationIntegrationTest.kt | 236 ++++++++++++++++++ .../AbstractSnowflakeTypingDedupingTest.kt | 93 +++---- ...refresh_append_with_new_gen_id_final.jsonl | 9 + ...resh_overwrite_with_new_gen_id_final.jsonl | 3 + ...efresh_overwrite_with_new_gen_id_raw.jsonl | 3 + ..._expectedrecords_with_new_gen_id_raw.jsonl | 10 + .../operation/SnowflakeStagingClientTest.kt | 2 +- .../SnowflakeStorageOperationTest.kt | 20 +- .../SnowflakeSqlGeneratorTest.kt | 3 + docs/integrations/destinations/snowflake.md | 1 + 23 files changed, 507 insertions(+), 135 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 1c83e4961e47..9cff7e2029cd 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,18 +174,18 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator | -| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator | -| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. 
|
+| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | Minor changes around error logging and testing |
+| 0.40.8 | 2024-07-01 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
+| 0.40.7 | 2024-07-01 | [\#40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| ~~0.40.6~~ | | | (this version does not exist) |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
-| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
+| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
@@ -198,6 +198,7 @@ corresponds to that version.
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error.
| | 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code | +| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging | | 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix | | 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable | | 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index b22cf8c74dcb..f707c6b5cfec 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -213,6 +213,7 @@ internal constructor( } } } catch (e: Exception) { + LOGGER.error(e) { "caught exception!" } // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An // attempt is made // to diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 17a0c4aa56d6..899bf5c00e3d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.40.8 +version=0.40.9 diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 553ff6c162f3..aa16052c071f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -135,7 +135,7 @@ constructor( @VisibleForTesting fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig { - if (stream.generationId == null) { + if (stream.generationId == null || stream.minimumGenerationId == null) { throw ConfigErrorException( "You must upgrade your platform version to use this connector version. 
Either downgrade your connector or upgrade platform to 0.63.0" ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index e696cb4803b4..afd75dde9758 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -40,8 +40,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.EnumSource private val LOGGER = KotlinLogging.logger {} /** @@ -230,15 +228,11 @@ abstract class BaseTypingDedupingTest { * Starting with an empty destination, execute a full refresh overwrite sync. Verify that the * records are written to the destination table. Then run a second sync, and verify that the * records are overwritten. - * - * Parameterized on destination sync mode. After the refreshes project, APPEND and OVERWRITE - * behave identically. */ - @ParameterizedTest - @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"]) @Throws(Exception::class) - fun truncateRefresh() { - val catalog = + @Test + open fun truncateRefresh() { + val catalog1 = io.airbyte.protocol.models.v0 .ConfiguredAirbyteCatalog() .withStreams( @@ -247,8 +241,8 @@ abstract class BaseTypingDedupingTest { .withSyncId(42) .withGenerationId(43) .withMinimumGenerationId(43) + .withDestinationSyncMode(DestinationSyncMode.APPEND) .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream( AirbyteStream() .withNamespace(streamNamespace) @@ -261,7 +255,7 @@ abstract class BaseTypingDedupingTest { // First sync val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1) + runSync(catalog1, messages1) val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl") @@ -269,13 +263,34 @@ abstract class BaseTypingDedupingTest { // Second sync val messages2 = readMessages("dat/sync2_messages.jsonl") + val catalog2 = + io.airbyte.protocol.models.v0 + .ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncId(42) + .withGenerationId(44) + .withMinimumGenerationId(44) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withSyncMode(SyncMode.FULL_REFRESH) + .withStream( + AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA) + ) + ) + ) - runSync(catalog, messages2) + runSync(catalog2, messages2) val expectedRawRecords2 = - readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl") + readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl") val expectedFinalRecords2 = - readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl") + readRecords( + "dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl" + ) 
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -283,14 +298,11 @@ abstract class BaseTypingDedupingTest { * Starting with an empty destination, execute a full refresh append sync. Verify that the * records are written to the destination table. Then run a second sync, and verify that the old * and new records are all present. - * - * Similar to [truncateRefresh], this is parameterized on sync mode. */ - @ParameterizedTest - @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"]) @Throws(Exception::class) - fun mergeRefresh() { - val catalog = + @Test + open fun mergeRefresh() { + val catalog1 = io.airbyte.protocol.models.v0 .ConfiguredAirbyteCatalog() .withStreams( @@ -299,8 +311,8 @@ abstract class BaseTypingDedupingTest { .withSyncId(42) .withGenerationId(43) .withMinimumGenerationId(0) - .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(SyncMode.FULL_REFRESH) .withStream( AirbyteStream() .withNamespace(streamNamespace) @@ -313,7 +325,7 @@ abstract class BaseTypingDedupingTest { // First sync val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1) + runSync(catalog1, messages1) val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl") @@ -321,12 +333,31 @@ abstract class BaseTypingDedupingTest { // Second sync val messages2 = readMessages("dat/sync2_messages.jsonl") + val catalog2 = + io.airbyte.protocol.models.v0 + .ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncId(42) + .withGenerationId(44) + .withMinimumGenerationId(0) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(SyncMode.FULL_REFRESH) + .withStream( + AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA) + ) + ) + ) - runSync(catalog, messages2) + runSync(catalog2, messages2) - val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl") + val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl") val expectedFinalRecords2 = - readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl") + readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl") verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -1024,6 +1055,7 @@ abstract class BaseTypingDedupingTest { disableFinalTableComparison: Boolean ) { val actualRawRecords = dumpRawTableRecords(streamNamespace, streamName) + if (disableFinalTableComparison) { DIFFER!!.diffRawTableRecords(expectedRawRecords, actualRawRecords) } else { diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index bbf62f964dd7..0ff098315a74 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.37.1' + cdkVersionRequired = '0.40.9' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index d32c083e1882..8e61b9b175fa 100644 --- 
a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.10.1 + dockerImageTag: 3.11.0 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake @@ -34,6 +34,7 @@ data: memory_request: 2Gi supportLevel: certified supportsDbt: true + supportsRefreshes: true tags: - language:java connectorTestSuitesOptions: diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt index 20519034b395..b57f8aad1486 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt @@ -122,6 +122,12 @@ constructor( hasUnprocessedRecords = true, maxProcessedTimestamp = Optional.empty() ), + initialTempRawTableStatus = + InitialRawTableStatus( + rawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.empty() + ), isSchemaMismatch = true, isFinalTableEmpty = true, destinationState = diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt index f67b038f16fe..56c68043b866 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt @@ -122,6 +122,14 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) : state.destinationState.copy(isAirbyteMetaPresentInRaw = true), true ) + } else if (!state.isFinalTablePresent) { + log.info { + "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because final table doesn't exist" + } + } else { + log.info { + "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because schemas match" + } } // Final table is untouched, so we don't need to fetch the initial status diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt index c9cb6d74fe9e..3e6acf9ebff5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt @@ -169,11 +169,12 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) { stageName: String, stagingPath: String, stagedFiles: List, - streamId: StreamId + streamId: StreamId, + suffix: String = "" ) { try { val queryId = UUID.randomUUID() - val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId) + val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId, suffix) log.info { "query $queryId, $query" } // queryJsons is intentionally used here to get the error message in case of failure // instead of execute @@ -252,12 +253,13 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) { stageName: String, stagingPath: String, stagedFiles: List, - streamId: StreamId + streamId: StreamId, + suffix: String ): String { return String.format( COPY_QUERY_1S1T + generateFilesList(stagedFiles) + ";", streamId.rawNamespace, - streamId.rawName, + streamId.rawName + suffix, stageName, stagingPath ) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt index a0a2e60329a2..d873df31890f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.snowflake.operation +import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.StandardNameTransformer import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer @@ -15,7 +16,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil import io.airbyte.integrations.destination.snowflake.SnowflakeSQLNameTransformer import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator -import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.time.Instant import java.time.ZoneOffset @@ -35,19 +35,77 @@ class SnowflakeStorageOperation( private val connectionId = UUID.randomUUID() private val syncDateTime = Instant.now() - override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { + override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) { // create raw table - destinationHandler.execute(Sql.of(createTableQuery(streamId))) - if (destinationSyncMode == DestinationSyncMode.OVERWRITE) { - destinationHandler.execute(Sql.of(truncateTableQuery(streamId))) + destinationHandler.execute(Sql.of(createTableQuery(streamId, suffix))) + if (replace) { + destinationHandler.execute(Sql.of(truncateTableQuery(streamId, suffix))) } // create stage staging.createStageIfNotExists(getStageName(streamId)) } - internal fun createTableQuery(streamId: StreamId): String { + override fun overwriteStage(streamId: StreamId, suffix: 
String) { + if (suffix.isBlank()) { + throw IllegalArgumentException("Cannot overwrite raw table with empty suffix") + } + // Something weird happening with SWAP WITH in truncateRefresh tests, + // so using DROP AND ALTER RENAME instead + destinationHandler.execute( + Sql.of("DROP TABLE IF EXISTS \"${streamId.rawNamespace}\".\"${streamId.rawName}\"") + ) + val swapQuery = + """ + | ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName+suffix}" RENAME TO "${streamId.rawNamespace}"."${streamId.rawName}"; + """.trimMargin() + destinationHandler.execute(Sql.of(swapQuery)) + } + + override fun transferFromTempStage(streamId: StreamId, suffix: String) { + if (suffix.isBlank()) { + throw IllegalArgumentException( + "Cannot transfer records from temp raw table with empty suffix" + ) + } + destinationHandler.execute( + Sql.of( + """ + INSERT INTO "${streamId.rawNamespace}"."${streamId.rawName}" + SELECT * FROM "${streamId.rawNamespace}"."${streamId.rawName + suffix}" + """.trimIndent() + ) + ) + destinationHandler.execute( + Sql.of( + """ + DROP TABLE "${streamId.rawNamespace}"."${streamId.rawName + suffix}" + """.trimIndent() + ) + ) + } + + override fun getStageGeneration(streamId: StreamId, suffix: String): Long? { + val results = + destinationHandler.query( + """ + SELECT "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" FROM "${streamId.rawNamespace}"."${streamId.rawName + suffix}" LIMIT 1 + """.trimIndent() + ) + if (results.isEmpty()) return null + var generationIdNode: JsonNode? = + results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID) + if (generationIdNode == null) { + // This is the dance where QUOTED_IDENTIFIERS_IGNORE_CASE will return uppercase column + // as result, so check for fallback. + generationIdNode = + results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()) + } + return generationIdNode?.asLong() + } + + internal fun createTableQuery(streamId: StreamId, suffix: String): String { return """ - |CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}"( + |CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName + suffix}"( | "${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID}" VARCHAR PRIMARY KEY, | "${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(), | "${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT NULL, @@ -58,11 +116,15 @@ class SnowflakeStorageOperation( """.trimMargin() } - internal fun truncateTableQuery(streamId: StreamId): String { - return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n" + internal fun truncateTableQuery(streamId: StreamId, suffix: String): String { + return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName + suffix}\";\n" } - override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) { + override fun writeToStage( + streamConfig: StreamConfig, + suffix: String, + data: SerializableBuffer + ) { val stageName = getStageName(streamConfig.id) val stagingPath = getStagingPath() val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath) @@ -70,7 +132,8 @@ class SnowflakeStorageOperation( stageName, stagingPath, listOf(stagedFileName), - streamConfig.id + streamConfig.id, + suffix ) } override fun cleanupStage(streamId: StreamId) { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt 
b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt index b67205dd4f0a..8f0435f5a429 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt @@ -11,6 +11,7 @@ import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler import io.airbyte.commons.json.Jsons.emptyObject +import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType import io.airbyte.integrations.base.destination.typing_deduping.Array @@ -85,14 +86,24 @@ class SnowflakeDestinationHandler( @Throws(Exception::class) private fun getInitialRawTableState( id: StreamId, + suffix: String, ): InitialRawTableStatus { - // Short-circuit for overwrite, table will be truncated anyway + val rawTableName = id.rawName + suffix val tableExists = database.executeMetadataQuery { databaseMetaData: DatabaseMetaData -> - LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName) + LOGGER.info( + "Retrieving table from Db metadata: {} {}", + id.rawNamespace, + rawTableName + ) try { val rs = - databaseMetaData.getTables(databaseName, id.rawNamespace, id.rawName, null) + databaseMetaData.getTables( + databaseName, + id.rawNamespace, + rawTableName, + null + ) // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is // interpreted as uppercase // in db metadata calls. check for both @@ -100,7 +111,7 @@ class SnowflakeDestinationHandler( databaseMetaData.getTables( databaseName, id.rawNamespace.uppercase(), - id.rawName.uppercase(), + rawTableName.uppercase(), null ) rs.next() || rsUppercase.next() @@ -130,7 +141,7 @@ class SnowflakeDestinationHandler( StringSubstitutor( java.util.Map.of( "raw_table", - id.rawTableId(SnowflakeSqlGenerator.QUOTE) + id.rawTableId(SnowflakeSqlGenerator.QUOTE, suffix) ) ) .replace( @@ -186,7 +197,7 @@ class SnowflakeDestinationHandler( StringSubstitutor( java.util.Map.of( "raw_table", - id.rawTableId(SnowflakeSqlGenerator.QUOTE) + id.rawTableId(SnowflakeSqlGenerator.QUOTE, suffix) ) ) .replace( @@ -286,7 +297,7 @@ class SnowflakeDestinationHandler( "VARIANT" == existingTable.columns[abMetaColumnName]!!.type } - fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean { + private fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean { val abGenerationIdColumnName: String = JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase(Locale.getDefault()) return existingTable.columns.containsKey(abGenerationIdColumnName) && @@ -388,7 +399,12 @@ class SnowflakeDestinationHandler( !existingSchemaMatchesStreamConfig(streamConfig, existingTable!!) 
isFinalTableEmpty = hasRowCount && tableRowCounts[namespace]!![name] == 0 } - val initialRawTableState = getInitialRawTableState(streamConfig.id) + val initialRawTableState = getInitialRawTableState(streamConfig.id, "") + val tempRawTableState = + getInitialRawTableState( + streamConfig.id, + AbstractStreamOperation.TMP_TABLE_SUFFIX + ) val destinationState = destinationStates.getOrDefault( streamConfig.id.asPair(), @@ -398,6 +414,7 @@ class SnowflakeDestinationHandler( streamConfig, isFinalTablePresent, initialRawTableState, + tempRawTableState, isSchemaMismatch, isFinalTableEmpty, destinationState @@ -466,6 +483,10 @@ class SnowflakeDestinationHandler( } } + fun query(sql: String): List { + return database.queryJsons(sql) + } + companion object { private val LOGGER: Logger = LoggerFactory.getLogger(SnowflakeDestinationHandler::class.java) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.kt index 673f8198aa45..f31c3cfc0d3e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.kt @@ -11,7 +11,6 @@ import java.sql.Connection import java.sql.ResultSet import java.sql.SQLException import java.util.* -import java.util.Map import java.util.stream.Collectors import kotlin.collections.List import org.apache.commons.text.StringSubstitutor @@ -100,14 +99,12 @@ object SnowflakeTestUtils { .createStatement() .executeQuery( StringSubstitutor( - Map.of( - "columns", - columns.stream().collect(Collectors.joining(",")), - "table", - tableIdentifier, - "extracted_at", - if (upcaseExtractedAt) "_AIRBYTE_EXTRACTED_AT" - else "\"_airbyte_extracted_at\"" + mapOf( + "columns" to columns.stream().collect(Collectors.joining(",")), + "table" to tableIdentifier, + "extracted_at" to + if (upcaseExtractedAt) "_AIRBYTE_EXTRACTED_AT" + else "\"_airbyte_extracted_at\"" ) ) .replace( diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt new file mode 100644 index 000000000000..c30718de52de --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.snowflake.operation + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer +import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat +import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory +import io.airbyte.commons.json.Jsons +import io.airbyte.commons.string.Strings +import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.StreamId +import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts +import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils +import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler +import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta +import io.airbyte.protocol.models.v0.DestinationSyncMode +import java.nio.file.Files +import java.nio.file.Paths +import java.util.* +import net.snowflake.client.jdbc.SnowflakeSQLException +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock + +class SnowflakeStorageOperationIntegrationTest { + + private var streamId: StreamId = mock() + private var streamConfig: StreamConfig = mock() + @BeforeEach + fun setup() { + val randomString = Strings.addRandomSuffix("", "", 10) + streamId = + StreamId( + finalNamespace = "final_namespace_$randomString", + finalName = "final_name_$randomString", + rawNamespace = "raw_namespace_$randomString", + rawName = "raw_name_$randomString", + originalNamespace = "original_namespace_$randomString", + originalName = "original_name_$randomString", + ) + streamConfig = + StreamConfig( + streamId, + DestinationSyncMode.APPEND, + emptyList(), + Optional.empty(), + LinkedHashMap(), + GENERATION_ID, + 0, + SYNC_ID, + ) + database.execute( + """ + CREATE SCHEMA "${streamId.rawNamespace}" + """.trimIndent() + ) + } + + @AfterEach + fun teardown() { + database.execute("DROP SCHEMA IF EXISTS \"${streamId.rawNamespace}\" CASCADE") + } + + private fun record(recordNumber: Int): PartialAirbyteMessage { + val serializedData = """{"record_number": $recordNumber}""" + return PartialAirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withSerialized(serializedData) + .withRecord( + PartialAirbyteRecordMessage() + .withNamespace(streamId.originalNamespace) + .withStream(streamId.originalName) + .withEmittedAt(10_000) + .withMeta( + AirbyteRecordMessageMeta() + .withChanges(emptyList()) + .withAdditionalProperty( + JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY, + SYNC_ID, + ), + ) + .withData(Jsons.deserialize(serializedData)), + ) + } + + private fun buffer( + partialAirbyteMessage: PartialAirbyteMessage, 
+ callback: (buffer: SerializableBuffer) -> Unit + ) { + val csvBuffer = + StagingSerializedBufferFactory.initializeBuffer( + FileUploadFormat.CSV, + JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION + ) + csvBuffer.use { + it.accept( + partialAirbyteMessage.serialized!!, + Jsons.serialize(partialAirbyteMessage.record!!.meta), + streamConfig.generationId, + partialAirbyteMessage.record!!.emittedAt + ) + it.flush() + callback(csvBuffer) + } + } + + private fun dumpRawRecords(suffix: String): List { + val query = + """ + SELECT * FROM ${streamId.rawTableId("\"", suffix)} + """.trimIndent() + return database.queryJsons(query) + } + + @Test + fun testTransferStage() { + storageOperation.prepareStage(streamId, "") + storageOperation.prepareStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + // Table is currently empty, so expect null generation. + assertNull( + storageOperation.getStageGeneration(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + ) + // Write one record to the real raw table + buffer(record(1)) { + storageOperation.writeToStage( + streamConfig, + "", + it, + ) + } + println(dumpRawRecords("")) + assertEquals( + listOf(Jsons.deserialize("""{"record_number":1}""")), + dumpRawRecords("").map { it["_airbyte_data"] }, + ) + // And write one record to the temp final table + buffer(record(2)) { + storageOperation.writeToStage( + streamConfig, + AbstractStreamOperation.TMP_TABLE_SUFFIX, + it, + ) + } + assertEquals( + listOf(Jsons.deserialize("""{"record_number": 2}""")), + dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX).map { it["_airbyte_data"] }, + ) + assertEquals( + GENERATION_ID, + storageOperation.getStageGeneration(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + ) + // If we transfer the records, we should end up with 2 records in the real raw table. + storageOperation.transferFromTempStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + assertEquals( + listOf( + Jsons.deserialize("""{"record_number": 1}"""), + Jsons.deserialize("""{"record_number": 2}"""), + ), + dumpRawRecords("") + .map { it["_airbyte_data"] } + .sortedBy { it.get("record_number").asLong() }, + ) + // After transferring the records to the real table, the temp table should no longer exist. + assertThrows(SnowflakeSQLException::class.java) { + dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX) + } + } + + @Test + fun testOverwriteStage() { + // If we then create another temp raw table and _overwrite_ the real raw table, + // we should end up with a single raw record. 
+ storageOperation.prepareStage(streamId, "") + storageOperation.prepareStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + buffer(record(3)) { + storageOperation.writeToStage( + streamConfig, + "", + it, + ) + } + buffer(record(4)) { + storageOperation.writeToStage( + streamConfig, + AbstractStreamOperation.TMP_TABLE_SUFFIX, + it, + ) + } + storageOperation.overwriteStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX) + assertEquals( + listOf(Jsons.deserialize("""{"record_number": 4}""")), + dumpRawRecords("").map { it["_airbyte_data"] }, + ) + assertThrows(SnowflakeSQLException::class.java) { + dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX) + } + } + + companion object { + private val config = + Jsons.deserialize( + Files.readString(Paths.get("secrets/1s1t_internal_staging_config.json")) + ) + private val datasource = + SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS) + private val database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(datasource) + private val storageOperation: SnowflakeStorageOperation = + SnowflakeStorageOperation( + SnowflakeSqlGenerator(0), + SnowflakeDestinationHandler( + config[JdbcUtils.DATABASE_KEY].asText(), + database, + config[JdbcUtils.SCHEMA_KEY].asText(), + ), + 0, + SnowflakeStagingClient(database), + ) + private const val SYNC_ID = 12L + private const val GENERATION_ID = 42L + } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt index b178ffe73001..b9f4af56186b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt @@ -18,6 +18,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.destination.snowflake.* import io.airbyte.protocol.models.v0.* import io.airbyte.workers.exception.TestHarnessException +import io.github.oshai.kotlinlogging.KotlinLogging import java.nio.file.Path import java.sql.SQLException import java.util.* @@ -28,6 +29,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertThrows import org.junit.jupiter.api.Test +private val LOGGER = KotlinLogging.logger {} + abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { private var databaseName: String? = null private var database: JdbcDatabase? = null @@ -132,6 +135,9 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .withPrimaryKey(java.util.List.of(listOf("id1"), listOf("id2"))) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) .withStream( AirbyteStream() .withNamespace(streamNamespace) @@ -191,20 +197,21 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { catalog, messages1, "airbyte/destination-snowflake:2.1.7", - ) { config: JsonNode? 
-> - // Defensive to avoid weird behaviors or test failures if the original config is being - // altered by - // another thread, thanks jackson for a mutable JsonNode - val copiedConfig = Jsons.clone(config!!) - if (config is ObjectNode) { - // Opt out of T+D to run old V1 sync - (copiedConfig as ObjectNode?)!!.put( - "use_1s1t_format", - false, - ) - } - copiedConfig - } + { config: JsonNode? -> + // Defensive to avoid weird behaviors or test failures if the original config is + // being altered by another thread, thanks jackson for a mutable JsonNode + val copiedConfig = Jsons.clone(config!!) + if (config is ObjectNode) { + // Opt out of T+D to run old V1 sync + (copiedConfig as ObjectNode?)!!.put( + "use_1s1t_format", + false, + ) + } + copiedConfig + }, + streamStatus = null + ) // The record differ code is already adapted to V2 columns format, use the post V2 sync // to verify that append mode preserved all the raw records and final records. @@ -225,13 +232,16 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { @Test @Throws(Exception::class) - fun testExtractedAtUtcTimezoneMigration() { + open fun testExtractedAtUtcTimezoneMigration() { val catalog = ConfiguredAirbyteCatalog() .withStreams( java.util.List.of( ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) + .withGenerationId(0) + .withSyncId(0) + .withMinimumGenerationId(0) .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .withPrimaryKey(java.util.List.of(listOf("id1"), listOf("id2"))) .withCursorField(listOf("updated_at")) @@ -246,7 +256,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { // First sync val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1, "airbyte/destination-snowflake:3.5.11") + runSync(catalog, messages1, "airbyte/destination-snowflake:3.5.11", streamStatus = null) // The dumpRawTable code already accounts for Meta and GenID columns, so we cannot use it // to verify expected records. 
We will rely on the second sync to verify raw and final @@ -264,7 +274,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { } @Test - fun testAirbyteMetaAndGenerationIdMigration() { + open fun testAirbyteMetaAndGenerationIdMigration() { val catalog = ConfiguredAirbyteCatalog() .withStreams( @@ -286,7 +296,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { // First sync val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1") + runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1", streamStatus = null) // Second sync val messages2 = readMessages("dat/sync2_messages.jsonl") @@ -338,42 +348,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { } @Test - fun testAirbyteMetaAndGenerationIdMigrationForOverwrite() { - val catalog = - ConfiguredAirbyteCatalog() - .withStreams( - listOf( - ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) - .withSyncId(42L) - .withGenerationId(43L) - .withMinimumGenerationId(0L) - .withStream( - AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA), - ), - ), - ) - - // First sync - val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1") - - // Second sync - val messages2 = readMessages("dat/sync2_messages.jsonl") - runSync(catalog, messages2) - - val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl") - val expectedFinalRecords2 = - readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl") - verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) - } - - @Test - fun testAirbyteMetaAndGenerationIdMigrationForOverwrite310Broken() { + open fun testAirbyteMetaAndGenerationIdMigrationForOverwrite() { val catalog = ConfiguredAirbyteCatalog() .withStreams( @@ -383,7 +358,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withSyncId(42L) .withGenerationId(43L) - .withMinimumGenerationId(0L) + .withMinimumGenerationId(43L) .withStream( AirbyteStream() .withNamespace(streamNamespace) @@ -395,15 +370,9 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { // First sync val messages1 = readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1") + runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1", streamStatus = null) // Second sync - // This throws exception due to a broken migration in connector - assertThrows(TestHarnessException::class.java) { - runSync(catalog, messages1, "airbyte/destination-snowflake:3.10.0") - } - - // Third sync val messages2 = readMessages("dat/sync2_messages.jsonl") runSync(catalog, messages2) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl new file mode 100644 index 000000000000..0f34e9450361 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl @@ -0,0 +1,9 @@ +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 43, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 43, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 43, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 43, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie", "AGE": 42, "REGISTRATION_DATE": "2023-12-23"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 43, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} + +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl new file mode 100644 index 000000000000..df3ccb1b8131 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl @@ -0,0 +1,3 @@ +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": {"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META": 
{"changes":[],"sync_id":42}, "_AIRBYTE_GENERATION_ID": 44, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl new file mode 100644 index 000000000000..9deb338f67d5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl new file mode 100644 index 000000000000..9e00d3955b08 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl @@ -0,0 +1,10 @@ +// We keep the records from the first sync +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": 
"a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +// And append the records from the second sync +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt index edc649ccefe2..dd51dc310537 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt @@ -100,7 +100,7 @@ class SnowflakeStagingClientTest { inOrder .verify(database) .queryJsons( - stagingClient.getCopyQuery(stageName, stagingPath, stagedFiles, streamId) + stagingClient.getCopyQuery(stageName, stagingPath, stagedFiles, streamId, "") ) verifyNoMoreInteractions(database) } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt index 4c5e89623b7c..b80c0a6ce628 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt @@ -42,10 +42,10 @@ class SnowflakeStorageOperationTest { @Test fun verifyPrepareStageCreatesTableAndStage() { val inOrder = inOrder(destinationHandler, stagingClient) - storageOperation.prepareStage(streamId, DestinationSyncMode.APPEND) + storageOperation.prepareStage(streamId, "", false) inOrder .verify(destinationHandler) - .execute(Sql.of(storageOperation.createTableQuery(streamId))) + .execute(Sql.of(storageOperation.createTableQuery(streamId, ""))) inOrder .verify(stagingClient) .createStageIfNotExists(storageOperation.getStageName(streamId)) @@ -55,13 +55,13 @@ class SnowflakeStorageOperationTest { @Test fun verifyPrepareStageOverwriteTruncatesTable() { val inOrder = inOrder(destinationHandler, stagingClient) - storageOperation.prepareStage(streamId, DestinationSyncMode.OVERWRITE) + 
storageOperation.prepareStage(streamId, "", true) inOrder .verify(destinationHandler) - .execute(Sql.of(storageOperation.createTableQuery(streamId))) + .execute(Sql.of(storageOperation.createTableQuery(streamId, ""))) inOrder .verify(destinationHandler) - .execute(Sql.of(storageOperation.truncateTableQuery(streamId))) + .execute(Sql.of(storageOperation.truncateTableQuery(streamId, ""))) inOrder .verify(stagingClient) .createStageIfNotExists(storageOperation.getStageName(streamId)) @@ -80,12 +80,18 @@ class SnowflakeStorageOperationTest { val storageOperation = SnowflakeStorageOperation(sqlGenerator, destinationHandler, 1, stagingClient) - storageOperation.writeToStage(streamConfig, data) + storageOperation.writeToStage(streamConfig, "", data) val inOrder = inOrder(stagingClient) inOrder.verify(stagingClient).uploadRecordsToStage(any(), eq(stageName), any()) inOrder .verify(stagingClient) - .copyIntoTableFromStage(eq(stageName), any(), eq(listOf(mockTmpFileName)), eq(streamId)) + .copyIntoTableFromStage( + eq(stageName), + any(), + eq(listOf(mockTmpFileName)), + eq(streamId), + eq("") + ) verifyNoMoreInteractions(stagingClient) } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorTest.kt index 0394820b4898..7613f371f5e4 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorTest.kt @@ -92,6 +92,9 @@ class SnowflakeSqlGeneratorTest { ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) .withStream( AirbyteStream() .withName("foo") diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 32bededc31fa..c205af3b73ba 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -268,6 +268,7 @@ desired namespace. | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.11.0 | 2024-06-25 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | support refreshes | | 3.10.1 | 2024-06-11 | [\#39399](https://github.com/airbytehq/airbyte/pull/39399) | Bug fix for _airbyte_meta not migrated in OVERWRITE mode | | 3.10.0 | 2024-06-10 | [\#39107](https://github.com/airbytehq/airbyte/pull/39107) | _airbyte_meta and _airbyte_generation_id in Raw tables and final tables | | 3.9.1 | 2024-06-05 | [\#39135](https://github.com/airbytehq/airbyte/pull/39135) | Improved error handling for Staging files |
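
For readers skimming the patch, a minimal Kotlin sketch of how the new suffix-aware storage operations are intended to be driven during a truncate refresh. It only mirrors the call order exercised by `SnowflakeStorageOperationIntegrationTest`; the real orchestration lives in the CDK's `AbstractStreamOperation`, and the `sketchTruncateRefresh` helper and its generation check below are illustrative assumptions, not part of this patch.

```kotlin
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStorageOperation

/**
 * Illustrative sketch (not part of this patch) of a truncate refresh using the
 * suffix-aware hooks added above. The CDK's AbstractStreamOperation owns this
 * logic in practice; this only shows the intended call order.
 */
fun sketchTruncateRefresh(
    storageOperation: SnowflakeStorageOperation,
    streamConfig: StreamConfig,
    buffers: List<SerializableBuffer>,
) {
    val streamId = streamConfig.id
    val tmp = AbstractStreamOperation.TMP_TABLE_SUFFIX

    // Records for the new generation land in "<rawName>_airbyte_tmp", not the live raw table.
    storageOperation.prepareStage(streamId, tmp, false)

    // Roughly what the CDK does: a leftover temp table from an older generation is cleared
    // (prepareStage with replace = true truncates it) before writing.
    val existingGen = storageOperation.getStageGeneration(streamId, tmp)
    if (existingGen != null && existingGen != streamConfig.generationId) {
        storageOperation.prepareStage(streamId, tmp, true)
    }

    // Each buffer is uploaded to the stage and copied into the suffixed raw table.
    buffers.forEach { storageOperation.writeToStage(streamConfig, tmp, it) }

    // Truncate refresh: the temp table replaces the live raw table (DROP + ALTER ... RENAME).
    storageOperation.overwriteStage(streamId, tmp)
    // A merge refresh would instead move the rows across and drop the temp table:
    //   storageOperation.transferFromTempStage(streamId, tmp)
}
```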