From ec28fd1d07b1188aa74418c6aa7e05f80b1f5b6e Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Fri, 23 Feb 2024 12:57:41 -0800 Subject: [PATCH] Destination Snowflake: CDK T+D initial state refactor (#35456) Signed-off-by: Gireesh Sreepathi --- .../destination-snowflake/build.gradle | 2 +- .../destination-snowflake/metadata.yaml | 2 +- .../SnowflakeInternalStagingDestination.java | 12 +- .../typing_deduping/SnowflakeColumn.java | 11 - .../SnowflakeColumnDefinition.java | 19 -- .../SnowflakeDestinationHandler.java | 246 ++++++++++++++---- .../SnowflakeSqlGenerator.java | 40 +-- .../SnowflakeTableDefinition.java | 14 - .../SnowflakeV1V2Migrator.java | 29 +-- .../SnowflakeV2TableMigrator.java | 44 +--- .../AbstractSnowflakeTypingDedupingTest.java | 2 +- .../SnowflakeSqlGeneratorIntegrationTest.java | 13 +- docs/integrations/destinations/snowflake.md | 3 +- 13 files changed, 238 insertions(+), 199 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 3cc7265e2df9..b84e054c0609 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.9' + cdkVersionRequired = '0.23.2' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff 
--git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 021cfd26e685..d39c5a8c9669 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.5.13 + dockerImageTag: 3.5.14 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 472c8d5dec8a..253212ecf628 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.commons.json.Jsons; @@ -129,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() { throw new 
UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); } + @Override + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); + } + @Override public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, @@ -156,13 +162,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName); final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); - final int defaultThreadCount = 8; if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, - defaultThreadCount); + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); } else { typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount); + new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); } return StagingConsumerFactory.builder( diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java deleted file mode 100644 index 8415fedf587c..000000000000 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -/** - * type is notably _not_ a {@link net.snowflake.client.jdbc.SnowflakeType}. That enum doesn't - * contain all the types that snowflake supports (specifically NUMBER). - */ -public record SnowflakeColumn(String name, String type) {} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java deleted file mode 100644 index 06be84ffe67f..000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -/** - * isNullable is only used to execute a migration away from an older version of - * destination-snowflake, where we created PK columns as NOT NULL. This caused a lot of problems - * because many sources emit null PKs. We may want to remove this field eventually. 
- */ -public record SnowflakeColumnDefinition(String type, boolean isNullable) { - - @Deprecated - public boolean isNullable() { - return isNullable; - } - -} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 7afa49a48cc4..5bfeb5d6b25e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -4,24 +4,46 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS; + +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import 
io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SnowflakeDestinationHandler implements DestinationHandler { +public class SnowflakeDestinationHandler extends JdbcDestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class); public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement"; @@ -30,60 +52,74 @@ public class SnowflakeDestinationHandler implements DestinationHandler findExistingTable(final StreamId id) throws SQLException { - // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates - // VARIANT as VARCHAR - final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, 
data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? - ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()).stream() - .collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); - if (columns.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + public static LinkedHashMap> findExistingTables(final JdbcDatabase database, + final String databaseName, + final List streamIds) + throws SQLException { + final LinkedHashMap> existingTables = new LinkedHashMap<>(); + final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); + // convert list stream to array + final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); + final String query = """ + SELECT table_schema, table_name, column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_catalog = ? 
+ AND table_schema IN (%s) + AND table_name IN (%s) + ORDER BY table_schema, table_name, ordinal_position; + """.formatted(paramHolder, paramHolder); + final String[] bindValues = new String[streamIds.size() * 2 + 1]; + bindValues[0] = databaseName.toUpperCase(); + System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); + System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length); + final List results = database.queryJsons(query, bindValues); + for (final JsonNode result : results) { + final String tableSchema = result.get("TABLE_SCHEMA").asText(); + final String tableName = result.get("TABLE_NAME").asText(); + final String columnName = result.get("COLUMN_NAME").asText(); + final String dataType = result.get("DATA_TYPE").asText(); + final String isNullable = result.get("IS_NULLABLE").asText(); + final TableDefinition tableDefinition = existingTables + .computeIfAbsent(tableSchema, k -> new LinkedHashMap<>()) + .computeIfAbsent(tableName, k -> new TableDefinition(new LinkedHashMap<>())); + tableDefinition.columns().put(columnName, new ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable))); } + return existingTables; } - @Override - public LinkedHashMap findExistingFinalTables(final List list) throws Exception { - return null; - } - - @Override - public boolean isFinalTableEmpty(final StreamId id) throws SQLException { - final int rowCount = database.queryInt( - """ - SELECT row_count - FROM information_schema.tables - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()); - return rowCount == 0; + private LinkedHashMap> getFinalTableRowCount(final List streamIds) throws SQLException { + final LinkedHashMap> tableRowCounts = new LinkedHashMap<>(); + final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); + // convert list stream to array + final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); + final String query = """ + SELECT table_schema, table_name, row_count + FROM information_schema.tables + WHERE table_catalog = ? + AND table_schema IN (%s) + AND table_name IN (%s) + """.formatted(paramHolder, paramHolder); + final String[] bindValues = new String[streamIds.size() * 2 + 1]; + bindValues[0] = databaseName.toUpperCase(); + System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); + System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length); + final List results = database.queryJsons(query, bindValues); + for (final JsonNode result : results) { + final String tableSchema = result.get("TABLE_SCHEMA").asText(); + final String tableName = result.get("TABLE_NAME").asText(); + final int rowCount = result.get("ROW_COUNT").asInt(); + tableRowCounts.computeIfAbsent(tableSchema, k -> new LinkedHashMap<>()).put(tableName, rowCount); + } + return tableRowCounts; } - @Override public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { final ResultSet tables = database.getMetaData().getTables( databaseName, @@ -158,12 +194,116 @@ public void execute(final Sql sql) throws Exception { } } - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean fromSnowflakeBoolean(final String input) { - return 
input.equalsIgnoreCase("yes"); + private Set getPks(final StreamConfig stream) { + return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); + } + + private boolean isAirbyteRawIdColumnMatch(final TableDefinition existingTable) { + final String abRawIdColumnName = COLUMN_NAME_AB_RAW_ID.toUpperCase(); + return existingTable.columns().containsKey(abRawIdColumnName) && + toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(abRawIdColumnName).type()); + } + + private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTable) { + final String abExtractedAtColumnName = COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase(); + return existingTable.columns().containsKey(abExtractedAtColumnName) && + toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).type()); + } + + private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { + final String abMetaColumnName = COLUMN_NAME_AB_META.toUpperCase(); + return existingTable.columns().containsKey(abMetaColumnName) && + "VARIANT".equals(existingTable.columns().get(abMetaColumnName).type()); + } + + protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { + final Set pks = getPks(stream); + // This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case + // conversion. + // TODO: Unify this using name transformer or something. 
+ if (!isAirbyteRawIdColumnMatch(existingTable) || + !isAirbyteExtractedAtColumnMatch(existingTable) || + !isAirbyteMetaColumnMatch(existingTable)) { + // Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset + return false; + } + final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), + LinkedHashMap::putAll); + + // Filter out Meta columns since they don't exist in stream config. + final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() + .filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) + .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey(), column.getValue().type()), + LinkedHashMap::putAll); + // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 + @SuppressWarnings("deprecation") + final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() + .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); + + return !hasPksWithNonNullConstraint + && actualColumns.equals(intendedColumns); + + } + + @Override + public List gatherInitialState(List streamConfigs) throws Exception { + List streamIds = streamConfigs.stream().map(StreamConfig::id).toList(); + final LinkedHashMap> existingTables = findExistingTables(database, databaseName, streamIds); + final LinkedHashMap> tableRowCounts = getFinalTableRowCount(streamIds); + return streamConfigs.stream().map(streamConfig -> { + try { + final String namespace = streamConfig.id().finalNamespace().toUpperCase(); + final String name = streamConfig.id().finalName().toUpperCase(); + boolean isSchemaMismatch = false; + boolean isFinalTableEmpty = true; + boolean isFinalTablePresent = existingTables.containsKey(namespace) && 
existingTables.get(namespace).containsKey(name); + boolean hasRowCount = tableRowCounts.containsKey(namespace) && tableRowCounts.get(namespace).containsKey(name); + if (isFinalTablePresent) { + final TableDefinition existingTable = existingTables.get(namespace).get(name); + isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable); + isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0; + } + final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id()); + return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + @Override + protected String toJdbcTypeName(AirbyteType airbyteType) { + if (airbyteType instanceof final AirbyteProtocolType p) { + return toJdbcTypeName(p); + } + + return switch (airbyteType.getTypeName()) { + case Struct.TYPE -> "OBJECT"; + case Array.TYPE -> "ARRAY"; + case UnsupportedOneOf.TYPE -> "VARIANT"; + case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType()); + default -> throw new IllegalArgumentException("Unrecognized type: " + airbyteType.getTypeName()); + }; + } + + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { + return switch (airbyteProtocolType) { + case STRING -> "TEXT"; + case NUMBER -> "FLOAT"; + case INTEGER -> "NUMBER"; + case BOOLEAN -> "BOOLEAN"; + case TIMESTAMP_WITH_TIMEZONE -> "TIMESTAMP_TZ"; + case TIMESTAMP_WITHOUT_TIMEZONE -> "TIMESTAMP_NTZ"; + // If you change this - also change the logic in extractAndCast + case TIME_WITH_TIMEZONE -> "TEXT"; + case TIME_WITHOUT_TIMEZONE -> "TIME"; + case DATE -> "DATE"; + case UNKNOWN -> "VARIANT"; + }; } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 88733c74315d..37b0bdaefff8 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -21,22 +21,18 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; import io.airbyte.integrations.base.destination.typing_deduping.Union; import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.time.Instant; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; -public class SnowflakeSqlGenerator implements SqlGenerator { +public class SnowflakeSqlGenerator implements SqlGenerator { public static final String QUOTE = "\""; @@ -134,36 +130,6 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo """)); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final SnowflakeTableDefinition existingTable) - throws TableNotMigratedException { - final Set pks = getPks(stream); - - // Check that the columns match, with special handling for the metadata columns. 
- final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue())), - LinkedHashMap::putAll); - final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) - .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), column.getValue().type()), - LinkedHashMap::putAll); - // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 - @SuppressWarnings("deprecation") - final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() - .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); - - final boolean sameColumns = actualColumns.equals(intendedColumns) - && !hasPksWithNonNullConstraint - && "TEXT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID.toUpperCase()).type()) - && "TIMESTAMP_TZ".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase()).type()) - && "VARIANT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META.toUpperCase()).type()); - - return sameColumns; - } - @Override public Sql updateTable(final StreamConfig stream, final String finalSuffix, @@ -552,8 +518,4 @@ public static String escapeSingleQuotedString(final String str) { .replace("'", "\\'"); } - private static Set getPks(final StreamConfig stream) { - return stream.primaryKey() != null ? 
stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java deleted file mode 100644 index 2535d9004b13..000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -import java.util.LinkedHashMap; - -/** - * @param columns Map from column name to type. Type is a plain string because - * {@link net.snowflake.client.jdbc.SnowflakeType} doesn't actually have all the types that - * Snowflake supports. 
- */ -public record SnowflakeTableDefinition(LinkedHashMap columns) {} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java index aa6eba7f7f96..3226afa58337 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping; +import static io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.*; + import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; +import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator; import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils; import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName; @@ -15,7 +19,7 @@ import java.util.Optional; import lombok.SneakyThrows; -public class SnowflakeV1V2Migrator extends BaseDestinationV1V2Migrator { +public class SnowflakeV1V2Migrator extends BaseDestinationV1V2Migrator { private final NamingConventionTransformer namingConventionTransformer; @@ -48,18 +52,18 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon } @Override - protected boolean schemaMatchesExpectation(final SnowflakeTableDefinition existingTable, final Collection columns) 
{ + protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection columns) { return CollectionUtils.containsAllIgnoreCase(existingTable.columns().keySet(), columns); } @SneakyThrows @Override - protected Optional getTableIfExists(final String namespace, final String tableName) throws Exception { - // TODO this is mostly copied from SnowflakeDestinationHandler#findExistingTable, we should probably - // reuse this logic + protected Optional getTableIfExists(final String namespace, final String tableName) throws Exception { + // TODO this looks similar to SnowflakeDestinationHandler#findExistingTables, with a twist; + // databaseName not upper-cased and rawNamespace and rawTableName as-is (no uppercase). // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR - final LinkedHashMap columns = + final LinkedHashMap columns = database.queryJsons( """ SELECT column_name, data_type, is_nullable @@ -75,12 +79,13 @@ protected Optional getTableIfExists(final String names .stream() .collect(LinkedHashMap::new, (map, row) -> map.put(row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), + new ColumnDefinition(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText(), 0, + fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), LinkedHashMap::putAll); if (columns.isEmpty()) { return Optional.empty(); } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + return Optional.of(new TableDefinition(columns)); } } @@ -101,12 +106,4 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String return super.doesValidV1RawTableExist(namespace.toUpperCase(), tableName.toUpperCase()); } - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean 
fromSnowflakeBoolean(final String input) { - return input.equalsIgnoreCase("yes"); - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java index 9e04ec3b6f22..eef75f86c7bf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java @@ -9,6 +9,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction; @@ -16,6 +17,7 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.sql.SQLException; import java.util.LinkedHashMap; +import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +50,8 @@ public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception streamConfig.id().originalName(), rawNamespace); final boolean syncModeRequiresMigration = streamConfig.destinationSyncMode() != DestinationSyncMode.OVERWRITE; - final boolean existingTableCaseSensitiveExists = findExistingTable_caseSensitive(caseSensitiveStreamId).isPresent(); - final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent(); + final boolean 
existingTableCaseSensitiveExists = findExistingTable(caseSensitiveStreamId).isPresent(); + final boolean existingTableUppercaseDoesNotExist = findExistingTable(streamConfig.id()).isEmpty(); LOGGER.info( "Checking whether upcasing migration is necessary for {}.{}. Sync mode requires migration: {}; existing case-sensitive table exists: {}; existing uppercased table does not exist: {}", streamConfig.id().originalNamespace(), @@ -87,41 +89,15 @@ private static String escapeIdentifier_caseSensitive(final String identifier) { return identifier.replace("\"", "\"\""); } - // And this was taken from - // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java - public Optional findExistingTable_caseSensitive(final StreamId id) throws SQLException { + private Optional findExistingTable(final StreamId id) throws SQLException { // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR - final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace(), - id.finalName()).stream() - .collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); - if (columns.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + LinkedHashMap> existingTableMap = + SnowflakeDestinationHandler.findExistingTables(database, databaseName, List.of(id)); + if (existingTableMap.containsKey(id.finalNamespace()) && existingTableMap.get(id.finalNamespace()).containsKey(id.finalName())) { + return Optional.of(existingTableMap.get(id.finalNamespace()).get(id.finalName())); } - } - - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean fromSnowflakeBoolean(String input) { - return input.equalsIgnoreCase("yes"); + return Optional.empty(); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index a7aac9cef7cc..2c502d1c1ac9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -107,7 +107,7 @@ protected void globalTeardown() throws Exception { } @Override - protected SqlGenerator 
getSqlGenerator() { + protected SqlGenerator getSqlGenerator() { return new SnowflakeSqlGenerator(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 13338a83a03e..bf204e1909d7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -22,6 +22,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; @@ -44,7 +45,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static String databaseName; private static JdbcDatabase database; @@ -411,8 +412,9 @@ public void ensurePKsAreIndexedUnique() throws Exception { // should be OK with new tables destinationHandler.execute(createTable); - final Optional existingTableA = destinationHandler.findExistingTable(streamId); - 
assertTrue(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableA.get())); + List initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + assertFalse(initialStates.get(0).isSchemaMismatch()); destinationHandler.execute(Sql.of("DROP TABLE " + streamId.finalTableId(""))); // Hack the create query to add NOT NULLs to emulate the old behavior @@ -424,8 +426,9 @@ public void ensurePKsAreIndexedUnique() throws Exception { .collect(joining("\r\n"))) .toList()).toList(); destinationHandler.execute(new Sql(createTableModified)); - final Optional existingTableB = destinationHandler.findExistingTable(streamId); - assertFalse(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableB.get())); + initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + assertTrue(initialStates.get(0).isSchemaMismatch()); } } diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index f7a6f3e7a95d..39be90148e99 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,7 +246,8 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.5.13 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. 
| +| 3.5.14 | 2024-02-22 | [35456](https://github.com/airbytehq/airbyte/pull/35456) | Adopt CDK 0.23.2; Gather initial state upfront, reduce information_schema calls | +| 3.5.13 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | | 3.5.12 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | | 3.5.11 | 2024-02-12 | [35194](https://github.com/airbytehq/airbyte/pull/35194) | Reorder auth options | | 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |