diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle
index f5d1b05d4b54..14da5852eef0 100644
--- a/airbyte-integrations/connectors/destination-bigquery/build.gradle
+++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle
@@ -1,9 +1,10 @@
 plugins {
     id 'airbyte-java-connector'
+    id 'org.jetbrains.kotlin.jvm' version '1.9.22'
 }

 airbyteJavaConnector {
-    cdkVersionRequired = '0.20.9'
+    cdkVersionRequired = '0.23.11'
     features = [
             'db-destinations',
             'datastore-bigquery',
diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
index e3e73d6a5c98..ee0aecd9a385 100644
--- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.4.11
+  dockerImageTag: 2.4.12
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
index b884fe5dbd23..f2b11b35247b 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
@@ -61,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -233,9 +234,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
     final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config);
     final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
     final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
-    final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation);
+    final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
+    final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
     final BigQuery bigquery = getBigQuery(config);
-    final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
+    final TyperDeduper typerDeduper =
+        buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);

     AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
     final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
@@ -360,7 +363,6 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
                                                                      final Consumer<AirbyteMessage> outputRecordCollector,
                                                                      final TyperDeduper typerDeduper)
       throws Exception {
-    typerDeduper.prepareTables();
     final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
         bigquery,
         config,
@@ -372,6 +374,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
     return new BigQueryRecordStandardConsumer(
         outputRecordCollector,
         () -> {
+          typerDeduper.prepareSchemasAndRunMigrations();
+
           // Set up our raw tables
           writeConfigs.get().forEach((streamId, uploader) -> {
             final StreamConfig stream = parsedCatalog.getStream(streamId);
@@ -390,6 +394,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
               uploader.createRawTable();
             }
           });
+
+          typerDeduper.prepareFinalTables();
         },
         (hasFailed, streamSyncSummaries) -> {
           try {
@@ -424,11 +430,13 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
     }
   }

-  private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) {
+  private ParsedCatalog parseCatalog(final JsonNode config,
+                                     final ConfiguredAirbyteCatalog catalog,
+                                     final String datasetLocation,
+                                     final Optional<String> rawNamespaceOverride) {
     final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
-    final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent()
-        ? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get())
-        : new CatalogParser(sqlGenerator);
+    final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s))
+        .orElseGet(() -> new CatalogParser(sqlGenerator));

     return catalogParser.parseCatalog(catalog);
   }
@@ -440,11 +448,13 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
                                          final boolean disableTypeDedupe) {
     final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
     final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
-    final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bigquery, datasetLocation);
+    final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(
+        bigquery,
+        datasetLocation);

     if (disableTypeDedupe) {
       return new NoOpTyperDeduperWithV1V2Migrations<>(
-          sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, 8);
+          sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of());
     }

     return new DefaultTyperDeduper<>(
@@ -453,8 +463,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
         parsedCatalog,
         migrator,
         v2RawTableMigrator,
-        8);
-
+        List.of());
   }

   @Override
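Both consumer paths in this PR replace the single `typerDeduper.prepareTables()` call with a two-phase start-up. A minimal, self-contained sketch of that ordering (the interface below is reduced to the two methods this diff exercises; it is not the full CDK `TyperDeduper` contract):

```java
// Reduced to the two lifecycle methods this diff exercises (illustrative, not the full CDK interface).
interface TyperDeduperLifecycle {

  void prepareSchemasAndRunMigrations() throws Exception;

  void prepareFinalTables() throws Exception;

}

final class OnStartOrdering {

  // Mirrors the ordering both BigQuery consumers now follow on start-up.
  static void onStart(final TyperDeduperLifecycle typerDeduper, final Runnable rawTableSetup) throws Exception {
    // 1. Create datasets and run V1->V2 migrations before anything touches the tables.
    typerDeduper.prepareSchemasAndRunMigrations();
    // 2. Create/truncate the raw tables.
    rawTableSetup.run();
    // 3. Only then build the final (typed) tables.
    typerDeduper.prepareFinalTables();
  }

}
```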
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
index a929bfbf095f..5f40d71c4815 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
@@ -135,7 +135,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
                                           final TyperDeduper typerDeduper) {
     return () -> {
       LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
-      typerDeduper.prepareTables();
+      typerDeduper.prepareSchemasAndRunMigrations();
+
       for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
         LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
             writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
@@ -156,6 +157,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
           bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
         }
       }
+
+      typerDeduper.prepareFinalTables();
       LOGGER.info("Preparing tables in destination completed.");
     };
   }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
index 8199165d6527..111894b62967 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
@@ -4,8 +4,17 @@

 package io.airbyte.integrations.destination.bigquery.typing_deduping;

+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase;
+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase;
+import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.QUOTE;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.clusteringColumns;
+import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType;
+import static java.util.stream.Collectors.toMap;
+
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.FieldValue;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobConfiguration;
@@ -14,28 +23,46 @@
 import com.google.cloud.bigquery.JobStatistics;
 import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Streams;
 import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
+import io.airbyte.cdk.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
+import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 // TODO this stuff almost definitely exists somewhere else in our codebase.
-public class BigQueryDestinationHandler implements DestinationHandler<TableDefinition> {
+public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {

   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);
@@ -47,32 +74,24 @@ public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocatio
     this.datasetLocation = datasetLocation;
   }

-  @Override
   public Optional<TableDefinition> findExistingTable(final StreamId id) {
     final Table table = bq.getTable(id.finalNamespace(), id.finalName());
     return Optional.ofNullable(table).map(Table::getDefinition);
   }

-  @Override
-  public LinkedHashMap<String, TableDefinition> findExistingFinalTables(List<StreamId> streamIds) throws Exception {
-    return null;
-  }
-
-  @Override
   public boolean isFinalTableEmpty(final StreamId id) {
     return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
   }

-  @Override
-  public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
+  public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
     final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
     if (rawTable == null) {
       // Table doesn't exist. There are no unprocessed records, and no timestamp.
-      return new InitialRawTableState(false, Optional.empty());
+      return new InitialRawTableStatus(false, false, Optional.empty());
     }

     final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE))).replace(
         // bigquery timestamps have microsecond precision
         """
         SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -84,11 +103,11 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
     // If it's not null, then we can return immediately - we've found some unprocessed records and their
     // timestamp.
     if (!unloadedRecordTimestamp.isNull()) {
-      return new InitialRawTableState(true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
     }

     final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE))).replace(
         """
         SELECT MAX(_airbyte_extracted_at)
         FROM ${raw_table}
@@ -98,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
     // So we just need to get the timestamp of the most recent record.
     if (loadedRecordTimestamp.isNull()) {
       // Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
-      return new InitialRawTableState(false, Optional.empty());
+      return new InitialRawTableStatus(true, false, Optional.empty());
     } else {
       // The raw table already has some records. T+D can skip all records with timestamp <= this value.
-      return new InitialRawTableState(false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
     }
   }
@@ -172,4 +191,133 @@ public void execute(final Sql sql) throws InterruptedException {
     }
   }

+  @Override
+  public List<DestinationInitialStatus<MinimumDestinationState.Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+    final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
+    for (final StreamConfig streamConfig : streamConfigs) {
+      final StreamId id = streamConfig.id();
+      final Optional<TableDefinition> finalTable = findExistingTable(id);
+      final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+      initialStates.add(new DestinationInitialStatus<>(
+          streamConfig,
+          finalTable.isPresent(),
+          rawTableState,
+          finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
+          finalTable.isEmpty() || isFinalTableEmpty(id),
+          // Return a default state blob since we don't actually track state.
+          new MinimumDestinationState.Impl(false)));
+    }
+    return initialStates;
+  }
+
+  @Override
+  public void commitDestinationStates(Map<StreamId, MinimumDestinationState.Impl> destinationStates) throws Exception {
+    // Intentionally do nothing. Bigquery doesn't actually support destination states.
+  }
+
+  private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
+                                                    final TableDefinition existingTable)
+      throws TableNotMigratedException {
+    final var alterTableReport = buildAlterTableReport(stream, existingTable);
+    boolean tableClusteringMatches = false;
+    boolean tablePartitioningMatches = false;
+    if (existingTable instanceof final StandardTableDefinition standardExistingTable) {
+      tableClusteringMatches = clusteringMatches(stream, standardExistingTable);
+      tablePartitioningMatches = partitioningMatches(standardExistingTable);
+    }
+    LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}",
+        alterTableReport.columnsToAdd(),
+        alterTableReport.columnsToRemove(),
+        alterTableReport.columnsToChangeType(),
+        tableClusteringMatches,
+        tablePartitioningMatches);
+
+    return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches;
+  }
+
+  public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) {
+    final Set<String> pks = getPks(stream);
+
+    final Map<String, StandardSQLTypeName> streamSchema = stream.columns().entrySet().stream()
+        .collect(toMap(
+            entry -> entry.getKey().name(),
+            entry -> toDialectType(entry.getValue())));
+
+    final Map<String, StandardSQLTypeName> existingSchema = existingTable.getSchema().getFields().stream()
+        .collect(toMap(
+            field -> field.getName(),
+            field -> field.getType().getStandardType()));
+
+    // Columns in the StreamConfig that don't exist in the TableDefinition
+    final Set<String> columnsToAdd = streamSchema.keySet().stream()
+        .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
+        .collect(Collectors.toSet());
+
+    // Columns in the current schema that are no longer in the StreamConfig
+    final Set<String> columnsToRemove = existingSchema.keySet().stream()
+        .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase(
+            JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name))
+        .collect(Collectors.toSet());
+
+    // Columns that are typed differently than the StreamConfig
+    final Set<String> columnsToChangeType = Stream.concat(
+        streamSchema.keySet().stream()
+            // If it's not in the existing schema, it should already be in the columnsToAdd Set
+            .filter(name -> {
+              // Big Query Columns are case-insensitive, first find the correctly cased key if it exists
+              return matchingKey(existingSchema.keySet(), name)
+                  // if it does exist, only include it in this set if the type (the value in each respective map)
+                  // is different between the stream and existing schemas
+                  .map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
+                  // if there is no matching key, then don't include it because it is probably already in columnsToAdd
+                  .orElse(false);
+            }),

+        // OR columns that used to have a non-null constraint and shouldn't
+        // (https://github.com/airbytehq/airbyte/pull/31082)
+        existingTable.getSchema().getFields().stream()
+            .filter(field -> pks.contains(field.getName()))
+            .filter(field -> field.getMode() == Field.Mode.REQUIRED)
+            .map(Field::getName))
+        .collect(Collectors.toSet());
+
+    final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet());
+
+    return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format);
+  }
+
+  @VisibleForTesting
+  public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) {
+    return existingTable.getClustering() != null
+        && containsAllIgnoreCase(
+            new HashSet<>(existingTable.getClustering().getFields()),
+            clusteringColumns(stream));
+  }
+
+  @VisibleForTesting
+  public static boolean partitioningMatches(final StandardTableDefinition existingTable) {
+    return existingTable.getTimePartitioning() != null
+        && existingTable.getTimePartitioning()
+            .getField()
+            .equalsIgnoreCase("_airbyte_extracted_at")
+        && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType());
+  }
+
+  /**
+   * Checks the schema to determine whether the table contains all expected final table airbyte
+   * columns
+   *
+   * @param columnNames the column names of the schema to check
+   * @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present
+   */
+  @VisibleForTesting
+  public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection<String> columnNames) {
+    return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
+        .allMatch(column -> containsIgnoreCase(columnNames, column));
+  }
+
+  private static Set<String> getPks(final StreamConfig stream) {
+    return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
+  }
+
 }
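For reference, the three `InitialRawTableStatus` outcomes that `getInitialRawTableState` above can produce, condensed into one decision function (the record mirrors the constructor arguments used above and is only a stand-in for the CDK type):

```java
import java.time.Instant;
import java.util.Optional;

// Stand-in for the CDK's InitialRawTableStatus, matching the constructor arguments used above.
record RawTableStatusSketch(boolean rawTableExists, boolean hasUnprocessedRecords, Optional<Instant> maxProcessedTimestamp) {

  // Condenses the three branches of getInitialRawTableState:
  static RawTableStatusSketch classify(final boolean tableExists,
                                       final Optional<Instant> minUnprocessedTimestamp,
                                       final Optional<Instant> maxProcessedTimestamp) {
    if (!tableExists) {
      // No raw table: nothing to process and no timestamp.
      return new RawTableStatusSketch(false, false, Optional.empty());
    }
    if (minUnprocessedTimestamp.isPresent()) {
      // Unprocessed records exist; T+D resumes from just before the oldest one.
      return new RawTableStatusSketch(true, true, minUnprocessedTimestamp);
    }
    // Everything already processed (or the table is empty); T+D can skip records up to the max timestamp, if any.
    return new RawTableStatusSketch(true, false, maxProcessedTimestamp);
  }

}
```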
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
index c4370fc5dc0a..3fe1f2cbb145 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
@@ -4,25 +4,15 @@

 package io.airbyte.integrations.destination.bigquery.typing_deduping;

-import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase;
-import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase;
-import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey;
 import static io.airbyte.integrations.base.destination.typing_deduping.Sql.separately;
 import static io.airbyte.integrations.base.destination.typing_deduping.Sql.transactionally;
 import static io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.SOFT_RESET_SUFFIX;
 import static java.util.stream.Collectors.joining;

-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.Field.Mode;
 import com.google.cloud.bigquery.StandardSQLTypeName;
-import com.google.cloud.bigquery.StandardTableDefinition;
-import com.google.cloud.bigquery.TableDefinition;
-import com.google.cloud.bigquery.TimePartitioning;
 import com.google.common.annotations.VisibleForTesting;
-import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
-import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
 import io.airbyte.integrations.base.destination.typing_deduping.Array;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
@@ -30,28 +20,22 @@
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.base.destination.typing_deduping.Struct;
-import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
 import io.airbyte.integrations.base.destination.typing_deduping.Union;
 import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
 import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer;
 import io.airbyte.protocol.models.v0.DestinationSyncMode;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class BigQuerySqlGenerator implements SqlGenerator<TableDefinition> {
+public class BigQuerySqlGenerator implements SqlGenerator {

   public static final String QUOTE = "`";
   private static final BigQuerySQLNameTransformer nameTransformer = new BigQuerySQLNameTransformer();
@@ -95,7 +79,7 @@ public ColumnId buildColumnId(final String name, final String suffix) {
         nameTransformer.getIdentifier(nameWithSuffix.toLowerCase()));
   }

-  public StandardSQLTypeName toDialectType(final AirbyteType type) {
+  public static StandardSQLTypeName toDialectType(final AirbyteType type) {
     // switch pattern-matching is still in preview at language level 17 :(
     if (type instanceof final AirbyteProtocolType p) {
       return toDialectType(p);
@@ -197,7 +181,7 @@ THEN JSON_QUERY(`_airbyte_data`, '$."${column_name}"')

   // TODO maybe make this a BiMap and elevate this method and its inverse (toDestinationSQLType?) to
   // the SQLGenerator?
-  public StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) {
+  public static StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) {
     return switch (airbyteProtocolType) {
       case STRING -> StandardSQLTypeName.STRING;
       case NUMBER -> StandardSQLTypeName.NUMERIC;
@@ -239,7 +223,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY))
         """));
   }

-  private List<String> clusteringColumns(final StreamConfig stream) {
+  static List<String> clusteringColumns(final StreamConfig stream) {
     final List<String> clusterColumns = new ArrayList<>();
     if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
       // We're doing de-duping, therefore we have a primary key.
@@ -259,108 +243,6 @@ private String columnsAndTypes(final StreamConfig stream) {
         .collect(joining(",\n"));
   }

-  @Override
-  public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
-                                                   final TableDefinition existingTable)
-      throws TableNotMigratedException {
-    final var alterTableReport = buildAlterTableReport(stream, existingTable);
-    boolean tableClusteringMatches = false;
-    boolean tablePartitioningMatches = false;
-    if (existingTable instanceof final StandardTableDefinition standardExistingTable) {
-      tableClusteringMatches = clusteringMatches(stream, standardExistingTable);
-      tablePartitioningMatches = partitioningMatches(standardExistingTable);
-    }
-    LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}",
-        alterTableReport.columnsToAdd(),
-        alterTableReport.columnsToRemove(),
-        alterTableReport.columnsToChangeType(),
-        tableClusteringMatches,
-        tablePartitioningMatches);
-
-    return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches;
-  }
-
-  @VisibleForTesting
-  public boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) {
-    return existingTable.getClustering() != null
-        && containsAllIgnoreCase(
-            new HashSet<>(existingTable.getClustering().getFields()),
-            clusteringColumns(stream));
-  }
-
-  @VisibleForTesting
-  public boolean partitioningMatches(final StandardTableDefinition existingTable) {
-    return existingTable.getTimePartitioning() != null
-        && existingTable.getTimePartitioning()
-            .getField()
-            .equalsIgnoreCase("_airbyte_extracted_at")
-        && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType());
-  }
-
-  public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) {
-    final Set<String> pks = getPks(stream);
-
-    final Map<String, StandardSQLTypeName> streamSchema = stream.columns().entrySet().stream()
-        .collect(Collectors.toMap(
-            entry -> entry.getKey().name(),
-            entry -> toDialectType(entry.getValue())));
-
-    final Map<String, StandardSQLTypeName> existingSchema = existingTable.getSchema().getFields().stream()
-        .collect(Collectors.toMap(
-            field -> field.getName(),
-            field -> field.getType().getStandardType()));
-
-    // Columns in the StreamConfig that don't exist in the TableDefinition
-    final Set<String> columnsToAdd = streamSchema.keySet().stream()
-        .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
-        .collect(Collectors.toSet());
-
-    // Columns in the current schema that are no longer in the StreamConfig
-    final Set<String> columnsToRemove = existingSchema.keySet().stream()
-        .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase(
-            JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name))
-        .collect(Collectors.toSet());
-
-    // Columns that are typed differently than the StreamConfig
-    final Set<String> columnsToChangeType = Stream.concat(
-        streamSchema.keySet().stream()
-            // If it's not in the existing schema, it should already be in the columnsToAdd Set
-            .filter(name -> {
-              // Big Query Columns are case-insensitive, first find the correctly cased key if it exists
-              return matchingKey(existingSchema.keySet(), name)
-                  // if it does exist, only include it in this set if the type (the value in each respective map)
-                  // is different between the stream and existing schemas
-                  .map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
-                  // if there is no matching key, then don't include it because it is probably already in columnsToAdd
-                  .orElse(false);
-            }),
-
-        // OR columns that used to have a non-null constraint and shouldn't
-        // (https://github.com/airbytehq/airbyte/pull/31082)
-        existingTable.getSchema().getFields().stream()
-            .filter(field -> pks.contains(field.getName()))
-            .filter(field -> field.getMode() == Mode.REQUIRED)
-            .map(Field::getName))
-        .collect(Collectors.toSet());
-
-    final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet());
-
-    return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format);
-  }
-
-  /**
-   * Checks the schema to determine whether the table contains all expected final table airbyte
-   * columns
-   *
-   * @param columnNames the column names of the schema to check
-   * @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present
-   */
-  @VisibleForTesting
-  public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection<String> columnNames) {
-    return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
-        .allMatch(column -> containsIgnoreCase(columnNames, column));
-  }
-
   @Override
   public Sql prepareTablesForSoftReset(final StreamConfig stream) {
     // Bigquery can't run DDL in a transaction, so these are separate transactions.
@@ -765,10 +647,6 @@ private static String cast(final String content, final String asType, final bool
     return wrap(open, content + " as " + asType, ")");
   }

-  private static Set<String> getPks(final StreamConfig stream) {
-    return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
-  }
-
   private static String wrap(final String open, final String content, final String close) {
     return open + content + close;
   }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
index cc9f499abdfe..3d78ed982294 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
@@ -87,7 +87,7 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s
   }

   @Override
-  protected SqlGenerator<?> getSqlGenerator() {
+  protected SqlGenerator getSqlGenerator() {
     return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null);
   }
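With `toDialectType` and `clusteringColumns` now static on `BigQuerySqlGenerator` (see above), callers such as the destination handler no longer need a generator instance. A minimal usage sketch, assuming only the classes visible in this diff on the classpath:

```java
import com.google.cloud.bigquery.StandardSQLTypeName;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;

final class DialectTypeExample {

  public static void main(final String[] args) {
    // No BigQuerySqlGenerator instance needed any more: the mapping is a pure function of the Airbyte type.
    final StandardSQLTypeName t = BigQuerySqlGenerator.toDialectType(AirbyteProtocolType.INTEGER);
    System.out.println(t); // INT64, per the switch in toDialectType
  }

}
```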
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
index 99ac8a8e75dd..a303a176d38c 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
@@ -25,7 +25,6 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.Table;
-import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableResult;
 import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.commons.json.Jsons;
@@ -34,6 +33,7 @@
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
 import io.airbyte.integrations.destination.bigquery.BigQueryConsts;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
 import io.airbyte.protocol.models.v0.DestinationSyncMode;
@@ -57,7 +57,7 @@
 import org.slf4j.LoggerFactory;

 @Execution(ExecutionMode.CONCURRENT)
-public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<TableDefinition> {
+public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<MinimumDestinationState.Impl> {

   private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class);

@@ -429,6 +429,17 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Excepti
     super.noCrashOnSpecialCharacters(specialChars);
   }

+  /**
+   * Bigquery doesn't handle frequent INSERT/DELETE statements on a single table very well. So we
+   * don't have real state handling. Disable this test.
+   */
+  @Override
+  @Disabled
+  @Test
+  public void testStateHandling() throws Exception {
+    super.testStateHandling();
+  }
+
   /**
    * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all
    * into memory). That's annoying for us since we're working with small test data, so just pull
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
index 1fac62e2d681..66c2147b1c3f 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
@@ -7,59 +7,24 @@
 import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertEquals;

-import com.google.cloud.bigquery.Clustering;
-import com.google.cloud.bigquery.StandardSQLTypeName;
-import com.google.cloud.bigquery.StandardTableDefinition;
-import com.google.cloud.bigquery.TimePartitioning;
-import com.google.common.collect.ImmutableList;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
-import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
-import io.airbyte.integrations.base.destination.typing_deduping.Array;
 import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
-import io.airbyte.integrations.base.destination.typing_deduping.Struct;
-import io.airbyte.integrations.base.destination.typing_deduping.Union;
-import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
 import io.airbyte.protocol.models.v0.AirbyteStream;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.v0.DestinationSyncMode;
 import io.airbyte.protocol.models.v0.SyncMode;
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;

 public class BigQuerySqlGeneratorTest {

   private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("foo", "US");

-  @Test
-  public void testToDialectType() {
-    final Struct s = new Struct(new LinkedHashMap<>());
-    final Array a = new Array(AirbyteProtocolType.BOOLEAN);
-
-    assertEquals(StandardSQLTypeName.INT64, generator.toDialectType((AirbyteType) AirbyteProtocolType.INTEGER));
-    assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(s));
-    assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(a));
-    assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(new UnsupportedOneOf(new ArrayList<>())));
-
-    Union u = new Union(ImmutableList.of(s));
-    assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u));
-    u = new Union(ImmutableList.of(a));
-    assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u));
-    u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER));
-    assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(u));
-  }
-
   @Test
   public void testBuildColumnId() {
     // Uninteresting names are unchanged
@@ -68,85 +33,6 @@ public void testBuildColumnId() {
         generator.buildColumnId("foo"));
   }

-  @Test
-  public void testClusteringMatches() {
-    StreamConfig stream = new StreamConfig(null,
-        null,
-        DestinationSyncMode.APPEND_DEDUP,
-        List.of(new ColumnId("foo", "bar", "fizz")),
-        null,
-        null);
-
-    // Clustering is null
-    final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class);
-    Mockito.when(existingTable.getClustering()).thenReturn(null);
-    Assertions.assertFalse(generator.clusteringMatches(stream, existingTable));
-
-    // Clustering does not contain all fields
-    Mockito.when(existingTable.getClustering())
-        .thenReturn(Clustering.newBuilder().setFields(List.of("_airbyte_extracted_at")).build());
-    Assertions.assertFalse(generator.clusteringMatches(stream, existingTable));
-
-    // Clustering matches
-    stream = new StreamConfig(null,
-        null,
-        DestinationSyncMode.OVERWRITE,
-        null,
-        null,
-        null);
-    Assertions.assertTrue(generator.clusteringMatches(stream, existingTable));
-
-    // Clustering only the first 3 PK columns (See https://github.com/airbytehq/oncall/issues/2565)
-    final var expectedStreamColumnNames = List.of("a", "b", "c");
-    Mockito.when(existingTable.getClustering())
-        .thenReturn(Clustering.newBuilder().setFields(
-            Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at"))
-                .collect(Collectors.toList()))
-            .build());
-    stream = new StreamConfig(null,
-        null,
-        DestinationSyncMode.APPEND_DEDUP,
-        Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e"))
-            .map(name -> new ColumnId(name, "foo", "bar"))
-            .collect(Collectors.toList()),
-        null,
-        null);
-    Assertions.assertTrue(generator.clusteringMatches(stream, existingTable));
-  }
-
-  @Test
-  public void testPartitioningMatches() {
-    final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class);
-    // Partitioning is null
-    Mockito.when(existingTable.getTimePartitioning()).thenReturn(null);
-    Assertions.assertFalse(generator.partitioningMatches(existingTable));
-    // incorrect field
-    Mockito.when(existingTable.getTimePartitioning())
-        .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_foo").build());
-    Assertions.assertFalse(generator.partitioningMatches(existingTable));
-    // incorrect partitioning scheme
-    Mockito.when(existingTable.getTimePartitioning())
-        .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.YEAR).setField("_airbyte_extracted_at").build());
-    Assertions.assertFalse(generator.partitioningMatches(existingTable));
-
-    // partitioning matches
-    Mockito.when(existingTable.getTimePartitioning())
-        .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_airbyte_extracted_at").build());
-    Assertions.assertTrue(generator.partitioningMatches(existingTable));
-  }
-
-  @Test
-  public void testSchemaContainAllFinalTableV2AirbyteColumns() {
-    Assertions.assertTrue(
-        BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at", "_airbyte_raw_id")));
-    Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_extracted_at", "_airbyte_raw_id")));
-    Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_raw_id")));
-    Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at")));
-    Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of()));
-    Assertions.assertTrue(
-        BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID")));
-  }
-
   @Test
   void columnCollision() {
     final CatalogParser parser = new CatalogParser(generator);
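The tests removed here reappear below against `BigQueryDestinationHandler`'s static methods. As a reminder of what those checks accept (derived from `partitioningMatches`/`clusteringMatches` above; the builder calls are the same BigQuery client APIs the tests mock, and "id" is an illustrative primary-key name):

```java
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.TimePartitioning;
import java.util.List;

final class ExpectedTableShape {

  // The only partitioning partitioningMatches() accepts: day-partitioned on _airbyte_extracted_at.
  static final TimePartitioning MATCHING_PARTITIONING =
      TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
          .setField("_airbyte_extracted_at")
          .build();

  // clusteringMatches() only requires the clustering fields to *contain* clusteringColumns(stream)
  // (order-insensitive, extra fields allowed); for a dedup stream that is _airbyte_extracted_at
  // plus the leading primary-key columns ("id" here is a hypothetical PK).
  static final Clustering MATCHING_CLUSTERING_FOR_PK_ID =
      Clustering.newBuilder().setFields(List.of("id", "_airbyte_extracted_at")).build();

}
```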
"_airbyte_extracted_at"))); - Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of())); - Assertions.assertTrue( - BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID"))); - } - @Test void columnCollision() { final CatalogParser parser = new CatalogParser(generator); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java new file mode 100644 index 000000000000..7a2d6184945d --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.typing_deduping; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.cloud.bigquery.Clustering; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.common.collect.ImmutableList; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class BigqueryDestinationHandlerTest { + + @Test + public void testToDialectType() { + final Struct s = new Struct(new LinkedHashMap<>()); + final Array a = new Array(AirbyteProtocolType.BOOLEAN); + + assertEquals(StandardSQLTypeName.INT64, BigQuerySqlGenerator.toDialectType((AirbyteType) AirbyteProtocolType.INTEGER)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(s)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(a)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(new UnsupportedOneOf(new ArrayList<>()))); + + Union u = new Union(ImmutableList.of(s)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(u)); + u = new Union(ImmutableList.of(a)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(u)); + u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); + assertEquals(StandardSQLTypeName.NUMERIC, BigQuerySqlGenerator.toDialectType(u)); + } + + @Test + public void testClusteringMatches() { + StreamConfig stream = new StreamConfig(null, + 
null, + DestinationSyncMode.APPEND_DEDUP, + List.of(new ColumnId("foo", "bar", "fizz")), + null, + null); + + // Clustering is null + final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); + Mockito.when(existingTable.getClustering()).thenReturn(null); + Assertions.assertFalse(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering does not contain all fields + Mockito.when(existingTable.getClustering()) + .thenReturn(Clustering.newBuilder().setFields(List.of("_airbyte_extracted_at")).build()); + Assertions.assertFalse(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering matches + stream = new StreamConfig(null, + null, + DestinationSyncMode.OVERWRITE, + null, + null, + null); + Assertions.assertTrue(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering only the first 3 PK columns (See https://github.com/airbytehq/oncall/issues/2565) + final var expectedStreamColumnNames = List.of("a", "b", "c"); + Mockito.when(existingTable.getClustering()) + .thenReturn(Clustering.newBuilder().setFields( + Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at")) + .collect(Collectors.toList())) + .build()); + stream = new StreamConfig(null, + null, + DestinationSyncMode.APPEND_DEDUP, + Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e")) + .map(name -> new ColumnId(name, "foo", "bar")) + .collect(Collectors.toList()), + null, + null); + Assertions.assertTrue(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + } + + @Test + public void testPartitioningMatches() { + final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); + // Partitioning is null + Mockito.when(existingTable.getTimePartitioning()).thenReturn(null); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + // incorrect field + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_foo").build()); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + // incorrect partitioning scheme + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.YEAR).setField("_airbyte_extracted_at").build()); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + + // partitioning matches + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_airbyte_extracted_at").build()); + Assertions.assertTrue(BigQueryDestinationHandler.partitioningMatches(existingTable)); + } + + @Test + public void testSchemaContainAllFinalTableV2AirbyteColumns() { + Assertions.assertTrue( + BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_extracted_at", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at"))); + 
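The last assertion in `testSchemaContainAllFinalTableV2AirbyteColumns` above exercises case-insensitivity. A minimal, self-contained sketch of an equivalent check (the three column names are the V2 final-table meta columns the tests use; the real code delegates to `containsIgnoreCase` from the CDK's `CollectionUtils`):

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

final class V2ColumnCheckSketch {

  // Equivalent, self-contained version of the case-insensitive containment check.
  static boolean hasAllV2FinalTableColumns(final Collection<String> columnNames) {
    final Set<String> lowerCased = columnNames.stream()
        .map(String::toLowerCase)
        .collect(Collectors.toSet());
    return lowerCased.containsAll(Set.of("_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta"));
  }

}
```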
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index 7f475376d592..e02e203bce07 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -210,13 +210,14 @@ tutorials:

 | Version | Date       | Pull Request                                              | Subject                                                                |
 |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------|
-| 2.4.11  | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569)  | Fix logging bug.                                                       |
-| 2.4.10  | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240)  | Adopt CDK 0.20.9                                                       |
-| 2.4.9   | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285)  | Adopt CDK 0.20.8                                                       |
-| 2.4.8   | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144)  | Adopt CDK 0.20.2                                                       |
-| 2.4.7   | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111)  | Adopt CDK 0.20.1                                                       |
-| 2.4.6   | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575)  | Adopt CDK 0.20.0                                                       |
-| 2.4.5   | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745)  | Adopt CDK 0.19.0                                                       |
+| 2.4.12  | 2024-03-04 | [35315](https://github.com/airbytehq/airbyte/pull/35315)  | Adopt CDK 0.23.11                                                      |
+| 2.4.11  | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569)  | Fix logging bug.                                                       |
+| 2.4.10  | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240)  | Adopt CDK 0.20.9                                                       |
+| 2.4.9   | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285)  | Adopt CDK 0.20.8                                                       |
+| 2.4.8   | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144)  | Adopt CDK 0.20.2                                                       |
+| 2.4.7   | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111)  | Adopt CDK 0.20.1                                                       |
+| 2.4.6   | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575)  | Adopt CDK 0.20.0                                                       |
+| 2.4.5   | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745)  | Adopt CDK 0.19.0                                                       |
 | 2.4.4   | 2024-02-08 | [35027](https://github.com/airbytehq/airbyte/pull/35027)  | Upgrade CDK to 0.17.1                                                  |
 | 2.4.3   | 2024-02-01 | [34728](https://github.com/airbytehq/airbyte/pull/34728)  | Upgrade CDK to 0.16.4; Notable changes from 0.14.2, 0.15.1 and 0.16.3  |
 | 2.4.2   | 2024-01-24 | [34451](https://github.com/airbytehq/airbyte/pull/34451)  | Improve logging for unparseable input                                  |