fix compilation post rebase
gisripa committed Mar 1, 2024
1 parent 7c68992 commit ce77de0
Showing 3 changed files with 15 additions and 14 deletions.
@@ -374,7 +374,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
     return new BigQueryRecordStandardConsumer(
         outputRecordCollector,
         () -> {
-          typerDeduper.prepareSchemasAndRawTables();
+          typerDeduper.prepareSchemasAndRunMigrations();
 
           // Set up our raw tables
           writeConfigs.get().forEach((streamId, uploader) -> {
@@ -135,7 +135,7 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
                                           final TyperDeduper typerDeduper) {
     return () -> {
       LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
-      typerDeduper.prepareSchemasAndRawTables();
+      typerDeduper.prepareSchemasAndRunMigrations();
 
       for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
         LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
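Both changed files above swap the same call: the sync-start hook now invokes prepareSchemasAndRunMigrations() instead of prepareSchemasAndRawTables() on the TyperDeduper before the per-stream raw tables are set up. A minimal sketch of the interface change these call sites assume (the real TyperDeduper interface in the Airbyte CDK declares more methods, and the exact signature may differ):

public interface TyperDeduper {

  // Previously prepareSchemasAndRawTables(): create the destination schemas and run
  // any destination-state migrations before the per-stream raw tables are prepared.
  void prepareSchemasAndRunMigrations() throws Exception;

}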
@@ -36,13 +36,14 @@
 import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
-import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
-import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -82,11 +83,11 @@ public boolean isFinalTableEmpty(final StreamId id) {
     return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
   }
 
-  public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
+  public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
     final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
     if (rawTable == null) {
       // Table doesn't exist. There are no unprocessed records, and no timestamp.
-      return new InitialRawTableState(false, false, Optional.empty());
+      return new InitialRawTableStatus(false, false, Optional.empty());
     }
 
     final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
@@ -102,7 +103,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
     // If it's not null, then we can return immediately - we've found some unprocessed records and their
     // timestamp.
     if (!unloadedRecordTimestamp.isNull()) {
-      return new InitialRawTableState(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
     }
 
     final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
@@ -116,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
     // So we just need to get the timestamp of the most recent record.
     if (loadedRecordTimestamp.isNull()) {
       // Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
-      return new InitialRawTableState(true, false, Optional.empty());
+      return new InitialRawTableStatus(true, false, Optional.empty());
     } else {
       // The raw table already has some records. T+D can skip all records with timestamp <= this value.
-      return new InitialRawTableState(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
+      return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
     }
   }
 
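getInitialRawTableState now returns the renamed InitialRawTableStatus. A minimal sketch of what that record might look like, with component names inferred from the three construction sites above rather than taken verbatim from the typing_deduping package:

import java.time.Instant;
import java.util.Optional;

public record InitialRawTableStatus(
    // false only when the raw table does not exist at all
    boolean rawTableExists,
    // true when the raw table contains records that have not yet been typed and deduped
    boolean hasUnprocessedRecords,
    // timestamp at or below which records are already processed; empty when there is no such watermark
    Optional<Instant> maxProcessedTimestamp) {}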
@@ -191,18 +192,18 @@ public void execute(final Sql sql) throws InterruptedException {
   }
 
   @Override
-  public List<DestinationInitialState<MinimumDestinationState.Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
-    final List<DestinationInitialState<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
+  public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+    final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
     for (final StreamConfig streamConfig : streamConfigs) {
       final StreamId id = streamConfig.id();
       final Optional<TableDefinition> finalTable = findExistingTable(id);
-      final InitialRawTableState rawTableState = getInitialRawTableState(id);
-      initialStates.add(new DestinationInitialState<>(
+      final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+      initialStates.add(new DestinationInitialStatus<>(
          streamConfig,
          finalTable.isPresent(),
          rawTableState,
          finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
-          !finalTable.isPresent() || isFinalTableEmpty(id),
+          finalTable.isEmpty() || isFinalTableEmpty(id),
          // Return a default state blob since we don't actually track state.
          new MinimumDestinationState.Impl(false)));
    }
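gatherInitialState now assembles the renamed DestinationInitialStatus, parameterized on the destination state type (MinimumDestinationState.Impl here, since this handler does not track any extra state and just returns a default blob). A sketch of the shape implied by the constructor call above, with component names inferred rather than copied from the CDK:

import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;

public record DestinationInitialStatus<DestinationState>(
    StreamConfig streamConfig,
    // whether the final (typed) table already exists
    boolean isFinalTablePresent,
    InitialRawTableStatus initialRawTableStatus,
    // final table exists but its schema no longer matches the stream config
    boolean isSchemaMismatch,
    // final table is missing or has zero rows
    boolean isFinalTableEmpty,
    // per-destination state blob; MinimumDestinationState.Impl(false) when nothing is tracked
    DestinationState destinationState) {}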
