diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle
index bbf62f964dd7..f164597af92f 100644
--- a/airbyte-integrations/connectors/destination-snowflake/build.gradle
+++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle
@@ -5,7 +5,7 @@ plugins {
 airbyteJavaConnector {
     cdkVersionRequired = '0.37.1'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
-    useLocalCdk = false
+    useLocalCdk = true
 }
 
 java {
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
index 20519034b395..b57f8aad1486 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
@@ -122,6 +122,12 @@ constructor(
                         hasUnprocessedRecords = true,
                         maxProcessedTimestamp = Optional.empty()
                     ),
+                initialTempRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = false,
+                        hasUnprocessedRecords = true,
+                        maxProcessedTimestamp = Optional.empty()
+                    ),
                 isSchemaMismatch = true,
                 isFinalTableEmpty = true,
                 destinationState =
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
index c9cb6d74fe9e..3e6acf9ebff5 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
@@ -169,11 +169,12 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String = ""
     ) {
         try {
             val queryId = UUID.randomUUID()
-            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
+            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId, suffix)
             log.info { "query $queryId, $query" }
             // queryJsons is intentionally used here to get the error message in case of failure
             // instead of execute
@@ -252,12 +253,13 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String
     ): String {
         return String.format(
             COPY_QUERY_1S1T + generateFilesList(stagedFiles) + ";",
             streamId.rawNamespace,
-            streamId.rawName,
+            streamId.rawName + suffix,
             stageName,
             stagingPath
         )
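
Reviewer note: the suffix participates only in the COPY target's table name, never in the stage
name or staging path. A minimal standalone sketch of that rendering (the template below is an
illustrative stand-in, not the connector's actual COPY_QUERY_1S1T, and "_airbyte_tmp" assumes the
CDK's AbstractStreamOperation.TMP_TABLE_SUFFIX value):

    // Hypothetical, simplified COPY template; the real COPY_QUERY_1S1T also
    // carries file-format and error-handling options.
    const val COPY_TEMPLATE = "COPY INTO \"%s\".\"%s\" FROM '@%s/%s'"

    fun renderCopy(ns: String, rawName: String, suffix: String, stage: String, path: String): String =
        String.format(COPY_TEMPLATE, ns, rawName + suffix, stage, path)

    fun main() {
        // The default (empty) suffix targets the real raw table ...
        println(renderCopy("raw_ns", "users", "", "stage_users", "2024/06/"))
        // ... while the temp suffix routes the same staged files into the temp raw table.
        println(renderCopy("raw_ns", "users", "_airbyte_tmp", "stage_users", "2024/06/"))
    }
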
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt
index a0a2e60329a2..d873df31890f 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt
@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.destination.snowflake.operation
 
+import com.fasterxml.jackson.databind.JsonNode
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.destination.StandardNameTransformer
 import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
@@ -15,7 +16,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil
 import io.airbyte.integrations.destination.snowflake.SnowflakeSQLNameTransformer
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
 import java.time.ZoneOffset
@@ -35,19 +35,77 @@ class SnowflakeStorageOperation(
     private val connectionId = UUID.randomUUID()
     private val syncDateTime = Instant.now()
 
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
         // create raw table
-        destinationHandler.execute(Sql.of(createTableQuery(streamId)))
-        if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
-            destinationHandler.execute(Sql.of(truncateTableQuery(streamId)))
+        destinationHandler.execute(Sql.of(createTableQuery(streamId, suffix)))
+        if (replace) {
+            destinationHandler.execute(Sql.of(truncateTableQuery(streamId, suffix)))
         }
         // create stage
         staging.createStageIfNotExists(getStageName(streamId))
     }
 
-    internal fun createTableQuery(streamId: StreamId): String {
+    override fun overwriteStage(streamId: StreamId, suffix: String) {
+        if (suffix.isBlank()) {
+            throw IllegalArgumentException("Cannot overwrite raw table with empty suffix")
+        }
+        // Something weird happening with SWAP WITH in truncateRefresh tests,
+        // so using DROP AND ALTER RENAME instead
+        destinationHandler.execute(
+            Sql.of("DROP TABLE IF EXISTS \"${streamId.rawNamespace}\".\"${streamId.rawName}\"")
+        )
+        val swapQuery =
+            """
+            | ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName+suffix}" RENAME TO "${streamId.rawNamespace}"."${streamId.rawName}";
+            """.trimMargin()
+        destinationHandler.execute(Sql.of(swapQuery))
+    }
+
+    override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+        if (suffix.isBlank()) {
+            throw IllegalArgumentException(
+                "Cannot transfer records from temp raw table with empty suffix"
+            )
+        }
+        destinationHandler.execute(
+            Sql.of(
+                """
+                INSERT INTO "${streamId.rawNamespace}"."${streamId.rawName}"
+                SELECT * FROM "${streamId.rawNamespace}"."${streamId.rawName + suffix}"
+                """.trimIndent()
+            )
+        )
+        destinationHandler.execute(
+            Sql.of(
+                """
+                DROP TABLE "${streamId.rawNamespace}"."${streamId.rawName + suffix}"
+                """.trimIndent()
+            )
+        )
+    }
+
+    override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+        val results =
+            destinationHandler.query(
+                """
+                SELECT "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" FROM "${streamId.rawNamespace}"."${streamId.rawName + suffix}" LIMIT 1
+                """.trimIndent()
+            )
+        if (results.isEmpty()) return null
+        var generationIdNode: JsonNode? =
+            results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
+        if (generationIdNode == null) {
+            // This is the dance where QUOTED_IDENTIFIERS_IGNORE_CASE will return uppercase column
+            // as result, so check for fallback.
+            generationIdNode =
+                results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase())
+        }
+        return generationIdNode?.asLong()
+    }
+
+    internal fun createTableQuery(streamId: StreamId, suffix: String): String {
         return """
-            |CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}"(
+            |CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName + suffix}"(
             |   "${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID}" VARCHAR PRIMARY KEY,
             |   "${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
             |   "${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
@@ -58,11 +116,15 @@ class SnowflakeStorageOperation(
         """.trimMargin()
     }
 
-    internal fun truncateTableQuery(streamId: StreamId): String {
-        return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n"
+    internal fun truncateTableQuery(streamId: StreamId, suffix: String): String {
+        return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName + suffix}\";\n"
     }
 
-    override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+    override fun writeToStage(
+        streamConfig: StreamConfig,
+        suffix: String,
+        data: SerializableBuffer
+    ) {
         val stageName = getStageName(streamConfig.id)
         val stagingPath = getStagingPath()
         val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath)
@@ -70,7 +132,8 @@ class SnowflakeStorageOperation(
             stageName,
             stagingPath,
             listOf(stagedFileName),
-            streamConfig.id
+            streamConfig.id,
+            suffix
         )
     }
     override fun cleanupStage(streamId: StreamId) {
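
Reviewer note: getStageGeneration applies the same QUOTED_IDENTIFIERS_IGNORE_CASE workaround used
elsewhere in this connector: Snowflake may hand the result column back uppercased. A
self-contained sketch of just that lookup ("_airbyte_generation_id" is the value of
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID):

    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.ObjectMapper

    // Try the expected key first, then fall back to the uppercase variant that
    // QUOTED_IDENTIFIERS_IGNORE_CASE sessions produce.
    fun readGenerationId(row: JsonNode, column: String = "_airbyte_generation_id"): Long? =
        (row.get(column) ?: row.get(column.uppercase()))?.asLong()

    fun main() {
        val mapper = ObjectMapper()
        check(readGenerationId(mapper.readTree("""{"_airbyte_generation_id": 42}""")) == 42L)
        check(readGenerationId(mapper.readTree("""{"_AIRBYTE_GENERATION_ID": 42}""")) == 42L) // fallback path
        check(readGenerationId(mapper.readTree("""{}""")) == null) // empty/absent column
    }
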
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt
index b67205dd4f0a..8f0435f5a429 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt
@@ -11,6 +11,7 @@ import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
 import io.airbyte.commons.json.Jsons.emptyObject
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
 import io.airbyte.integrations.base.destination.typing_deduping.Array
@@ -85,14 +86,24 @@ class SnowflakeDestinationHandler(
     @Throws(Exception::class)
     private fun getInitialRawTableState(
         id: StreamId,
+        suffix: String,
     ): InitialRawTableStatus {
-        // Short-circuit for overwrite, table will be truncated anyway
+        val rawTableName = id.rawName + suffix
         val tableExists =
             database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
-                LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
+                LOGGER.info(
+                    "Retrieving table from Db metadata: {} {}",
+                    id.rawNamespace,
+                    rawTableName
+                )
                 try {
                     val rs =
-                        databaseMetaData.getTables(databaseName, id.rawNamespace, id.rawName, null)
+                        databaseMetaData.getTables(
+                            databaseName,
+                            id.rawNamespace,
+                            rawTableName,
+                            null
+                        )
                     // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
                     // interpreted as uppercase
                     // in db metadata calls. check for both
@@ -100,7 +111,7 @@ class SnowflakeDestinationHandler(
                         databaseMetaData.getTables(
                             databaseName,
                             id.rawNamespace.uppercase(),
-                            id.rawName.uppercase(),
+                            rawTableName.uppercase(),
                             null
                         )
                     rs.next() || rsUppercase.next()
@@ -130,7 +141,7 @@ class SnowflakeDestinationHandler(
                     StringSubstitutor(
                             java.util.Map.of(
                                 "raw_table",
-                                id.rawTableId(SnowflakeSqlGenerator.QUOTE)
+                                id.rawTableId(SnowflakeSqlGenerator.QUOTE, suffix)
                             )
                         )
                         .replace(
@@ -186,7 +197,7 @@ class SnowflakeDestinationHandler(
                     StringSubstitutor(
                             java.util.Map.of(
                                 "raw_table",
-                                id.rawTableId(SnowflakeSqlGenerator.QUOTE)
+                                id.rawTableId(SnowflakeSqlGenerator.QUOTE, suffix)
                             )
                         )
                         .replace(
@@ -286,7 +297,7 @@ class SnowflakeDestinationHandler(
             "VARIANT" == existingTable.columns[abMetaColumnName]!!.type
     }
 
-    fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean {
+    private fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean {
         val abGenerationIdColumnName: String =
             JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase(Locale.getDefault())
         return existingTable.columns.containsKey(abGenerationIdColumnName) &&
@@ -388,7 +399,12 @@ class SnowflakeDestinationHandler(
                     !existingSchemaMatchesStreamConfig(streamConfig, existingTable!!)
                 isFinalTableEmpty = hasRowCount && tableRowCounts[namespace]!![name] == 0
             }
-            val initialRawTableState = getInitialRawTableState(streamConfig.id)
+            val initialRawTableState = getInitialRawTableState(streamConfig.id, "")
+            val tempRawTableState =
+                getInitialRawTableState(
+                    streamConfig.id,
+                    AbstractStreamOperation.TMP_TABLE_SUFFIX
+                )
             val destinationState =
                 destinationStates.getOrDefault(
                     streamConfig.id.asPair(),
@@ -398,6 +414,7 @@
                 streamConfig,
                 isFinalTablePresent,
                 initialRawTableState,
+                tempRawTableState,
                 isSchemaMismatch,
                 isFinalTableEmpty,
                 destinationState
@@ -466,6 +483,10 @@ class SnowflakeDestinationHandler(
         }
     }
 
+    fun query(sql: String): List<JsonNode> {
+        return database.queryJsons(sql)
+    }
+
     companion object {
         private val LOGGER: Logger =
             LoggerFactory.getLogger(SnowflakeDestinationHandler::class.java)
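
Reviewer note: every raw-table reference in this handler now goes through
StreamId.rawTableId(QUOTE, suffix), so the real and temp tables differ only by the suffix inside
the quoted identifier. A hypothetical mirror of that rendering (not the CDK source, just the
shape this diff relies on):

    // Sketch: the suffix lands inside the quotes, so the temp raw table of
    // "raw_ns"."users" is addressed as "raw_ns"."users_airbyte_tmp".
    data class StreamIdSketch(val rawNamespace: String, val rawName: String) {
        fun rawTableId(quote: String, suffix: String = ""): String =
            "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
    }

    fun main() {
        val id = StreamIdSketch("raw_ns", "users")
        println(id.rawTableId("\""))                 // "raw_ns"."users"
        println(id.rawTableId("\"", "_airbyte_tmp")) // "raw_ns"."users_airbyte_tmp"
    }
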
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt
new file mode 100644
index 000000000000..01b1c2003d3d
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationIntegrationTest.kt
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake.operation
+
+import com.fasterxml.jackson.databind.JsonNode
+import io.airbyte.cdk.db.jdbc.JdbcDatabase
+import io.airbyte.cdk.db.jdbc.JdbcUtils
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
+import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory
+import io.airbyte.commons.json.Jsons
+import io.airbyte.commons.string.Strings
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts
+import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
+import io.airbyte.protocol.models.v0.AirbyteMessage
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.util.*
+import net.snowflake.client.jdbc.SnowflakeSQLException
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Assertions.assertThrows
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+class SnowflakeStorageOperationIntegrationTest {
+
+    private lateinit var streamId: StreamId
+    private lateinit var streamConfig: StreamConfig
+    @BeforeEach
+    fun setup() {
+        val randomString = Strings.addRandomSuffix("", "", 10)
+        streamId =
+            StreamId(
+                finalNamespace = "final_namespace_$randomString",
+                finalName = "final_name_$randomString",
+                rawNamespace = "raw_namespace_$randomString",
+                rawName = "raw_name_$randomString",
+                originalNamespace = "original_namespace_$randomString",
+                originalName = "original_name_$randomString",
+            )
+        streamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.APPEND,
+                emptyList(),
+                Optional.empty(),
+                LinkedHashMap(),
+                GENERATION_ID,
+                0,
+                SYNC_ID,
+            )
+        database.execute(
+            """
+            CREATE SCHEMA "${streamId.rawNamespace}"
+            """.trimIndent()
+        )
+    }
+
+    @AfterEach
+    fun teardown() {
+        database.execute("DROP SCHEMA IF EXISTS \"${streamId.rawNamespace}\" CASCADE")
+    }
+
+    private fun record(recordNumber: Int): PartialAirbyteMessage {
+        val serializedData = """{"record_number": $recordNumber}"""
+        return PartialAirbyteMessage()
+            .withType(AirbyteMessage.Type.RECORD)
+            .withSerialized(serializedData)
+            .withRecord(
+                PartialAirbyteRecordMessage()
+                    .withNamespace(streamId.originalNamespace)
+                    .withStream(streamId.originalName)
+                    .withEmittedAt(10_000)
+                    .withMeta(
+                        AirbyteRecordMessageMeta()
+                            .withChanges(emptyList())
+                            .withAdditionalProperty(
+                                JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY,
+                                SYNC_ID,
+                            ),
+                    )
+                    .withData(Jsons.deserialize(serializedData)),
+            )
+    }
+
+    private fun buffer(
+        partialAirbyteMessage: PartialAirbyteMessage,
+        callback: (buffer: SerializableBuffer) -> Unit
+    ) {
+        val csvBuffer =
+            StagingSerializedBufferFactory.initializeBuffer(
+                FileUploadFormat.CSV,
+                JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION
+            )
+        csvBuffer.use {
+            it.accept(
+                partialAirbyteMessage.serialized!!,
+                Jsons.serialize(partialAirbyteMessage.record!!.meta),
+                streamConfig.generationId,
+                partialAirbyteMessage.record!!.emittedAt
+            )
+            it.flush()
+            callback(csvBuffer)
+        }
+    }
+
+    private fun dumpRawRecords(suffix: String): List<JsonNode> {
+        val query =
+            """
+            SELECT * FROM ${streamId.rawTableId("\"", suffix)}
+            """.trimIndent()
+        return database.queryJsons(query)
+    }
+
+    @Test
+    fun testTransferStage() {
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        // Table is currently empty, so expect null generation.
+        assertNull(
+            storageOperation.getStageGeneration(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        )
+        // Write one record to the real raw table
+        buffer(record(1)) {
+            storageOperation.writeToStage(
+                streamConfig,
+                "",
+                it,
+            )
+        }
+        println(dumpRawRecords(""))
+        assertEquals(
+            listOf(Jsons.deserialize("""{"record_number":1}""")),
+            dumpRawRecords("").map { it["_airbyte_data"] },
+        )
+        // And write one record to the temp raw table
+        buffer(record(2)) {
+            storageOperation.writeToStage(
+                streamConfig,
+                AbstractStreamOperation.TMP_TABLE_SUFFIX,
+                it,
+            )
+        }
+        assertEquals(
+            listOf(Jsons.deserialize("""{"record_number": 2}""")),
+            dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX).map { it["_airbyte_data"] },
+        )
+        assertEquals(
+            GENERATION_ID,
+            storageOperation.getStageGeneration(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        )
+        // If we transfer the records, we should end up with 2 records in the real raw table.
+        storageOperation.transferFromTempStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        assertEquals(
+            listOf(
+                Jsons.deserialize("""{"record_number": 1}"""),
+                Jsons.deserialize("""{"record_number": 2}"""),
+            ),
+            dumpRawRecords("")
+                .map { it["_airbyte_data"] }
+                .sortedBy { it.get("record_number").asLong() },
+        )
+        // After transferring the records to the real table, the temp table should no longer exist.
+        assertThrows(SnowflakeSQLException::class.java) {
+            dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        }
+    }
+
+    @Test
+    fun testOverwriteStage() {
+        // If we then create another temp raw table and _overwrite_ the real raw table,
+        // we should end up with a single raw record.
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        buffer(record(3)) {
+            storageOperation.writeToStage(
+                streamConfig,
+                "",
+                it,
+            )
+        }
+        buffer(record(4)) {
+            storageOperation.writeToStage(
+                streamConfig,
+                AbstractStreamOperation.TMP_TABLE_SUFFIX,
+                it,
+            )
+        }
+        storageOperation.overwriteStage(streamId, AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        assertEquals(
+            listOf(Jsons.deserialize("""{"record_number": 4}""")),
+            dumpRawRecords("").map { it["_airbyte_data"] },
+        )
+        assertThrows(SnowflakeSQLException::class.java) {
+            dumpRawRecords(AbstractStreamOperation.TMP_TABLE_SUFFIX)
+        }
+    }
+
+    companion object {
+        private val config =
+            Jsons.deserialize(
+                Files.readString(Paths.get("secrets/1s1t_internal_staging_config.json"))
+            )
+        private val datasource =
+            SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
+        private val database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(datasource)
+        private val storageOperation: SnowflakeStorageOperation =
+            SnowflakeStorageOperation(
+                SnowflakeSqlGenerator(0),
+                SnowflakeDestinationHandler(
+                    config[JdbcUtils.DATABASE_KEY].asText(),
+                    database,
+                    config[JdbcUtils.SCHEMA_KEY].asText(),
+                ),
+                0,
+                SnowflakeStagingClient(database),
+            )
+        private const val SYNC_ID = 12L
+        private const val GENERATION_ID = 42L
+    }
+}
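
Reviewer note: for orientation, the two tests above walk the truncate-refresh lifecycle that the
CDK drives through this storage operation. A rough sketch of that call sequence (assuming the CDK
invokes these methods in this order; this is not code from this PR):

    import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
    import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
    import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
    import io.airbyte.integrations.base.destination.typing_deduping.StreamId
    import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStorageOperation

    fun truncateRefreshSketch(
        op: SnowflakeStorageOperation,
        streamId: StreamId,
        streamConfig: StreamConfig,
        buffers: List<SerializableBuffer>,
    ) {
        val suffix = AbstractStreamOperation.TMP_TABLE_SUFFIX
        op.prepareStage(streamId, suffix, replace = false) // CREATE TABLE IF NOT EXISTS "..._airbyte_tmp"
        buffers.forEach { op.writeToStage(streamConfig, suffix, it) } // COPY INTO the temp raw table
        op.overwriteStage(streamId, suffix) // DROP the real raw table, RENAME the temp one into place
        // (On a failed-then-resumed generation the CDK would instead call
        // op.transferFromTempStage(streamId, suffix) to append and drop the temp table.)
    }
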
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
index b178ffe73001..0238683baa2e 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
@@ -191,20 +191,22 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
             catalog,
             messages1,
             "airbyte/destination-snowflake:2.1.7",
-        ) { config: JsonNode? ->
-            // Defensive to avoid weird behaviors or test failures if the original config is being
-            // altered by
-            // another thread, thanks jackson for a mutable JsonNode
-            val copiedConfig = Jsons.clone(config!!)
-            if (config is ObjectNode) {
-                // Opt out of T+D to run old V1 sync
-                (copiedConfig as ObjectNode?)!!.put(
-                    "use_1s1t_format",
-                    false,
-                )
+            { config: JsonNode? ->
+                // Defensive to avoid weird behaviors or test failures if the original config is
+                // being
+                // altered by
+                // another thread, thanks jackson for a mutable JsonNode
+                val copiedConfig = Jsons.clone(config!!)
+                if (config is ObjectNode) {
+                    // Opt out of T+D to run old V1 sync
+                    (copiedConfig as ObjectNode?)!!.put(
+                        "use_1s1t_format",
+                        false,
+                    )
+                }
+                copiedConfig
             }
-            copiedConfig
-        }
+        )
 
         // The record differ code is already adapted to V2 columns format, use the post V2 sync
         // to verify that append mode preserved all the raw records and final records.
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt
index edc649ccefe2..dd51dc310537 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientTest.kt
@@ -100,7 +100,7 @@ class SnowflakeStagingClientTest {
         inOrder
             .verify(database)
            .queryJsons(
-                stagingClient.getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
+                stagingClient.getCopyQuery(stageName, stagingPath, stagedFiles, streamId, "")
             )
         verifyNoMoreInteractions(database)
     }
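
Reviewer note: the unit test now passes "" explicitly because getCopyQuery declares suffix as a
required parameter, while copyIntoTableFromStage defaults it to "" — so only call sites of the
latter stay source-compatible. A tiny sketch of that API shape (hypothetical names, not the
connector's code):

    // The inner query builder forces every caller to choose a suffix; the public
    // entry point defaults it so existing non-refresh call sites keep compiling.
    class CopyApiSketch {
        fun copyIntoTableFromStage(table: String, suffix: String = ""): String =
            getCopyQuery(table, suffix)

        fun getCopyQuery(table: String, suffix: String): String =
            "COPY INTO \"$table$suffix\""
    }

    fun main() {
        val api = CopyApiSketch()
        check(api.copyIntoTableFromStage("users") == api.getCopyQuery("users", ""))
        check(api.copyIntoTableFromStage("users", "_airbyte_tmp") == "COPY INTO \"users_airbyte_tmp\"")
    }
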
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt
index 4c5e89623b7c..4c0d51065862 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt
@@ -42,10 +42,10 @@ class SnowflakeStorageOperationTest {
     @Test
     fun verifyPrepareStageCreatesTableAndStage() {
         val inOrder = inOrder(destinationHandler, stagingClient)
-        storageOperation.prepareStage(streamId, DestinationSyncMode.APPEND)
+        storageOperation.prepareStage(streamId, "", false)
         inOrder
             .verify(destinationHandler)
-            .execute(Sql.of(storageOperation.createTableQuery(streamId)))
+            .execute(Sql.of(storageOperation.createTableQuery(streamId, "")))
         inOrder
             .verify(stagingClient)
             .createStageIfNotExists(storageOperation.getStageName(streamId))
@@ -55,13 +55,13 @@ class SnowflakeStorageOperationTest {
     @Test
     fun verifyPrepareStageOverwriteTruncatesTable() {
         val inOrder = inOrder(destinationHandler, stagingClient)
-        storageOperation.prepareStage(streamId, DestinationSyncMode.OVERWRITE)
+        storageOperation.prepareStage(streamId, "", true)
         inOrder
             .verify(destinationHandler)
-            .execute(Sql.of(storageOperation.createTableQuery(streamId)))
+            .execute(Sql.of(storageOperation.createTableQuery(streamId, "")))
         inOrder
             .verify(destinationHandler)
-            .execute(Sql.of(storageOperation.truncateTableQuery(streamId)))
+            .execute(Sql.of(storageOperation.truncateTableQuery(streamId, "")))
         inOrder
             .verify(stagingClient)
             .createStageIfNotExists(storageOperation.getStageName(streamId))
@@ -80,7 +80,7 @@ class SnowflakeStorageOperationTest {
         val storageOperation =
             SnowflakeStorageOperation(sqlGenerator, destinationHandler, 1, stagingClient)
-        storageOperation.writeToStage(streamConfig, data)
+        storageOperation.writeToStage(streamConfig, "", data)
         val inOrder = inOrder(stagingClient)
         inOrder.verify(stagingClient).uploadRecordsToStage(any(), eq(stageName), any())
         inOrder