Skip to content

Commit

Permalink
destination-snowflake: speed up metadata queries (#45422)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte authored Sep 18, 2024
1 parent 0c9805e commit 14f9e5c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.12.0
dockerImageTag: 3.13.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Instant
Expand Down Expand Up @@ -64,34 +63,57 @@ class SnowflakeDestinationHandler(
// Postgres is close enough to Snowflake SQL for our purposes.
// We don't quote the database name in any queries, so just upcase it.
private val databaseName = databaseName.uppercase(Locale.getDefault())
private data class SnowflakeTableInfo(
val schemaName: String,
val tableName: String,
val rowCount: Int
) {}
private fun queryTable(schemaName: String, tableName: String): List<SnowflakeTableInfo> {
val showTablesQuery =
"""
SHOW TABLES LIKE '$tableName' IN "$databaseName"."$schemaName";
""".trimIndent()
try {
val showTablesResult =
database.queryJsons(
showTablesQuery,
)
return showTablesResult.map {
SnowflakeTableInfo(
it["schema_name"].asText(),
it["name"].asText(),
it["rows"].asText().toInt()
)
}
} catch (e: SnowflakeSQLException) {
val message = e.message
if (
message != null &&
message.contains("does not exist, or operation cannot be performed.")
)
return emptyList()
else {
throw e
}
}
}

@Throws(SQLException::class)
private fun getFinalTableRowCount(
streamIds: List<StreamId>
): LinkedHashMap<String, LinkedHashMap<String, Int>> {
val tableRowCounts = LinkedHashMap<String, LinkedHashMap<String, Int>>()
// convert list stream to array
val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
val names = streamIds.map { it.finalName }.toTypedArray()
val query =
"""
|SELECT table_schema, table_name, row_count
|FROM information_schema.tables
|WHERE table_catalog = ?
|AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|""".trimMargin()
val bindValues = arrayOf(databaseName) + namespaces + names
val results: List<JsonNode> = database.queryJsons(query, *bindValues)
for (result in results) {
val tableSchema = result["TABLE_SCHEMA"].asText()
val tableName = result["TABLE_NAME"].asText()
val rowCount = result["ROW_COUNT"].asInt()
tableRowCounts
.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }[tableName] =
rowCount
val tableRowCountsFromShowQuery = LinkedHashMap<String, LinkedHashMap<String, Int>>()
for (stream in streamIds) {
val tables = queryTable(stream.finalNamespace, stream.finalName)
tables.forEach {
if (it.tableName == stream.finalName) {
tableRowCountsFromShowQuery
.computeIfAbsent(it.schemaName) { LinkedHashMap() }[it.tableName] =
it.rowCount
}
}
}
return tableRowCounts
return tableRowCountsFromShowQuery
}

@Throws(Exception::class)
Expand All @@ -101,36 +123,15 @@ class SnowflakeDestinationHandler(
): InitialRawTableStatus {
val rawTableName = id.rawName + suffix
val tableExists =
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info(
"Retrieving table from Db metadata: {} {}",
id.rawNamespace,
rawTableName
)
try {
val rs =
databaseMetaData.getTables(
databaseName,
id.rawNamespace,
rawTableName,
null
)
// When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
// interpreted as uppercase
// in db metadata calls. check for both
val rsUppercase =
databaseMetaData.getTables(
databaseName,
id.rawNamespace.uppercase(),
rawTableName.uppercase(),
null
)
rs.next() || rsUppercase.next()
} catch (e: SQLException) {
LOGGER.error("Failed to retrieve table metadata", e)
throw RuntimeException(e)
}
queryTable(id.rawNamespace, rawTableName).any {
// When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
// interpreted as uppercase
// in db metadata calls. check for both
(it.schemaName == id.rawNamespace && it.tableName == rawTableName) ||
(it.schemaName == id.rawNamespace.uppercase() &&
it.tableName == rawTableName.uppercase())
}

if (!tableExists) {
return InitialRawTableStatus(
rawTableExists = false,
Expand Down Expand Up @@ -388,7 +389,7 @@ class SnowflakeDestinationHandler(
val destinationStates = getAllDestinationStates()

val streamIds = streamConfigs.map(StreamConfig::id).toList()
val existingTables = findExistingTables(database, databaseName, streamIds)
val existingTables = findExistingTables(database, streamIds)
val tableRowCounts = getFinalTableRowCount(streamIds)
return streamConfigs
.stream()
Expand Down Expand Up @@ -536,42 +537,52 @@ class SnowflakeDestinationHandler(
@Throws(SQLException::class)
fun findExistingTables(
database: JdbcDatabase,
databaseName: String,
streamIds: List<StreamId>
): LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> {
val existingTables = LinkedHashMap<String, LinkedHashMap<String, TableDefinition>>()
// convert list stream to array
val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
val names = streamIds.map { it.finalName }.toTypedArray()
val query =
"""
|SELECT table_schema, table_name, column_name, data_type, is_nullable
|FROM information_schema.columns
|WHERE table_catalog = ?
|AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|ORDER BY table_schema, table_name, ordinal_position;
|""".trimMargin()

val bindValues =
arrayOf(databaseName.uppercase(Locale.getDefault())) + namespaces + names
val results: List<JsonNode> = database.queryJsons(query, *bindValues)
for (result in results) {
val tableSchema = result["TABLE_SCHEMA"].asText()
val tableName = result["TABLE_NAME"].asText()
val columnName = result["COLUMN_NAME"].asText()
val dataType = result["DATA_TYPE"].asText()
val isNullable = result["IS_NULLABLE"].asText()
val tableDefinition =
): Map<String, Map<String, TableDefinition>> {
val existingTables = HashMap<String, HashMap<String, TableDefinition>>()
for (stream in streamIds) {
val schemaName = stream.finalNamespace
val tableName = stream.finalName
val table = getTable(database, schemaName, tableName)
if (table != null) {
existingTables
.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }
.computeIfAbsent(tableName) { _: String? ->
TableDefinition(LinkedHashMap())
}
tableDefinition.columns[columnName] =
ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable))
.computeIfAbsent(schemaName) { _: String? -> HashMap() }
.computeIfAbsent(tableName) { _: String? -> table }
}
}
return existingTables
}

fun getTable(
database: JdbcDatabase,
schemaName: String,
tableName: String,
): TableDefinition? {
try {
val columns = LinkedHashMap<String, ColumnDefinition>()
database.queryJsons("DESCRIBE TABLE \"$schemaName\".\"$tableName\"").map {
val columnName = it["name"].asText()
val dataType =
when (
val snowflakeDataType =
it["type"].asText().takeWhile { char -> char != '(' }
) {
"VARCHAR" -> "TEXT"
else -> snowflakeDataType
}

val isNullable = it["null?"].asText() == "Y"
columns[columnName] =
ColumnDefinition(columnName, dataType, columnSize = 0, isNullable)
}
return TableDefinition(columns)
} catch (e: SnowflakeSQLException) {
if (e.message != null && e.message!!.contains("does not exist")) {
return null
} else {
throw e
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
*/
package io.airbyte.integrations.destination.snowflake.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.Companion.fromIsNullableIsoString
import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator
import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase
import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import lombok.SneakyThrows
import net.snowflake.client.jdbc.SnowflakeSQLException

private val LOGGER = KotlinLogging.logger {}

@SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
class SnowflakeV1V2Migrator(
Expand All @@ -26,19 +27,19 @@ class SnowflakeV1V2Migrator(
@SneakyThrows
@Throws(Exception::class)
override fun doesAirbyteInternalNamespaceExist(streamConfig: StreamConfig?): Boolean {
return database
.queryJsons(
"""
SELECT SCHEMA_NAME
FROM information_schema.schemata
WHERE schema_name = ?
AND catalog_name = ?;
""".trimIndent(),
streamConfig!!.id.rawNamespace,
databaseName
)
.isNotEmpty()
try {
return database
.queryJsons(
"SHOW SCHEMAS LIKE '${streamConfig!!.id.rawNamespace}' IN DATABASE \"$databaseName\";",
)
.isNotEmpty()
} catch (e: SnowflakeSQLException) {
if (e.message != null && e.message!!.contains("does not exist")) {
return false
} else {
throw e
}
}
}

override fun schemaMatchesExpectation(
Expand All @@ -54,50 +55,9 @@ class SnowflakeV1V2Migrator(
namespace: String?,
tableName: String?
): Optional<TableDefinition> {
// TODO this looks similar to SnowflakeDestinationHandler#findExistingTables, with a twist;
// databaseName not upper-cased and rawNamespace and rawTableName as-is (no uppercase).
// The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC
// translates
// VARIANT as VARCHAR
val columns =
database
.queryJsons(
"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
ORDER BY ordinal_position;
""".trimIndent(),
databaseName,
namespace!!,
tableName!!
)
.stream()
.collect(
{ LinkedHashMap() },
{ map: java.util.LinkedHashMap<String, ColumnDefinition>, row: JsonNode ->
map[row["COLUMN_NAME"].asText()] =
ColumnDefinition(
row["COLUMN_NAME"].asText(),
row["DATA_TYPE"].asText(),
0,
fromIsNullableIsoString(row["IS_NULLABLE"].asText())
)
},
{
obj: java.util.LinkedHashMap<String, ColumnDefinition>,
m: java.util.LinkedHashMap<String, ColumnDefinition>? ->
obj.putAll(m!!)
}
)
return if (columns.isEmpty()) {
Optional.empty()
} else {
Optional.of(TableDefinition(columns))
}
return Optional.ofNullable(
SnowflakeDestinationHandler.getTable(database, namespace!!, tableName!!)
)
}

override fun convertToV1RawName(streamConfig: StreamConfig): NamespacedTableName {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ class SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest :
override fun testV1V2Migration() {
super.testV1V2Migration()
}

@Test
override fun identicalNameSimultaneousSync() {
super.identicalNameSimultaneousSync()
}
}
5 changes: 3 additions & 2 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,10 @@ desired namespace.

| Version | Date | Pull Request | Subject |
| :-------------- | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.13.0 | 2024-09-17 | [\#45422](https://github.com/airbytehq/airbyte/pull/45422) | speed up metadata queries |
| 3.12.0 | 2024-09-17 | [\#38585](https://github.com/airbytehq/airbyte/pull/38585) | force UTF8 collation when creating schemas and tables |
| 3.11.12 | 2024-09-12 | [\#45370](https://github.com/airbytehq/airbyte/pull/45370) | fix a race condition in our orphanedThreadFilter |
| 3.11.11 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 3.11.12 | 2024-09-12 | [\#45370](https://github.com/airbytehq/airbyte/pull/45370) | fix a race condition in our orphanedThreadFilter |
| 3.11.11 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 3.11.10 | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix |
| 3.11.9 | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes |
| 3.11.8 | 2024-08-16 | [\#42505](https://github.com/airbytehq/airbyte/pull/42505) | Fix bug in refreshes logic (already mitigated in platform, just fixing protocol compliance) |
Expand Down

0 comments on commit 14f9e5c

Please sign in to comment.