Destination bigquery: upgrade cdk #35315

Merged 10 commits on Mar 4, 2024
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.9'
cdkVersionRequired = '0.23.11'
features = [
'db-destinations',
'datastore-bigquery',
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.11
dockerImageTag: 2.4.12
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -61,6 +61,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -233,9 +234,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config);
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation);
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
final TyperDeduper typerDeduper =
buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);

AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
@@ -360,7 +363,6 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
final Consumer<AirbyteMessage> outputRecordCollector,
final TyperDeduper typerDeduper)
throws Exception {
typerDeduper.prepareTables();
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
bigquery,
config,
@@ -372,6 +374,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
return new BigQueryRecordStandardConsumer(
outputRecordCollector,
() -> {
typerDeduper.prepareSchemasAndRunMigrations();

// Set up our raw tables
writeConfigs.get().forEach((streamId, uploader) -> {
final StreamConfig stream = parsedCatalog.getStream(streamId);
@@ -390,6 +394,8 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
uploader.createRawTable();
}
});

typerDeduper.prepareFinalTables();
},
(hasFailed, streamSyncSummaries) -> {
try {
@@ -424,11 +430,13 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
}
}

private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) {
private ParsedCatalog parseCatalog(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String datasetLocation,
final Optional<String> rawNamespaceOverride) {
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent()
? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get())
: new CatalogParser(sqlGenerator);
final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s))
.orElseGet(() -> new CatalogParser(sqlGenerator));

return catalogParser.parseCatalog(catalog);
}
@@ -440,11 +448,13 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
final boolean disableTypeDedupe) {
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bigquery, datasetLocation);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(
bigquery,
datasetLocation);

if (disableTypeDedupe) {
return new NoOpTyperDeduperWithV1V2Migrations<>(
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, 8);
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of());
}

return new DefaultTyperDeduper<>(
@@ -453,8 +463,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
parsedCatalog,
migrator,
v2RawTableMigrator,
8);

List.of());
}

@Override
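The changes to BigQueryDestination above (the standard-inserts path) do two things. First, the single typerDeduper.prepareTables() call is split: prepareSchemasAndRunMigrations() now runs inside the consumer's start callback before any raw tables are created, and prepareFinalTables() runs only after the per-stream raw-table setup. Second, the trailing argument to DefaultTyperDeduper and NoOpTyperDeduperWithV1V2Migrations changes from 8 to List.of(); in the upgraded CDK this parameter appears to be a list of destination-specific migrations rather than a parallelism hint, and BigQuery supplies none. A simplified sketch of the resulting start-up ordering follows (illustration only; types come from the connector/CDK, and the overwrite/truncate handling shown in the diff is elided):

// Sketch of the new start-up ordering -- an illustration of the call order,
// not the literal code in the diff.
private static void onStart(
    final TyperDeduper typerDeduper,
    final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaders)
    throws Exception {
  // 1. Create datasets/schemas and run CDK + destination migrations before
  //    anything touches the raw tables.
  typerDeduper.prepareSchemasAndRunMigrations();

  // 2. Create (or, for overwrite syncs, reset) each stream's raw table.
  uploaders.forEach((streamId, uploader) -> uploader.createRawTable());

  // 3. Only then create/alter the typed final tables.
  typerDeduper.prepareFinalTables();
}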
@@ -135,7 +135,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
final TyperDeduper typerDeduper) {
return () -> {
LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
typerDeduper.prepareTables();
typerDeduper.prepareSchemasAndRunMigrations();

for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
@@ -156,6 +157,8 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
}
}

typerDeduper.prepareFinalTables();
LOGGER.info("Preparing tables in destination completed.");
};
}
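The GCS-staging consumer factory gets the same split, applied around its existing per-stream loop: migrations run before any staging areas or raw tables are prepared, and the typed final tables are created only once that loop has finished. A condensed sketch of the new OnStartFunction body (logging and the overwrite/truncate branch from the diff are elided):

final OnStartFunction onStart = () -> {
  typerDeduper.prepareSchemasAndRunMigrations();   // schemas + migrations first

  for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
    // per-stream staging dataset/area and raw-table setup, unchanged from before
  }

  typerDeduper.prepareFinalTables();               // typed final tables last
};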
@@ -4,8 +4,17 @@

package io.airbyte.integrations.destination.bigquery.typing_deduping;

import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase;
import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase;
import static io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.matchingKey;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.QUOTE;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.clusteringColumns;
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType;
import static java.util.stream.Collectors.toMap;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
@@ -14,28 +23,46 @@
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO this stuff almost definitely exists somewhere else in our codebase.
public class BigQueryDestinationHandler implements DestinationHandler<TableDefinition> {
public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);

@@ -47,32 +74,24 @@ public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocatio
this.datasetLocation = datasetLocation;
}

@Override
public Optional<TableDefinition> findExistingTable(final StreamId id) {
final Table table = bq.getTable(id.finalNamespace(), id.finalName());
return Optional.ofNullable(table).map(Table::getDefinition);
}

@Override
public LinkedHashMap<String, TableDefinition> findExistingFinalTables(List<StreamId> streamIds) throws Exception {
return null;
}

@Override
public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
}

@Override
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE))).replace(
// bigquery timestamps have microsecond precision
"""
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -84,11 +103,11 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
// If it's not null, then we can return immediately - we've found some unprocessed records and their
// timestamp.
if (!unloadedRecordTimestamp.isNull()) {
return new InitialRawTableState(true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(BigQuerySqlGenerator.QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE))).replace(
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
@@ -98,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
// So we just need to get the timestamp of the most recent record.
if (loadedRecordTimestamp.isNull()) {
// Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(true, false, Optional.empty());
} else {
// The raw table already has some records. T+D can skip all records with timestamp <= this value.
return new InitialRawTableState(false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
}
}
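Alongside the rename from InitialRawTableState to InitialRawTableStatus, the record gains a leading field, so the method above can now distinguish "raw table missing" from "raw table present but empty". The field names below are an assumption inferred from how the constructor is called in this file; the real record lives in the CDK's typing_deduping package:

import java.time.Instant;
import java.util.Optional;

// Assumed shape of the CDK record, inferred from the constructor calls above.
public record InitialRawTableStatus(
    boolean rawTableExists,                      // false only when bq.getTable(...) returns null
    boolean hasUnprocessedRecords,               // true when an unloaded _airbyte_extracted_at was found
    Optional<Instant> maxProcessedTimestamp) {}  // upper bound of already-processed records, if any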

@@ -172,4 +191,133 @@ public void execute(final Sql sql) throws InterruptedException {
}
}

@Override
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.id();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
new MinimumDestinationState.Impl(false)));
}
return initialStates;
}

@Override
public void commitDestinationStates(Map<StreamId, MinimumDestinationState.Impl> destinationStates) throws Exception {
// Intentionally do nothing. Bigquery doesn't actually support destination states.
}

private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
final TableDefinition existingTable)
throws TableNotMigratedException {
final var alterTableReport = buildAlterTableReport(stream, existingTable);
boolean tableClusteringMatches = false;
boolean tablePartitioningMatches = false;
if (existingTable instanceof final StandardTableDefinition standardExistingTable) {
tableClusteringMatches = clusteringMatches(stream, standardExistingTable);
tablePartitioningMatches = partitioningMatches(standardExistingTable);
}
LOGGER.info("Alter Table Report {} {} {}; Clustering {}; Partitioning {}",
alterTableReport.columnsToAdd(),
alterTableReport.columnsToRemove(),
alterTableReport.columnsToChangeType(),
tableClusteringMatches,
tablePartitioningMatches);

return alterTableReport.isNoOp() && tableClusteringMatches && tablePartitioningMatches;
}

public AlterTableReport buildAlterTableReport(final StreamConfig stream, final TableDefinition existingTable) {
final Set<String> pks = getPks(stream);

final Map<String, StandardSQLTypeName> streamSchema = stream.columns().entrySet().stream()
.collect(toMap(
entry -> entry.getKey().name(),
entry -> toDialectType(entry.getValue())));

final Map<String, StandardSQLTypeName> existingSchema = existingTable.getSchema().getFields().stream()
.collect(toMap(
field -> field.getName(),
field -> field.getType().getStandardType()));

// Columns in the StreamConfig that don't exist in the TableDefinition
final Set<String> columnsToAdd = streamSchema.keySet().stream()
.filter(name -> !containsIgnoreCase(existingSchema.keySet(), name))
.collect(Collectors.toSet());

// Columns in the current schema that are no longer in the StreamConfig
final Set<String> columnsToRemove = existingSchema.keySet().stream()
.filter(name -> !containsIgnoreCase(streamSchema.keySet(), name) && !containsIgnoreCase(
JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS, name))
.collect(Collectors.toSet());

// Columns that are typed differently than the StreamConfig
final Set<String> columnsToChangeType = Stream.concat(
streamSchema.keySet().stream()
// If it's not in the existing schema, it should already be in the columnsToAdd Set
.filter(name -> {
// Big Query Columns are case-insensitive, first find the correctly cased key if it exists
return matchingKey(existingSchema.keySet(), name)
// if it does exist, only include it in this set if the type (the value in each respective map)
// is different between the stream and existing schemas
.map(key -> !existingSchema.get(key).equals(streamSchema.get(name)))
// if there is no matching key, then don't include it because it is probably already in columnsToAdd
.orElse(false);
}),

// OR columns that used to have a non-null constraint and shouldn't
// (https://github.com/airbytehq/airbyte/pull/31082)
existingTable.getSchema().getFields().stream()
.filter(field -> pks.contains(field.getName()))
.filter(field -> field.getMode() == Field.Mode.REQUIRED)
.map(Field::getName))
.collect(Collectors.toSet());

final boolean isDestinationV2Format = schemaContainAllFinalTableV2AirbyteColumns(existingSchema.keySet());

return new AlterTableReport(columnsToAdd, columnsToRemove, columnsToChangeType, isDestinationV2Format);
}

@VisibleForTesting
public static boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) {
return existingTable.getClustering() != null
&& containsAllIgnoreCase(
new HashSet<>(existingTable.getClustering().getFields()),
clusteringColumns(stream));
}

@VisibleForTesting
public static boolean partitioningMatches(final StandardTableDefinition existingTable) {
return existingTable.getTimePartitioning() != null
&& existingTable.getTimePartitioning()
.getField()
.equalsIgnoreCase("_airbyte_extracted_at")
&& TimePartitioning.Type.DAY.equals(existingTable.getTimePartitioning().getType());
}

/**
* Checks the schema to determine whether the table contains all expected final table airbyte
* columns
*
* @param columnNames the column names of the schema to check
* @return whether all the {@link JavaBaseConstants#V2_FINAL_TABLE_METADATA_COLUMNS} are present
*/
@VisibleForTesting
public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection<String> columnNames) {
return JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.allMatch(column -> containsIgnoreCase(columnNames, column));
}

private static Set<String> getPks(final StreamConfig stream) {
return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
}

}
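Most of the new code implements the upgraded CDK's DestinationHandler contract: rather than the framework calling findExistingTable, isFinalTableEmpty, and getInitialRawTableState individually, the handler now returns one DestinationInitialStatus per stream from gatherInitialState, bundling final-table existence, raw-table status, schema mismatch (columns plus clustering and partitioning), emptiness, and a destination-state blob that BigQuery does not track (hence the no-op commitDestinationStates). The sketch below shows how a caller such as the typer-deduper might consume that result; it is a hypothetical illustration, not code from this PR, the accessor names are assumptions based on the constructor arguments in gatherInitialState, and streamConfigs stands in for the parsed catalog's list of StreamConfig objects:

// Hypothetical caller-side sketch (not part of this PR). Accessor names are
// assumed from the DestinationInitialStatus constructor arguments above.
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> statuses =
    destinationHandler.gatherInitialState(streamConfigs);

for (final DestinationInitialStatus<MinimumDestinationState.Impl> status : statuses) {
  if (!status.isFinalTablePresent()) {
    // no final table yet -> create it from scratch
  } else if (status.isSchemaMismatch()) {
    // columns, clustering, or partitioning drifted -> soft reset / alter table
  }
  // status.initialRawTableStatus().maxProcessedTimestamp() tells typing &
  // deduping which raw records it can skip.
}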