Destination Snowflake: CDK T+D initial state refactor #35456

Merged (9 commits) on Feb 23, 2024
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.9'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.5.13
dockerImageTag: 3.5.14
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -13,6 +13,7 @@
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.commons.json.Jsons;
@@ -129,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
Contributor:

After the incident with insertRecords, can you try setting up a new connector and running a sync with a dev build, just to verify that check still works?

... I really hope getDestinationHandler isn't called anywhere on the check code path, but I'm scared 😢

Contributor (Author):

Yeah, absolutely. Not going to merge this without a test sync in the Performance workspace.
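(For reference, a minimal sketch of the kind of check smoke test discussed above. It is not part of this PR: the SnowflakeDestination constructor argument, the OssCloudEnvVarConsts constant, the protocol-model package, and the secrets/config.json path are all assumptions for illustration; the only claim is that Destination#check(JsonNode) should still report SUCCEEDED after the handler refactor.)

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; // package path assumed
import java.nio.file.Files;
import java.nio.file.Path;

public class SnowflakeCheckSmokeTest {

  public static void main(final String[] args) throws Exception {
    // Hypothetical dev-build config; the path is an assumption for illustration.
    final JsonNode config = Jsons.deserialize(Files.readString(Path.of("secrets/config.json")));
    // The constructor argument and the OssCloudEnvVarConsts reference are assumptions about
    // how the connector is instantiated; adjust to the actual SnowflakeDestination API.
    final AirbyteConnectionStatus status =
        new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(config);
    if (status.getStatus() != AirbyteConnectionStatus.Status.SUCCEEDED) {
      throw new AssertionError("check failed: " + status.getMessage());
    }
  }

}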

}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
@@ -156,13 +162,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final int defaultThreadCount = 8;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
defaultThreadCount);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
}

return StagingConsumerFactory.builder(

This file was deleted.

This file was deleted.

@@ -4,24 +4,46 @@

package io.airbyte.integrations.destination.snowflake.typing_deduping;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeDestinationHandler implements DestinationHandler<SnowflakeTableDefinition> {
public class SnowflakeDestinationHandler extends JdbcDestinationHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class);
public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement";
@@ -30,60 +52,74 @@ public class SnowflakeDestinationHandler implements DestinationHandler<SnowflakeTableDefinition> {
private final JdbcDatabase database;

public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database) {
super(databaseName, database);
this.databaseName = databaseName;
this.database = database;
}

@Override
public Optional<SnowflakeTableDefinition> findExistingTable(final StreamId id) throws SQLException {
// The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates
// VARIANT as VARCHAR
final LinkedHashMap<String, SnowflakeColumnDefinition> columns = database.queryJsons(
"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
ORDER BY ordinal_position;
""",
databaseName.toUpperCase(),
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase()).stream()
.collect(LinkedHashMap::new,
(map, row) -> map.put(
row.get("COLUMN_NAME").asText(),
new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))),
LinkedHashMap::putAll);
if (columns.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new SnowflakeTableDefinition(columns));
public static LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> findExistingTables(final JdbcDatabase database,
final String databaseName,
final List<StreamId> streamIds)
throws SQLException {
final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = new LinkedHashMap<>();
final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?"));
// Convert the stream IDs into parallel namespace/name arrays for parameter binding
final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new);
final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new);
final String query = """
SELECT table_schema, table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
ORDER BY table_schema, table_name, ordinal_position;
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);
final List<JsonNode> results = database.queryJsons(query, bindValues);
for (final JsonNode result : results) {
final String tableSchema = result.get("TABLE_SCHEMA").asText();
final String tableName = result.get("TABLE_NAME").asText();
final String columnName = result.get("COLUMN_NAME").asText();
final String dataType = result.get("DATA_TYPE").asText();
final String isNullable = result.get("IS_NULLABLE").asText();
final TableDefinition tableDefinition = existingTables
.computeIfAbsent(tableSchema, k -> new LinkedHashMap<>())
.computeIfAbsent(tableName, k -> new TableDefinition(new LinkedHashMap<>()));
tableDefinition.columns().put(columnName, new ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable)));
}
return existingTables;
}

@Override
public LinkedHashMap<String, SnowflakeTableDefinition> findExistingFinalTables(final List<StreamId> list) throws Exception {
return null;
}

@Override
public boolean isFinalTableEmpty(final StreamId id) throws SQLException {
final int rowCount = database.queryInt(
"""
SELECT row_count
FROM information_schema.tables
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
""",
databaseName.toUpperCase(),
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase());
return rowCount == 0;
private LinkedHashMap<String, LinkedHashMap<String, Integer>> getFinalTableRowCount(final List<StreamId> streamIds) throws SQLException {
final LinkedHashMap<String, LinkedHashMap<String, Integer>> tableRowCounts = new LinkedHashMap<>();
final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?"));
// Convert the stream IDs into parallel namespace/name arrays for parameter binding
final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new);
final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new);
final String query = """
SELECT table_schema, table_name, row_count
FROM information_schema.tables
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);
final List<JsonNode> results = database.queryJsons(query, bindValues);
for (final JsonNode result : results) {
final String tableSchema = result.get("TABLE_SCHEMA").asText();
final String tableName = result.get("TABLE_NAME").asText();
final int rowCount = result.get("ROW_COUNT").asInt();
tableRowCounts.computeIfAbsent(tableSchema, k -> new LinkedHashMap<>()).put(tableName, rowCount);
}
return tableRowCounts;
}

@Override
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
final ResultSet tables = database.getMetaData().getTables(
databaseName,
@@ -158,12 +194,116 @@ public void execute(final Sql sql) throws Exception {
}
}

/**
* In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't
* know how to use
*/
private boolean fromSnowflakeBoolean(final String input) {
return input.equalsIgnoreCase("yes");
private Set<String> getPks(final StreamConfig stream) {
return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
}

private boolean isAirbyteRawIdColumnMatch(final TableDefinition existingTable) {
final String abRawIdColumnName = COLUMN_NAME_AB_RAW_ID.toUpperCase();
return existingTable.columns().containsKey(abRawIdColumnName) &&
toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(abRawIdColumnName).type());
}

private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTable) {
final String abExtractedAtColumnName = COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase();
return existingTable.columns().containsKey(abExtractedAtColumnName) &&
toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).type());
}

private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) {
final String abMetaColumnName = COLUMN_NAME_AB_META.toUpperCase();
return existingTable.columns().containsKey(abMetaColumnName) &&
"VARIANT".equals(existingTable.columns().get(abMetaColumnName).type());
}

protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
final Set<String> pks = getPks(stream);
// This is the same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig, with upper-case conversion.
// TODO: Unify this using a name transformer or something similar.
if (!isAirbyteRawIdColumnMatch(existingTable) ||
!isAirbyteExtractedAtColumnMatch(existingTable) ||
!isAirbyteMetaColumnMatch(existingTable)) {
// The final table is missing Airbyte meta columns; they are required for proper T+D, so trigger a soft reset
return false;
}
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
LinkedHashMap::putAll);

// Filter out Meta columns since they don't exist in stream config.
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase)
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), column.getValue().type()),
LinkedHashMap::putAll);
// Existing primary keys with a NOT NULL constraint force a soft reset; see https://github.com/airbytehq/airbyte/pull/31082
@SuppressWarnings("deprecation")
final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream()
.anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable());

return !hasPksWithNonNullConstraint
&& actualColumns.equals(intendedColumns);

}

@Override
public List<DestinationInitialState> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
List<StreamId> streamIds = streamConfigs.stream().map(StreamConfig::id).toList();
final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = findExistingTables(database, databaseName, streamIds);
final LinkedHashMap<String, LinkedHashMap<String, Integer>> tableRowCounts = getFinalTableRowCount(streamIds);
return streamConfigs.stream().map(streamConfig -> {
try {
final String namespace = streamConfig.id().finalNamespace().toUpperCase();
final String name = streamConfig.id().finalName().toUpperCase();
boolean isSchemaMismatch = false;
boolean isFinalTableEmpty = true;
boolean isFinalTablePresent = existingTables.containsKey(namespace) && existingTables.get(namespace).containsKey(name);
boolean hasRowCount = tableRowCounts.containsKey(namespace) && tableRowCounts.get(namespace).containsKey(name);
if (isFinalTablePresent) {
final TableDefinition existingTable = existingTables.get(namespace).get(name);
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable);
isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0;
}
final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id());
return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}

@Override
protected String toJdbcTypeName(AirbyteType airbyteType) {
if (airbyteType instanceof final AirbyteProtocolType p) {
return toJdbcTypeName(p);
}

return switch (airbyteType.getTypeName()) {
case Struct.TYPE -> "OBJECT";
case Array.TYPE -> "ARRAY";
case UnsupportedOneOf.TYPE -> "VARIANT";
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
default -> throw new IllegalArgumentException("Unrecognized type: " + airbyteType.getTypeName());
};
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "TEXT";
case NUMBER -> "FLOAT";
case INTEGER -> "NUMBER";
case BOOLEAN -> "BOOLEAN";
case TIMESTAMP_WITH_TIMEZONE -> "TIMESTAMP_TZ";
case TIMESTAMP_WITHOUT_TIMEZONE -> "TIMESTAMP_NTZ";
// If you change this - also change the logic in extractAndCast
case TIME_WITH_TIMEZONE -> "TEXT";
case TIME_WITHOUT_TIMEZONE -> "TIME";
case DATE -> "DATE";
case UNKNOWN -> "VARIANT";
};
}

}
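(For orientation, a hypothetical caller fragment showing how the state gathered by gatherInitialState might be consumed from getSerializedMessageConsumer, reusing the snowflakeDestinationHandler and parsedCatalog variables from that method. The DestinationInitialState accessor names are assumed to mirror the DestinationInitialStateImpl constructor arguments used above and are not part of this PR.)

// Hypothetical fragment; accessor names are assumptions, not part of this PR.
final List<DestinationInitialState> initialStates =
    snowflakeDestinationHandler.gatherInitialState(parsedCatalog.streams());
for (final DestinationInitialState state : initialStates) {
  if (state.isFinalTablePresent() && state.isSchemaMismatch()) {
    // A schema mismatch against the existing final table is what would normally
    // trigger a soft reset before typing and deduping runs.
  }
}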