From 1ed06cadc4e629b6e04ff0fba3399de3c202353d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 14 Feb 2024 16:17:27 -0800 Subject: [PATCH 01/10] upgrade cdk --- .../connectors/destination-bigquery/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index f5d1b05d4b54..a240349eab00 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -10,7 +10,7 @@ airbyteJavaConnector { 'typing-deduping', 'gcs-destinations', ] - useLocalCdk = false + useLocalCdk = true } java { From ac356932de75a85150653712fc8f1ead6b3ce5fa Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 23 Feb 2024 13:52:48 -0800 Subject: [PATCH 02/10] start fixing build --- .../connectors/destination-bigquery/build.gradle | 1 + .../destination/bigquery/BigQueryDestination.java | 10 ++++++---- .../bigquery/BigQueryStagingConsumerFactory.java | 5 ++++- .../typing_deduping/BigQueryDestinationHandler.java | 3 ++- .../typing_deduping/BigQuerySqlGenerator.java | 2 +- .../bigquery/typing_deduping/BigqueryState.kt | 13 +++++++++++++ 6 files changed, 27 insertions(+), 7 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index a240349eab00..113d101821bb 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -1,5 +1,6 @@ plugins { id 'airbyte-java-connector' + id 'org.jetbrains.kotlin.jvm' version '1.9.22' } airbyteJavaConnector { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index b884fe5dbd23..f11769f1f6db 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -360,7 +360,6 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer final Consumer outputRecordCollector, final TyperDeduper typerDeduper) throws Exception { - typerDeduper.prepareTables(); final Supplier>> writeConfigs = getUploaderMap( bigquery, config, @@ -372,6 +371,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer return new BigQueryRecordStandardConsumer( outputRecordCollector, () -> { + typerDeduper.prepareSchemasAndRawTables(); + // Set up our raw tables writeConfigs.get().forEach((streamId, uploader) -> { final StreamConfig stream = parsedCatalog.getStream(streamId); @@ -390,6 +391,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer uploader.createRawTable(); } }); + + typerDeduper.prepareFinalTables(); }, (hasFailed, streamSyncSummaries) -> { try { @@ -444,7 +447,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, if (disableTypeDedupe) { return new NoOpTyperDeduperWithV1V2Migrations<>( - sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, 8); + sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of()); } return new DefaultTyperDeduper<>( @@ -453,8 +456,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, parsedCatalog, migrator, v2RawTableMigrator, - 8); - + List.of()); } @Override diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index a929bfbf095f..036045380e8f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -135,7 +135,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery final TyperDeduper typerDeduper) { return () -> { LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size()); - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); + for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName()); @@ -156,6 +157,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema()); } } + + typerDeduper.prepareFinalTables(); LOGGER.info("Preparing tables in destination completed."); }; } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 8199165d6527..05b1d8f7ad48 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -20,6 +20,7 @@ import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.math.BigInteger; @@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory; // TODO this stuff almost definitely exists somewhere else in our codebase. -public class BigQueryDestinationHandler implements DestinationHandler { +public class BigQueryDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index c4370fc5dc0a..f64af3e251ad 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -51,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BigQuerySqlGenerator implements SqlGenerator { +public class BigQuerySqlGenerator implements SqlGenerator { public static final String QUOTE = "`"; private static final BigQuerySQLNameTransformer nameTransformer = new BigQuerySQLNameTransformer(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt new file mode 100644 index 000000000000..3d9ce03f6c5c --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt @@ -0,0 +1,13 @@ +package io.airbyte.integrations.destination.bigquery.typing_deduping + +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState + +data class BigqueryState(val needsSoftReset: Boolean): MinimumDestinationState { + override fun needsSoftReset(): Boolean { + return needsSoftReset + } + + override fun withSoftReset(needsSoftReset: Boolean): T { + return copy(needsSoftReset = needsSoftReset) as T + } +} From 336132096a9aa71be2fe187c665c39475131462e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 23 Feb 2024 15:10:17 -0800 Subject: [PATCH 03/10] fix build --- .../bigquery/BigQueryDestination.java | 25 +- .../BigQueryDestinationHandler.java | 269 ++++++++++++++++-- .../typing_deduping/BigQuerySqlGenerator.java | 128 +-------- .../AbstractBigQueryTypingDedupingTest.java | 2 +- .../BigQuerySqlGeneratorIntegrationTest.java | 7 +- .../BigQuerySqlGeneratorTest.java | 98 ------- .../BigqueryDestinationHandlerTest.java | 127 +++++++++ 7 files changed, 404 insertions(+), 252 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index f11769f1f6db..d45f807bf571 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -22,6 +22,7 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; @@ -61,6 +62,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -233,9 +235,10 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config); final String datasetLocation = BigQueryUtils.getDatasetLocation(config); final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); - final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation); + final Optional rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET); + final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride); final BigQuery bigquery = getBigQuery(config); - final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe); + final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe, rawNamespaceOverride); AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config); final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS); @@ -427,11 +430,13 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f } } - private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) { + private ParsedCatalog parseCatalog(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final String datasetLocation, + final Optional rawNamespaceOverride) { final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); - final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent() - ? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get()) - : new CatalogParser(sqlGenerator); + final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s)) + .orElseGet(() -> new CatalogParser(sqlGenerator)); return catalogParser.parseCatalog(catalog); } @@ -440,10 +445,14 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, final ParsedCatalog parsedCatalog, final BigQuery bigquery, final String datasetLocation, - final boolean disableTypeDedupe) { + final boolean disableTypeDedupe, + final Optional rawNamespaceOverride) { final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver); final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); - final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bigquery, datasetLocation); + final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler( + bigquery, + datasetLocation, + rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)); if (disableTypeDedupe) { return new NoOpTyperDeduperWithV1V2Migrations<>( diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 05b1d8f7ad48..b5968a43c52d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -4,8 +4,18 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase; +import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase; +import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey; +import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.QUOTE; +import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.clusteringColumns; +import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType; +import static java.util.stream.Collectors.toMap; + +import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobConfiguration; @@ -14,24 +24,46 @@ import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatus; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Comparator; -import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.commons.text.StringSubstitutor; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,40 +72,40 @@ public class BigQueryDestinationHandler implements DestinationHandler findExistingTable(final StreamId id) { final Table table = bq.getTable(id.finalNamespace(), id.finalName()); return Optional.ofNullable(table).map(Table::getDefinition); } - @Override - public LinkedHashMap findExistingFinalTables(List streamIds) throws Exception { - return null; - } - - @Override public boolean isFinalTableEmpty(final StreamId id) { return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows()); } - @Override public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName())); if (rawTable == null) { // Table doesn't exist. There are no unprocessed records, and no timestamp. - return new InitialRawTableState(false, Optional.empty()); + return new InitialRawTableState(false, false, Optional.empty()); } final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( - "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace( + "raw_table", id.rawTableId(QUOTE))).replace( // bigquery timestamps have microsecond precision """ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) @@ -85,11 +117,11 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) // If it's not null, then we can return immediately - we've found some unprocessed records and their // timestamp. if (!unloadedRecordTimestamp.isNull()) { - return new InitialRawTableState(true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableState(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); } final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( - "raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace( + "raw_table", id.rawTableId(QUOTE))).replace( """ SELECT MAX(_airbyte_extracted_at) FROM ${raw_table} @@ -99,10 +131,10 @@ SELECT MAX(_airbyte_extracted_at) // So we just need to get the timestamp of the most recent record. if (loadedRecordTimestamp.isNull()) { // Null timestamp because the table is empty. T+D can process the entire raw table during this sync. - return new InitialRawTableState(false, Optional.empty()); + return new InitialRawTableState(true, false, Optional.empty()); } else { // The raw table already has some records. T+D can skip all records with timestamp <= this value. - return new InitialRawTableState(false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableState(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); } } @@ -173,4 +205,209 @@ public void execute(final Sql sql) throws InterruptedException { } } + @Override + public List> gatherInitialState(List streamConfigs) throws Exception { + final TableId stateTableId = TableId.of(rawTableDataset, DESTINATION_STATE_TABLE_NAME); + final Table stateTable = bq.getTable(stateTableId); + if (stateTable == null || !stateTable.exists()) { + bq.create(TableInfo.newBuilder( + stateTableId, + StandardTableDefinition.of(Schema.of( + Field.of(DESTINATION_STATE_TABLE_COLUMN_NAME, StandardSQLTypeName.STRING), + Field.of(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE, StandardSQLTypeName.STRING), + Field.of(DESTINATION_STATE_TABLE_COLUMN_STATE, StandardSQLTypeName.JSON), + Field.of(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT, StandardSQLTypeName.TIMESTAMP) + )) + ).build()); + } + + Map destinationStates = StreamSupport.stream( + bq.query(QueryJobConfiguration.newBuilder( + "SELECT * FROM " + getStateTableName() + ).build()).iterateAll().spliterator(), + false + ).collect(toMap( + fvList -> { + final FieldValue nameFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAME); + final FieldValue namespaceFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE); + return new AirbyteStreamNameNamespacePair( + nameFieldValue.isNull() ? null : nameFieldValue.getStringValue(), + namespaceFieldValue.isNull() ? null : namespaceFieldValue.getStringValue() + ); + }, + fvList -> { + JsonNode json = Jsons.deserialize(fvList.get(DESTINATION_STATE_TABLE_COLUMN_STATE).getStringValue()); + return toBigqueryState(json); + } + )); + + final List> initialStates = new ArrayList<>(); + for (final StreamConfig streamConfig : streamConfigs) { + final StreamId id = streamConfig.id(); + final Optional finalTable = findExistingTable(id); + final InitialRawTableState rawTableState = getInitialRawTableState(id); + final BigqueryState bigqueryState = destinationStates.getOrDefault(streamConfig.id().asPair(), toBigqueryState(Jsons.emptyObject())); + initialStates.add(new DestinationInitialState<>( + streamConfig, + finalTable.isPresent(), + rawTableState, + finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()), + !finalTable.isPresent() || isFinalTableEmpty(id), + bigqueryState)); + } + return initialStates; + } + + @Override + public void commitDestinationStates(Map destinationStates) throws Exception { + if (destinationStates.isEmpty()) { + return; + } + + final String deleteStates = "DELETE FROM " + getStateTableName() + " WHERE " + + destinationStates.keySet().stream() + .map(streamId -> String.format("(%s = ? AND %s = ?)", + DESTINATION_STATE_TABLE_COLUMN_NAME, + DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) + .collect(Collectors.joining(" OR ")); + final QueryJobConfiguration.Builder deleteQueryConfig = QueryJobConfiguration.newBuilder(deleteStates); + destinationStates.forEach((key, value) -> { + deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName())); + deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace())); + }); + bq.query(deleteQueryConfig.build()); + + final String insertStates = "INSERT INTO " + getStateTableName() + " (" + + DESTINATION_STATE_TABLE_COLUMN_NAME + ", " + + DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + ", " + + DESTINATION_STATE_TABLE_COLUMN_STATE + ", " + + DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + ") VALUES " + + destinationStates.keySet().stream() + .map(streamId -> "(?, ?, ?, CURRENT_TIMESTAMP)") + .collect(Collectors.joining(", ")); + final QueryJobConfiguration.Builder insertQueryConfig = QueryJobConfiguration.newBuilder(insertStates); + destinationStates.forEach((key, value) -> { + insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName())); + insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace())); + insertQueryConfig.addPositionalParameter(QueryParameterValue.json(Jsons.serialize(value))); + }); + bq.query(insertQueryConfig.build()); + } + + private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, + final TableDefinition existingTable) + throws TableNotMigratedException { + final var alterTableReport = buildAlterTableReport(stream, existingTable); + boolean tableClusteringMatches = false; + boolean tablePartitioningMatches = false; + if (existingTable instanceof final StandardTableDefinition standardExistingTable) { + tableClusteringMatches = clusteringMatches(stream, standardExistingTable); + tablePartitioningMatches = partitioningMatches(standardExistingTable); + } + LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}", + alterTableReport.columnsToAdd(), + alterTableReport.columnsToRemove(), + alterTableReport.columnsToChangeType(), + tableClusteringMatches, + tablePartitioningMatches); + + return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches; + } + + public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) { + final Set pks = getPks(stream); + + final Map streamSchema = stream.columns().entrySet().stream() + .collect(toMap( + entry -> entry.getKey().name(), + entry -> toDialectType(entry.getValue()))); + + final Map existingSchema = existingTable.getSchema().getFields().stream() + .collect(toMap( + field -> field.getName(), + field -> field.getType().getStandardType())); + + // Columns in the StreamConfig that don't exist in the TableDefinition + final Set columnsToAdd = streamSchema.keySet().stream() + .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name)) + .collect(Collectors.toSet()); + + // Columns in the current schema that are no longer in the StreamConfig + final Set columnsToRemove = existingSchema.keySet().stream() + .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase( + JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name)) + .collect(Collectors.toSet()); + + // Columns that are typed differently than the StreamConfig + final Set columnsToChangeType = Stream.concat( + streamSchema.keySet().stream() + // If it's not in the existing schema, it should already be in the columnsToAdd Set + .filter(name -> { + // Big Query Columns are case-insensitive, first find the correctly cased key if it exists + return matchingKey(existingSchema.keySet(), name) + // if it does exist, only include it in this set if the type (the value in each respective map) + // is different between the stream and existing schemas + .map(key -> !existingSchema.get(key).equals(streamSchema.get(name))) + // if there is no matching key, then don't include it because it is probably already in columnsToAdd + .orElse(false); + }), + + // OR columns that used to have a non-null constraint and shouldn't + // (https://github.com/airbytehq/airbyte/pull/31082) + existingTable.getSchema().getFields().stream() + .filter(field -> pks.contains(field.getName())) + .filter(field -> field.getMode() == Field.Mode.REQUIRED) + .map(Field::getName)) + .collect(Collectors.toSet()); + + final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet()); + + return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format); + } + + @VisibleForTesting + public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) { + return existingTable.getClustering() != null + && containsAllIgnoreCase( + new HashSet<>(existingTable.getClustering().getFields()), + clusteringColumns(stream)); + } + + @VisibleForTesting + public static boolean partitioningMatches(final StandardTableDefinition existingTable) { + return existingTable.getTimePartitioning() != null + && existingTable.getTimePartitioning() + .getField() + .equalsIgnoreCase("_airbyte_extracted_at") + && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType()); + } + + /** + * Checks the schema to determine whether the table contains all expected final table airbyte + * columns + * + * @param columnNames the column names of the schema to check + * @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present + */ + @VisibleForTesting + public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection columnNames) { + return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream() + .allMatch(column -> containsIgnoreCase(columnNames, column)); + } + + private static Set getPks(final StreamConfig stream) { + return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); + } + + @NotNull + private String getStateTableName() { + return QUOTE + rawTableDataset + QUOTE + "." + DESTINATION_STATE_TABLE_NAME; + } + + @NotNull + private static BigqueryState toBigqueryState(JsonNode json) { + return new BigqueryState( + json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean()); + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index f64af3e251ad..3fe1f2cbb145 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -4,25 +4,15 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; -import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase; -import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase; -import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey; import static io.airbyte.integrations.base.destination.typing_deduping.Sql.separately; import static io.airbyte.integrations.base.destination.typing_deduping.Sql.transactionally; import static io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.SOFT_RESET_SUFFIX; import static java.util.stream.Collectors.joining; -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.Field.Mode; import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TimePartitioning; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.Sql; @@ -30,21 +20,15 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; import io.airbyte.integrations.base.destination.typing_deduping.Union; import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; @@ -95,7 +79,7 @@ public ColumnId buildColumnId(final String name, final String suffix) { nameTransformer.getIdentifier(nameWithSuffix.toLowerCase())); } - public StandardSQLTypeName toDialectType(final AirbyteType type) { + public static StandardSQLTypeName toDialectType(final AirbyteType type) { // switch pattern-matching is still in preview at language level 17 :( if (type instanceof final AirbyteProtocolType p) { return toDialectType(p); @@ -197,7 +181,7 @@ THEN JSON_QUERY(`_airbyte_data`, '$."${column_name}"') // TODO maybe make this a BiMap and elevate this method and its inverse (toDestinationSQLType?) to // the SQLGenerator? - public StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) { + public static StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { case STRING -> StandardSQLTypeName.STRING; case NUMBER -> StandardSQLTypeName.NUMERIC; @@ -239,7 +223,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) """)); } - private List clusteringColumns(final StreamConfig stream) { + static List clusteringColumns(final StreamConfig stream) { final List clusterColumns = new ArrayList<>(); if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { // We're doing de-duping, therefore we have a primary key. @@ -259,108 +243,6 @@ private String columnsAndTypes(final StreamConfig stream) { .collect(joining(",\n")); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, - final TableDefinition existingTable) - throws TableNotMigratedException { - final var alterTableReport = buildAlterTableReport(stream, existingTable); - boolean tableClusteringMatches = false; - boolean tablePartitioningMatches = false; - if (existingTable instanceof final StandardTableDefinition standardExistingTable) { - tableClusteringMatches = clusteringMatches(stream, standardExistingTable); - tablePartitioningMatches = partitioningMatches(standardExistingTable); - } - LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}", - alterTableReport.columnsToAdd(), - alterTableReport.columnsToRemove(), - alterTableReport.columnsToChangeType(), - tableClusteringMatches, - tablePartitioningMatches); - - return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches; - } - - @VisibleForTesting - public boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) { - return existingTable.getClustering() != null - && containsAllIgnoreCase( - new HashSet<>(existingTable.getClustering().getFields()), - clusteringColumns(stream)); - } - - @VisibleForTesting - public boolean partitioningMatches(final StandardTableDefinition existingTable) { - return existingTable.getTimePartitioning() != null - && existingTable.getTimePartitioning() - .getField() - .equalsIgnoreCase("_airbyte_extracted_at") - && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType()); - } - - public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) { - final Set pks = getPks(stream); - - final Map streamSchema = stream.columns().entrySet().stream() - .collect(Collectors.toMap( - entry -> entry.getKey().name(), - entry -> toDialectType(entry.getValue()))); - - final Map existingSchema = existingTable.getSchema().getFields().stream() - .collect(Collectors.toMap( - field -> field.getName(), - field -> field.getType().getStandardType())); - - // Columns in the StreamConfig that don't exist in the TableDefinition - final Set columnsToAdd = streamSchema.keySet().stream() - .filter(name -> !containsIgnoreCase(existingSchema.keySet(), name)) - .collect(Collectors.toSet()); - - // Columns in the current schema that are no longer in the StreamConfig - final Set columnsToRemove = existingSchema.keySet().stream() - .filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase( - JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name)) - .collect(Collectors.toSet()); - - // Columns that are typed differently than the StreamConfig - final Set columnsToChangeType = Stream.concat( - streamSchema.keySet().stream() - // If it's not in the existing schema, it should already be in the columnsToAdd Set - .filter(name -> { - // Big Query Columns are case-insensitive, first find the correctly cased key if it exists - return matchingKey(existingSchema.keySet(), name) - // if it does exist, only include it in this set if the type (the value in each respective map) - // is different between the stream and existing schemas - .map(key -> !existingSchema.get(key).equals(streamSchema.get(name))) - // if there is no matching key, then don't include it because it is probably already in columnsToAdd - .orElse(false); - }), - - // OR columns that used to have a non-null constraint and shouldn't - // (https://github.com/airbytehq/airbyte/pull/31082) - existingTable.getSchema().getFields().stream() - .filter(field -> pks.contains(field.getName())) - .filter(field -> field.getMode() == Mode.REQUIRED) - .map(Field::getName)) - .collect(Collectors.toSet()); - - final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet()); - - return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format); - } - - /** - * Checks the schema to determine whether the table contains all expected final table airbyte - * columns - * - * @param columnNames the column names of the schema to check - * @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present - */ - @VisibleForTesting - public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection columnNames) { - return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream() - .allMatch(column -> containsIgnoreCase(columnNames, column)); - } - @Override public Sql prepareTablesForSoftReset(final StreamConfig stream) { // Bigquery can't run DDL in a transaction, so these are separate transactions. @@ -765,10 +647,6 @@ private static String cast(final String content, final String asType, final bool return wrap(open, content + " as " + asType, ")"); } - private static Set getPks(final StreamConfig stream) { - return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); - } - private static String wrap(final String open, final String content, final String close) { return open + content + close; } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index cc9f499abdfe..3d78ed982294 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -87,7 +87,7 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s } @Override - protected SqlGenerator getSqlGenerator() { + protected SqlGenerator getSqlGenerator() { return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 99ac8a8e75dd..3dd7f41114bc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -25,7 +25,6 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableResult; import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.commons.json.Jsons; @@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory; @Execution(ExecutionMode.CONCURRENT) -public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class); @@ -82,7 +81,7 @@ protected BigQuerySqlGenerator getSqlGenerator() { @Override protected BigQueryDestinationHandler getDestinationHandler() { - return new BigQueryDestinationHandler(bq, "US"); + return new BigQueryDestinationHandler(bq, "US", namespace); } @Override @@ -363,7 +362,7 @@ public void testCreateTableIncremental() throws Exception { @Test public void testCreateTableInOtherRegion() throws InterruptedException { - final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1"); + final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1", namespace); // We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it. bq.getDataset(namespace).delete(); final var sqlGenerator = new BigQuerySqlGenerator(projectId, "asia-east1"); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index 1fac62e2d681..83da7706baf9 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -42,24 +42,6 @@ public class BigQuerySqlGeneratorTest { private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("foo", "US"); - @Test - public void testToDialectType() { - final Struct s = new Struct(new LinkedHashMap<>()); - final Array a = new Array(AirbyteProtocolType.BOOLEAN); - - assertEquals(StandardSQLTypeName.INT64, generator.toDialectType((AirbyteType) AirbyteProtocolType.INTEGER)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(s)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(a)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(new UnsupportedOneOf(new ArrayList<>()))); - - Union u = new Union(ImmutableList.of(s)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); - u = new Union(ImmutableList.of(a)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); - u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); - assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(u)); - } - @Test public void testBuildColumnId() { // Uninteresting names are unchanged @@ -67,86 +49,6 @@ public void testBuildColumnId() { new ColumnId("foo", "foo", "foo"), generator.buildColumnId("foo")); } - - @Test - public void testClusteringMatches() { - StreamConfig stream = new StreamConfig(null, - null, - DestinationSyncMode.APPEND_DEDUP, - List.of(new ColumnId("foo", "bar", "fizz")), - null, - null); - - // Clustering is null - final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); - Mockito.when(existingTable.getClustering()).thenReturn(null); - Assertions.assertFalse(generator.clusteringMatches(stream, existingTable)); - - // Clustering does not contain all fields - Mockito.when(existingTable.getClustering()) - .thenReturn(Clustering.newBuilder().setFields(List.of("_airbyte_extracted_at")).build()); - Assertions.assertFalse(generator.clusteringMatches(stream, existingTable)); - - // Clustering matches - stream = new StreamConfig(null, - null, - DestinationSyncMode.OVERWRITE, - null, - null, - null); - Assertions.assertTrue(generator.clusteringMatches(stream, existingTable)); - - // Clustering only the first 3 PK columns (See https://github.com/airbytehq/oncall/issues/2565) - final var expectedStreamColumnNames = List.of("a", "b", "c"); - Mockito.when(existingTable.getClustering()) - .thenReturn(Clustering.newBuilder().setFields( - Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at")) - .collect(Collectors.toList())) - .build()); - stream = new StreamConfig(null, - null, - DestinationSyncMode.APPEND_DEDUP, - Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e")) - .map(name -> new ColumnId(name, "foo", "bar")) - .collect(Collectors.toList()), - null, - null); - Assertions.assertTrue(generator.clusteringMatches(stream, existingTable)); - } - - @Test - public void testPartitioningMatches() { - final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); - // Partitioning is null - Mockito.when(existingTable.getTimePartitioning()).thenReturn(null); - Assertions.assertFalse(generator.partitioningMatches(existingTable)); - // incorrect field - Mockito.when(existingTable.getTimePartitioning()) - .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_foo").build()); - Assertions.assertFalse(generator.partitioningMatches(existingTable)); - // incorrect partitioning scheme - Mockito.when(existingTable.getTimePartitioning()) - .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.YEAR).setField("_airbyte_extracted_at").build()); - Assertions.assertFalse(generator.partitioningMatches(existingTable)); - - // partitioning matches - Mockito.when(existingTable.getTimePartitioning()) - .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_airbyte_extracted_at").build()); - Assertions.assertTrue(generator.partitioningMatches(existingTable)); - } - - @Test - public void testSchemaContainAllFinalTableV2AirbyteColumns() { - Assertions.assertTrue( - BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at", "_airbyte_raw_id"))); - Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_extracted_at", "_airbyte_raw_id"))); - Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_raw_id"))); - Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at"))); - Assertions.assertFalse(BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of())); - Assertions.assertTrue( - BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID"))); - } - @Test void columnCollision() { final CatalogParser parser = new CatalogParser(generator); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java new file mode 100644 index 000000000000..74d2cf9715c3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java @@ -0,0 +1,127 @@ +package io.airbyte.integrations.destination.bigquery.typing_deduping; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.cloud.bigquery.Clustering; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.common.collect.ImmutableList; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class BigqueryDestinationHandlerTest { + + @Test + public void testToDialectType() { + final Struct s = new Struct(new LinkedHashMap<>()); + final Array a = new Array(AirbyteProtocolType.BOOLEAN); + + assertEquals(StandardSQLTypeName.INT64, BigQuerySqlGenerator.toDialectType((AirbyteType) AirbyteProtocolType.INTEGER)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(s)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(a)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(new UnsupportedOneOf(new ArrayList<>()))); + + Union u = new Union(ImmutableList.of(s)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(u)); + u = new Union(ImmutableList.of(a)); + assertEquals(StandardSQLTypeName.JSON, BigQuerySqlGenerator.toDialectType(u)); + u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); + assertEquals(StandardSQLTypeName.NUMERIC, BigQuerySqlGenerator.toDialectType(u)); + } + + @Test + public void testClusteringMatches() { + StreamConfig stream = new StreamConfig(null, + null, + DestinationSyncMode.APPEND_DEDUP, + List.of(new ColumnId("foo", "bar", "fizz")), + null, + null); + + // Clustering is null + final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); + Mockito.when(existingTable.getClustering()).thenReturn(null); + Assertions.assertFalse(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering does not contain all fields + Mockito.when(existingTable.getClustering()) + .thenReturn(Clustering.newBuilder().setFields(List.of("_airbyte_extracted_at")).build()); + Assertions.assertFalse(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering matches + stream = new StreamConfig(null, + null, + DestinationSyncMode.OVERWRITE, + null, + null, + null); + Assertions.assertTrue(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + + // Clustering only the first 3 PK columns (See https://github.com/airbytehq/oncall/issues/2565) + final var expectedStreamColumnNames = List.of("a", "b", "c"); + Mockito.when(existingTable.getClustering()) + .thenReturn(Clustering.newBuilder().setFields( + Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at")) + .collect(Collectors.toList())) + .build()); + stream = new StreamConfig(null, + null, + DestinationSyncMode.APPEND_DEDUP, + Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e")) + .map(name -> new ColumnId(name, "foo", "bar")) + .collect(Collectors.toList()), + null, + null); + Assertions.assertTrue(BigQueryDestinationHandler.clusteringMatches(stream, existingTable)); + } + + @Test + public void testPartitioningMatches() { + final StandardTableDefinition existingTable = Mockito.mock(StandardTableDefinition.class); + // Partitioning is null + Mockito.when(existingTable.getTimePartitioning()).thenReturn(null); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + // incorrect field + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_foo").build()); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + // incorrect partitioning scheme + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.YEAR).setField("_airbyte_extracted_at").build()); + Assertions.assertFalse(BigQueryDestinationHandler.partitioningMatches(existingTable)); + + // partitioning matches + Mockito.when(existingTable.getTimePartitioning()) + .thenReturn(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_airbyte_extracted_at").build()); + Assertions.assertTrue(BigQueryDestinationHandler.partitioningMatches(existingTable)); + } + + @Test + public void testSchemaContainAllFinalTableV2AirbyteColumns() { + Assertions.assertTrue( + BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_extracted_at", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_raw_id"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_airbyte_meta", "_airbyte_extracted_at"))); + Assertions.assertFalse(BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of())); + Assertions.assertTrue( + BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID"))); + } +} From c456d09d941a41efe8598147947d39a9882d3c22 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 23 Feb 2024 15:31:23 -0800 Subject: [PATCH 04/10] format --- .../bigquery/BigQueryDestination.java | 3 +- .../BigQueryDestinationHandler.java | 82 +++++++++---------- .../BigQuerySqlGeneratorTest.java | 18 +--- .../BigqueryDestinationHandlerTest.java | 5 +- 4 files changed, 45 insertions(+), 63 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index d45f807bf571..f0e2cd5c4e3f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -238,7 +238,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final Optional rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET); final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride); final BigQuery bigquery = getBigQuery(config); - final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe, rawNamespaceOverride); + final TyperDeduper typerDeduper = + buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe, rawNamespaceOverride); AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config); final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index b5968a43c52d..5f2af7f11e32 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -216,30 +216,26 @@ public List> gatherInitialState(List destinationStates = StreamSupport.stream( bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + getStateTableName() - ).build()).iterateAll().spliterator(), - false - ).collect(toMap( - fvList -> { - final FieldValue nameFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAME); - final FieldValue namespaceFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE); - return new AirbyteStreamNameNamespacePair( - nameFieldValue.isNull() ? null : nameFieldValue.getStringValue(), - namespaceFieldValue.isNull() ? null : namespaceFieldValue.getStringValue() - ); - }, - fvList -> { - JsonNode json = Jsons.deserialize(fvList.get(DESTINATION_STATE_TABLE_COLUMN_STATE).getStringValue()); - return toBigqueryState(json); - } - )); + "SELECT * FROM " + getStateTableName()).build()).iterateAll().spliterator(), + false).collect( + toMap( + fvList -> { + final FieldValue nameFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAME); + final FieldValue namespaceFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE); + return new AirbyteStreamNameNamespacePair( + nameFieldValue.isNull() ? null : nameFieldValue.getStringValue(), + namespaceFieldValue.isNull() ? null : namespaceFieldValue.getStringValue()); + }, + fvList -> { + JsonNode json = Jsons.deserialize(fvList.get(DESTINATION_STATE_TABLE_COLUMN_STATE).getStringValue()); + return toBigqueryState(json); + })); final List> initialStates = new ArrayList<>(); for (final StreamConfig streamConfig : streamConfigs) { @@ -295,7 +291,7 @@ public void commitDestinationStates(Map destinationStat } private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, - final TableDefinition existingTable) + final TableDefinition existingTable) throws TableNotMigratedException { final var alterTableReport = buildAlterTableReport(stream, existingTable); boolean tableClusteringMatches = false; @@ -340,24 +336,24 @@ public AlterTableReport buildAlterTableReport(final StreamConfig stream, final T // Columns that are typed differently than the StreamConfig final Set columnsToChangeType = Stream.concat( - streamSchema.keySet().stream() - // If it's not in the existing schema, it should already be in the columnsToAdd Set - .filter(name -> { - // Big Query Columns are case-insensitive, first find the correctly cased key if it exists - return matchingKey(existingSchema.keySet(), name) - // if it does exist, only include it in this set if the type (the value in each respective map) - // is different between the stream and existing schemas - .map(key -> !existingSchema.get(key).equals(streamSchema.get(name))) - // if there is no matching key, then don't include it because it is probably already in columnsToAdd - .orElse(false); - }), - - // OR columns that used to have a non-null constraint and shouldn't - // (https://github.com/airbytehq/airbyte/pull/31082) - existingTable.getSchema().getFields().stream() - .filter(field -> pks.contains(field.getName())) - .filter(field -> field.getMode() == Field.Mode.REQUIRED) - .map(Field::getName)) + streamSchema.keySet().stream() + // If it's not in the existing schema, it should already be in the columnsToAdd Set + .filter(name -> { + // Big Query Columns are case-insensitive, first find the correctly cased key if it exists + return matchingKey(existingSchema.keySet(), name) + // if it does exist, only include it in this set if the type (the value in each respective map) + // is different between the stream and existing schemas + .map(key -> !existingSchema.get(key).equals(streamSchema.get(name))) + // if there is no matching key, then don't include it because it is probably already in columnsToAdd + .orElse(false); + }), + + // OR columns that used to have a non-null constraint and shouldn't + // (https://github.com/airbytehq/airbyte/pull/31082) + existingTable.getSchema().getFields().stream() + .filter(field -> pks.contains(field.getName())) + .filter(field -> field.getMode() == Field.Mode.REQUIRED) + .map(Field::getName)) .collect(Collectors.toSet()); final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet()); @@ -369,16 +365,16 @@ public AlterTableReport buildAlterTableReport(final StreamConfig stream, final T public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) { return existingTable.getClustering() != null && containsAllIgnoreCase( - new HashSet<>(existingTable.getClustering().getFields()), - clusteringColumns(stream)); + new HashSet<>(existingTable.getClustering().getFields()), + clusteringColumns(stream)); } @VisibleForTesting public static boolean partitioningMatches(final StandardTableDefinition existingTable) { return existingTable.getTimePartitioning() != null && existingTable.getTimePartitioning() - .getField() - .equalsIgnoreCase("_airbyte_extracted_at") + .getField() + .equalsIgnoreCase("_airbyte_extracted_at") && TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType()); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index 83da7706baf9..66c2147b1c3f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -7,36 +7,19 @@ import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.google.cloud.bigquery.Clustering; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.TimePartitioning; -import com.google.common.collect.ImmutableList; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; -import io.airbyte.integrations.base.destination.typing_deduping.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.Union; -import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; public class BigQuerySqlGeneratorTest { @@ -49,6 +32,7 @@ public void testBuildColumnId() { new ColumnId("foo", "foo", "foo"), generator.buildColumnId("foo")); } + @Test void columnCollision() { final CatalogParser parser = new CatalogParser(generator); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java index 74d2cf9715c3..d2d8fdec0ec1 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java @@ -78,8 +78,8 @@ public void testClusteringMatches() { final var expectedStreamColumnNames = List.of("a", "b", "c"); Mockito.when(existingTable.getClustering()) .thenReturn(Clustering.newBuilder().setFields( - Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at")) - .collect(Collectors.toList())) + Stream.concat(expectedStreamColumnNames.stream(), Stream.of("_airbyte_extracted_at")) + .collect(Collectors.toList())) .build()); stream = new StreamConfig(null, null, @@ -124,4 +124,5 @@ public void testSchemaContainAllFinalTableV2AirbyteColumns() { Assertions.assertTrue( BigQueryDestinationHandler.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID"))); } + } From 8cc7e2c45dd7c516f57b291e168072aa27b6b513 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 23 Feb 2024 16:45:37 -0800 Subject: [PATCH 05/10] switch to sql --- .../BigQueryDestinationHandler.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 5f2af7f11e32..e31ee28d422f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -207,18 +207,13 @@ public void execute(final Sql sql) throws InterruptedException { @Override public List> gatherInitialState(List streamConfigs) throws Exception { - final TableId stateTableId = TableId.of(rawTableDataset, DESTINATION_STATE_TABLE_NAME); - final Table stateTable = bq.getTable(stateTableId); - if (stateTable == null || !stateTable.exists()) { - bq.create(TableInfo.newBuilder( - stateTableId, - StandardTableDefinition.of(Schema.of( - Field.of(DESTINATION_STATE_TABLE_COLUMN_NAME, StandardSQLTypeName.STRING), - Field.of(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE, StandardSQLTypeName.STRING), - Field.of(DESTINATION_STATE_TABLE_COLUMN_STATE, StandardSQLTypeName.JSON), - Field.of(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT, StandardSQLTypeName.TIMESTAMP)))) - .build()); - } + // Would be nice to use bq.create(), but it doesn't support `create table if not exists`. + bq.query(QueryJobConfiguration.newBuilder( + "CREATE TABLE IF NOT EXISTS " + getStateTableName() + " (" + + DESTINATION_STATE_TABLE_COLUMN_NAME + " STRING, " + + DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + " STRING, " + + DESTINATION_STATE_TABLE_COLUMN_STATE + " JSON, " + + DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + " TIMESTAMP)").build()); Map destinationStates = StreamSupport.stream( bq.query(QueryJobConfiguration.newBuilder( From 564f57b4077723f0cae570c7cf8d9e9d3fd5e690 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 26 Feb 2024 08:45:48 -0800 Subject: [PATCH 06/10] remove state handling --- .../bigquery/BigQueryDestination.java | 10 +- .../BigQueryDestinationHandler.java | 100 ++---------------- .../bigquery/typing_deduping/BigqueryState.kt | 13 --- .../BigQuerySqlGeneratorIntegrationTest.java | 7 +- 4 files changed, 17 insertions(+), 113 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index f0e2cd5c4e3f..f184e69043f0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -22,7 +22,6 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; @@ -239,7 +238,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride); final BigQuery bigquery = getBigQuery(config); final TyperDeduper typerDeduper = - buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe, rawNamespaceOverride); + buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe); AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config); final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS); @@ -446,14 +445,13 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, final ParsedCatalog parsedCatalog, final BigQuery bigquery, final String datasetLocation, - final boolean disableTypeDedupe, - final Optional rawNamespaceOverride) { + final boolean disableTypeDedupe) { final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver); final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler( bigquery, - datasetLocation, - rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)); + datasetLocation + ); if (disableTypeDedupe) { return new NoOpTyperDeduperWithV1V2Migrations<>( diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index e31ee28d422f..697d90b47d5c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -12,7 +12,6 @@ import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType; import static java.util.stream.Collectors.toMap; -import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Field; @@ -24,20 +23,16 @@ import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatus; import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.QueryParameterValue; -import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; @@ -47,7 +42,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; @@ -61,31 +56,21 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import org.apache.commons.text.StringSubstitutor; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // TODO this stuff almost definitely exists somewhere else in our codebase. -public class BigQueryDestinationHandler implements DestinationHandler { +public class BigQueryDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class); - private static final String DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state"; - private static final String DESTINATION_STATE_TABLE_COLUMN_NAME = "name"; - private static final String DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace"; - private static final String DESTINATION_STATE_TABLE_COLUMN_STATE = "destination_state"; - private static final String DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT = "updated_at"; - private final BigQuery bq; private final String datasetLocation; - private final String rawTableDataset; - public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocation, String rawTableDataset) { + public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocation) { this.bq = bq; this.datasetLocation = datasetLocation; - this.rawTableDataset = rawTableDataset; } public Optional findExistingTable(final StreamId id) { @@ -206,83 +191,27 @@ public void execute(final Sql sql) throws InterruptedException { } @Override - public List> gatherInitialState(List streamConfigs) throws Exception { - // Would be nice to use bq.create(), but it doesn't support `create table if not exists`. - bq.query(QueryJobConfiguration.newBuilder( - "CREATE TABLE IF NOT EXISTS " + getStateTableName() + " (" - + DESTINATION_STATE_TABLE_COLUMN_NAME + " STRING, " - + DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + " STRING, " - + DESTINATION_STATE_TABLE_COLUMN_STATE + " JSON, " - + DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + " TIMESTAMP)").build()); - - Map destinationStates = StreamSupport.stream( - bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + getStateTableName()).build()).iterateAll().spliterator(), - false).collect( - toMap( - fvList -> { - final FieldValue nameFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAME); - final FieldValue namespaceFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE); - return new AirbyteStreamNameNamespacePair( - nameFieldValue.isNull() ? null : nameFieldValue.getStringValue(), - namespaceFieldValue.isNull() ? null : namespaceFieldValue.getStringValue()); - }, - fvList -> { - JsonNode json = Jsons.deserialize(fvList.get(DESTINATION_STATE_TABLE_COLUMN_STATE).getStringValue()); - return toBigqueryState(json); - })); - - final List> initialStates = new ArrayList<>(); + public List> gatherInitialState(List streamConfigs) throws Exception { + final List> initialStates = new ArrayList<>(); for (final StreamConfig streamConfig : streamConfigs) { final StreamId id = streamConfig.id(); final Optional finalTable = findExistingTable(id); final InitialRawTableState rawTableState = getInitialRawTableState(id); - final BigqueryState bigqueryState = destinationStates.getOrDefault(streamConfig.id().asPair(), toBigqueryState(Jsons.emptyObject())); initialStates.add(new DestinationInitialState<>( streamConfig, finalTable.isPresent(), rawTableState, finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()), !finalTable.isPresent() || isFinalTableEmpty(id), - bigqueryState)); + // Return a default state blob since we don't actually track state. + new MinimumDestinationState.Impl(false))); } return initialStates; } @Override - public void commitDestinationStates(Map destinationStates) throws Exception { - if (destinationStates.isEmpty()) { - return; - } - - final String deleteStates = "DELETE FROM " + getStateTableName() + " WHERE " - + destinationStates.keySet().stream() - .map(streamId -> String.format("(%s = ? AND %s = ?)", - DESTINATION_STATE_TABLE_COLUMN_NAME, - DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) - .collect(Collectors.joining(" OR ")); - final QueryJobConfiguration.Builder deleteQueryConfig = QueryJobConfiguration.newBuilder(deleteStates); - destinationStates.forEach((key, value) -> { - deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName())); - deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace())); - }); - bq.query(deleteQueryConfig.build()); - - final String insertStates = "INSERT INTO " + getStateTableName() + " (" - + DESTINATION_STATE_TABLE_COLUMN_NAME + ", " - + DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + ", " - + DESTINATION_STATE_TABLE_COLUMN_STATE + ", " - + DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + ") VALUES " - + destinationStates.keySet().stream() - .map(streamId -> "(?, ?, ?, CURRENT_TIMESTAMP)") - .collect(Collectors.joining(", ")); - final QueryJobConfiguration.Builder insertQueryConfig = QueryJobConfiguration.newBuilder(insertStates); - destinationStates.forEach((key, value) -> { - insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName())); - insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace())); - insertQueryConfig.addPositionalParameter(QueryParameterValue.json(Jsons.serialize(value))); - }); - bq.query(insertQueryConfig.build()); + public void commitDestinationStates(Map destinationStates) throws Exception { + // Intentionally do nothing. Bigquery doesn't actually support destination states. } private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, @@ -390,15 +319,4 @@ private static Set getPks(final StreamConfig stream) { return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); } - @NotNull - private String getStateTableName() { - return QUOTE + rawTableDataset + QUOTE + "." + DESTINATION_STATE_TABLE_NAME; - } - - @NotNull - private static BigqueryState toBigqueryState(JsonNode json) { - return new BigqueryState( - json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean()); - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt deleted file mode 100644 index 3d9ce03f6c5c..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryState.kt +++ /dev/null @@ -1,13 +0,0 @@ -package io.airbyte.integrations.destination.bigquery.typing_deduping - -import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState - -data class BigqueryState(val needsSoftReset: Boolean): MinimumDestinationState { - override fun needsSoftReset(): Boolean { - return needsSoftReset - } - - override fun withSoftReset(needsSoftReset: Boolean): T { - return copy(needsSoftReset = needsSoftReset) as T - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 3dd7f41114bc..8e1b3cce918e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -33,6 +33,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import io.airbyte.integrations.destination.bigquery.BigQueryConsts; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.protocol.models.v0.DestinationSyncMode; @@ -56,7 +57,7 @@ import org.slf4j.LoggerFactory; @Execution(ExecutionMode.CONCURRENT) -public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class); @@ -81,7 +82,7 @@ protected BigQuerySqlGenerator getSqlGenerator() { @Override protected BigQueryDestinationHandler getDestinationHandler() { - return new BigQueryDestinationHandler(bq, "US", namespace); + return new BigQueryDestinationHandler(bq, "US"); } @Override @@ -362,7 +363,7 @@ public void testCreateTableIncremental() throws Exception { @Test public void testCreateTableInOtherRegion() throws InterruptedException { - final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1", namespace); + final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1"); // We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it. bq.getDataset(namespace).delete(); final var sqlGenerator = new BigQuerySqlGenerator(projectId, "asia-east1"); From c13c74040a98fd5f93766f7da8ee51d944117f7c Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 26 Feb 2024 11:36:13 -0800 Subject: [PATCH 07/10] disable test --- .../BigQuerySqlGeneratorIntegrationTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 8e1b3cce918e..23bfeedc1b1a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -429,6 +429,16 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Excepti super.noCrashOnSpecialCharacters(specialChars); } + /** + * Bigquery doesn't handle frequent INSERT/DELETE statements on a single table very well. + * So we don't have real state handling. Disable this test. + */ + @Override + @Disabled + @Test + public void testStateHandling() throws Exception { + super.testStateHandling(); + } /** * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all * into memory). That's annoying for us since we're working with small test data, so just pull From b73ecdd0019f2ee94538886f5e59840c948d4e7a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 26 Feb 2024 11:37:53 -0800 Subject: [PATCH 08/10] format --- .../destination/bigquery/BigQueryDestination.java | 3 +-- .../typing_deduping/BigQuerySqlGeneratorIntegrationTest.java | 5 +++-- .../typing_deduping/BigqueryDestinationHandlerTest.java | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index f184e69043f0..7f33ebafcca3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -450,8 +450,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler( bigquery, - datasetLocation - ); + datasetLocation); if (disableTypeDedupe) { return new NoOpTyperDeduperWithV1V2Migrations<>( diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 23bfeedc1b1a..a303a176d38c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -430,8 +430,8 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Excepti } /** - * Bigquery doesn't handle frequent INSERT/DELETE statements on a single table very well. - * So we don't have real state handling. Disable this test. + * Bigquery doesn't handle frequent INSERT/DELETE statements on a single table very well. So we + * don't have real state handling. Disable this test. */ @Override @Disabled @@ -439,6 +439,7 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Excepti public void testStateHandling() throws Exception { super.testStateHandling(); } + /** * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all * into memory). That's annoying for us since we're working with small test data, so just pull diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java index d2d8fdec0ec1..7a2d6184945d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.bigquery.typing_deduping; import static org.junit.jupiter.api.Assertions.assertEquals; From c001be8246ef401ce95ca9f397b7b606390e5098 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Thu, 29 Feb 2024 17:25:43 -0800 Subject: [PATCH 09/10] fix compilation post rebase --- .../bigquery/BigQueryDestination.java | 2 +- .../BigQueryStagingConsumerFactory.java | 2 +- .../BigQueryDestinationHandler.java | 25 ++++++++++--------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 7f33ebafcca3..f2b11b35247b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -374,7 +374,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer return new BigQueryRecordStandardConsumer( outputRecordCollector, () -> { - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); // Set up our raw tables writeConfigs.get().forEach((streamId, uploader) -> { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 036045380e8f..5f40d71c4815 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -135,7 +135,7 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery final TyperDeduper typerDeduper) { return () -> { LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size()); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 697d90b47d5c..111894b62967 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -36,13 +36,14 @@ import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; -import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; @@ -82,11 +83,11 @@ public boolean isFinalTableEmpty(final StreamId id) { return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows()); } - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { + public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception { final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName())); if (rawTable == null) { // Table doesn't exist. There are no unprocessed records, and no timestamp. - return new InitialRawTableState(false, false, Optional.empty()); + return new InitialRawTableStatus(false, false, Optional.empty()); } final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( @@ -102,7 +103,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) // If it's not null, then we can return immediately - we've found some unprocessed records and their // timestamp. if (!unloadedRecordTimestamp.isNull()) { - return new InitialRawTableState(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); } final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( @@ -116,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at) // So we just need to get the timestamp of the most recent record. if (loadedRecordTimestamp.isNull()) { // Null timestamp because the table is empty. T+D can process the entire raw table during this sync. - return new InitialRawTableState(true, false, Optional.empty()); + return new InitialRawTableStatus(true, false, Optional.empty()); } else { // The raw table already has some records. T+D can skip all records with timestamp <= this value. - return new InitialRawTableState(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); } } @@ -191,18 +192,18 @@ public void execute(final Sql sql) throws InterruptedException { } @Override - public List> gatherInitialState(List streamConfigs) throws Exception { - final List> initialStates = new ArrayList<>(); + public List> gatherInitialState(List streamConfigs) throws Exception { + final List> initialStates = new ArrayList<>(); for (final StreamConfig streamConfig : streamConfigs) { final StreamId id = streamConfig.id(); final Optional finalTable = findExistingTable(id); - final InitialRawTableState rawTableState = getInitialRawTableState(id); - initialStates.add(new DestinationInitialState<>( + final InitialRawTableStatus rawTableState = getInitialRawTableState(id); + initialStates.add(new DestinationInitialStatus<>( streamConfig, finalTable.isPresent(), rawTableState, finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()), - !finalTable.isPresent() || isFinalTableEmpty(id), + finalTable.isEmpty() || isFinalTableEmpty(id), // Return a default state blob since we don't actually track state. new MinimumDestinationState.Impl(false))); } From 64fb4cb796deee80380d5a919048196d2d6c5fd5 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Mon, 4 Mar 2024 11:09:56 -0800 Subject: [PATCH 10/10] version logistics Signed-off-by: Gireesh Sreepathi --- .../connectors/destination-bigquery/build.gradle | 4 ++-- .../connectors/destination-bigquery/metadata.yaml | 2 +- docs/integrations/destinations/bigquery.md | 15 ++++++++------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 113d101821bb..14da5852eef0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -4,14 +4,14 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.9' + cdkVersionRequired = '0.23.11' features = [ 'db-destinations', 'datastore-bigquery', 'typing-deduping', 'gcs-destinations', ] - useLocalCdk = true + useLocalCdk = false } java { diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index e3e73d6a5c98..ee0aecd9a385 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.4.11 + dockerImageTag: 2.4.12 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 7f475376d592..e02e203bce07 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -210,13 +210,14 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.4.11 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | -| 2.4.10 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | -| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | -| 2.4.8 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | -| 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 | -| 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 | -| 2.4.5 | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745) | Adopt CDK 0.19.0 | +| 2.4.12 | 2024-03-04 | [35315](https://github.com/airbytehq/airbyte/pull/35315) | Adopt CDK 0.23.11 | +| 2.4.11 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | +| 2.4.10 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | +| 2.4.9 | 2024-02-15 | [35285](https://github.com/airbytehq/airbyte/pull/35285) | Adopt CDK 0.20.8 | +| 2.4.8 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 | +| 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 | +| 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 | +| 2.4.5 | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745) | Adopt CDK 0.19.0 | | 2.4.4 | 2024-02-08 | [35027](https://github.com/airbytehq/airbyte/pull/35027) | Upgrade CDK to 0.17.1 | | 2.4.3 | 2024-02-01 | [34728](https://github.com/airbytehq/airbyte/pull/34728) | Upgrade CDK to 0.16.4; Notable changes from 0.14.2, 0.15.1 and 0.16.3 | | 2.4.2 | 2024-01-24 | [34451](https://github.com/airbytehq/airbyte/pull/34451) | Improve logging for unparseable input |