diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 7f33ebafcca3..f2b11b35247b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -374,7 +374,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer return new BigQueryRecordStandardConsumer( outputRecordCollector, () -> { - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); // Set up our raw tables writeConfigs.get().forEach((streamId, uploader) -> { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 036045380e8f..5f40d71c4815 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -135,7 +135,7 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery final TyperDeduper typerDeduper) { return () -> { LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size()); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) { LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 697d90b47d5c..111894b62967 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -36,13 +36,14 @@ import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; -import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; @@ -82,11 +83,11 @@ public boolean isFinalTableEmpty(final StreamId id) { return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows()); } - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { + public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception { final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName())); if (rawTable == null) { // Table doesn't exist. There are no unprocessed records, and no timestamp. - return new InitialRawTableState(false, false, Optional.empty()); + return new InitialRawTableStatus(false, false, Optional.empty()); } final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( @@ -102,7 +103,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) // If it's not null, then we can return immediately - we've found some unprocessed records and their // timestamp. if (!unloadedRecordTimestamp.isNull()) { - return new InitialRawTableState(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant())); } final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of( @@ -116,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at) // So we just need to get the timestamp of the most recent record. if (loadedRecordTimestamp.isNull()) { // Null timestamp because the table is empty. T+D can process the entire raw table during this sync. - return new InitialRawTableState(true, false, Optional.empty()); + return new InitialRawTableStatus(true, false, Optional.empty()); } else { // The raw table already has some records. T+D can skip all records with timestamp <= this value. - return new InitialRawTableState(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); + return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant())); } } @@ -191,18 +192,18 @@ public void execute(final Sql sql) throws InterruptedException { } @Override - public List> gatherInitialState(List streamConfigs) throws Exception { - final List> initialStates = new ArrayList<>(); + public List> gatherInitialState(List streamConfigs) throws Exception { + final List> initialStates = new ArrayList<>(); for (final StreamConfig streamConfig : streamConfigs) { final StreamId id = streamConfig.id(); final Optional finalTable = findExistingTable(id); - final InitialRawTableState rawTableState = getInitialRawTableState(id); - initialStates.add(new DestinationInitialState<>( + final InitialRawTableStatus rawTableState = getInitialRawTableState(id); + initialStates.add(new DestinationInitialStatus<>( streamConfig, finalTable.isPresent(), rawTableState, finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()), - !finalTable.isPresent() || isFinalTableEmpty(id), + finalTable.isEmpty() || isFinalTableEmpty(id), // Return a default state blob since we don't actually track state. new MinimumDestinationState.Impl(false))); }