Merge branch 'master' into daryna/source-bing-ads/stream-budget-and-product_dimension_performance_report
darynaishchenko authored Feb 29, 2024
2 parents aaad088 + b7ae6c4 commit df4c4f8
Showing 300 changed files with 5,208 additions and 3,430 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Add a getNamespace into TestDataHolder |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
@@ -1 +1 @@
version=0.23.6
version=0.23.7
@@ -161,7 +161,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
final Collection<WriteConfig> writeConfigs,
final TyperDeduper typerDeduper) {
return () -> {
typerDeduper.prepareTables();
typerDeduper.prepareSchemasAndRunMigrations();
LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
for (final WriteConfig writeConfig : writeConfigs) {
@@ -181,6 +181,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
}
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Preparing raw tables in destination completed.");
typerDeduper.prepareFinalTables();
};
}

@@ -39,7 +39,10 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
final TyperDeduper typerDeduper) {
return () -> {
log.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
typerDeduper.prepareTables();

typerDeduper.prepareSchemasAndRunMigrations();

// Create raw tables
final List<String> queryList = new ArrayList<>();
for (final WriteConfig writeConfig : writeConfigs) {
final String schema = writeConfig.getOutputSchemaName();
@@ -69,6 +72,9 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,

log.info("Preparing staging area in destination completed for schema {} stream {}", schema, stream);
}

typerDeduper.prepareFinalTables();

log.info("Executing finalization of tables.");
stagingOperations.executeTransaction(database, queryList);
};
@@ -8,7 +8,6 @@
import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.*;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas;
import static java.util.Collections.singleton;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
@@ -43,7 +42,7 @@
* <p>
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #prepareFinalTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
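
As a minimal sketch of the documented call order (not part of the diff): `typerDeduper` and `parsedCatalog` are hypothetical placeholders, the StreamId accessor names are assumed, and only the TyperDeduper method names come from this commit.

    // Illustrative only: drives a TyperDeduper through a sync in the documented order.
    void runSync(final TyperDeduper typerDeduper, final ParsedCatalog parsedCatalog) throws Exception {
      // 1. Create schemas and run raw-table migrations before any raw tables are created.
      typerDeduper.prepareSchemasAndRunMigrations();
      // 2. Destination-specific raw table creation happens here (e.g. in an onStartFunction).
      // 3. Create the final tables (or temporary "overwrite" tables).
      typerDeduper.prepareFinalTables();
      // 4. As records accumulate, request T+D per stream; the boolean forces a run even without new records.
      for (final StreamConfig stream : parsedCatalog.streams()) {
        // Accessor names on stream.id() are assumed for illustration.
        typerDeduper.typeAndDedupe(stream.id().originalNamespace(), stream.id().originalName(), false);
      }
      // 5. At the end of the sync, swap any temporary final tables into the true final tables.
      typerDeduper.commitFinalTables();
    }
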
@@ -104,27 +103,23 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator());
}

private void prepareSchemas(final ParsedCatalog parsedCatalog) throws Exception {
prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler);
@Override
public void prepareSchemasAndRunMigrations() {
// Technically kind of weird to call this here, but it's the best place we have.
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
// until prepareFinalTables... but it doesn't really matter.
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
}

@Override
public void prepareTables() throws Exception {
public void prepareFinalTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
}
overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet();
LOGGER.info("Preparing tables");

// This is intentionally not done in parallel to avoid rate limits in some destinations.
prepareSchemas(parsedCatalog);

// TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables.
// unify the logic with current state of raw tables & final tables. This is done first before gather
// initial state to avoid recreating final tables later again.
final List<Either<? extends Exception, Void>> runMigrationsResult =
CompletableFutures.allOf(parsedCatalog.streams().stream().map(this::runMigrationsAsync).toList()).toCompletableFuture().join();
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult);
final List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
final List<Either<? extends Exception, Void>> prepareTablesFutureResult = CompletableFutures.allOf(
initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join();
@@ -6,22 +6,14 @@

import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import kotlin.NotImplementedError;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

/**
@@ -54,31 +46,14 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
}

@Override
public void prepareTables() throws Exception {
try {
log.info("Ensuring schemas exist for prepareTables with V1V2 migrations");
prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler);
final Set<CompletableFuture<Optional<Exception>>> prepareTablesTasks = new HashSet<>();
for (final StreamConfig stream : parsedCatalog.streams()) {
prepareTablesTasks.add(CompletableFuture.supplyAsync(() -> {
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
try {
log.info("Migrating V1->V2 for stream {}", stream.id());
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
log.info("Migrating V2 legacy for stream {}", stream.id());
v2TableMigrator.migrateIfNecessary(stream);
return Optional.empty();
} catch (final Exception e) {
return Optional.of(e);
}
}, executorService));
}
CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join();
reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n");
} catch (NotImplementedError | NotImplementedException e) {
log.warn(
"Could not prepare schemas or tables because this is not implemented for this destination, this should not be required for this destination to succeed");
}
public void prepareSchemasAndRunMigrations() {
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
}

@Override
public void prepareFinalTables() {
log.info("Skipping prepareFinalTables");
}

@Override
@@ -11,10 +11,20 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* This class should be used while upgrading a destination from V1 to V2. V2 destinations should use
* {@link NoOpTyperDeduperWithV1V2Migrations} for disabling T+D, because it correctly handles
* various migration operations.
*/
public class NoopTyperDeduper implements TyperDeduper {

@Override
public void prepareTables() {
public void prepareSchemasAndRunMigrations() throws Exception {

}

@Override
public void prepareFinalTables() {

}

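
Together with the new Javadoc on NoopTyperDeduper, the choice of implementation might be wired roughly as in the sketch below. This is illustrative only: the flags `destinationV2Enabled` and `disableTypeDedupe` are hypothetical, and the constructor argument lists are inferred from the fields these classes use elsewhere in this diff.

    // Illustrative wiring, not from the diff.
    final TyperDeduper typerDeduper;
    if (!destinationV2Enabled) {
      // DV1 destinations (or a destination mid-upgrade from V1 to V2) skip typing & deduping entirely.
      typerDeduper = new NoopTyperDeduper();
    } else if (disableTypeDedupe) {
      // DV2 destinations with T+D disabled still need schemas and raw-table migrations to run.
      typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, v2TableMigrator);
    } else {
      typerDeduper = new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, v2TableMigrator);
    }
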
@@ -9,15 +9,53 @@
import java.util.Map;
import java.util.concurrent.locks.Lock;

/*
* This class wants to do three separate things, but not all of them actually happen here right now:
*
* - A migration runner, which handles any changes in raw tables (#prepareSchemasAndRunMigrations)
* - A raw table creator, which creates any missing raw tables (currently handled in e.g.
*   GeneralStagingFunctions.onStartFunction, BigQueryStagingConsumerFactory.onStartFunction, etc.)
* - A T+D runner, which manages the final tables (#prepareFinalTables, #typeAndDedupe, etc.)
*
* These would be injectable to the relevant locations, so that we can have:
*
* - DV2 destinations with T+D enabled (i.e. all three objects instantiated for real)
* - DV2 destinations with T+D disabled (i.e. a noop T+D runner but the other two objects for real)
* - DV1 destinations (i.e. all three objects as noop)
*
* Even more ideally, we'd create an instance per stream, instead of having one instance for the
* entire sync. This would massively simplify all the state contained in our implementations - see
* DefaultTyperDeduper's pile of Sets and Maps.
*
* Unfortunately, it's just a pain to inject these objects everywhere they need to be, and we'd
* need to refactor part of the async framework on top of that. There's an obvious overlap with the
* async framework's onStart function... which we should deal with eventually.
*/
public interface TyperDeduper {

/**
* Does two things: Set up the schemas for the sync (both airbyte_internal and final table schemas),
* and execute any raw table migrations. These migrations might include: Upgrading v1 raw tables to
* v2, adding a column to the raw tables, etc. In general, this method shouldn't actually create the
* raw tables; the only exception is in the V1 -> V2 migration.
* <p>
* This method should be called BEFORE creating raw tables, because the V1V2 migration might create
* the raw tables.
* <p>
* This method may affect the behavior of {@link #prepareFinalTables()}. For example, modifying a
* raw table may require us to run a soft reset. However, we should defer that soft reset until
* {@link #prepareFinalTables()}.
*/
void prepareSchemasAndRunMigrations() throws Exception;

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
* <p>
* This method should be called AFTER creating the raw tables, because it may run a soft reset
* (which requires the raw tables to exist).
*/
void prepareTables() throws Exception;
void prepareFinalTables() throws Exception;

/**
* Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into
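
The block comment at the top of this interface sketches a possible future decomposition into three collaborators. A rough, purely hypothetical outline of that split (none of these interfaces exist in this commit; the names are invented for illustration, and the method signatures reuse the ones defined on TyperDeduper):

    // Hypothetical decomposition described in the interface comment; not part of this change.
    interface MigrationRunner {
      // Handles schema creation and raw-table changes (e.g. V1 -> V2 upgrades, adding columns).
      void prepareSchemasAndRunMigrations() throws Exception;
    }

    interface RawTableCreator {
      // Creates any missing raw tables (today handled in e.g. GeneralStagingFunctions.onStartFunction).
      void createRawTables() throws Exception;
    }

    interface TypeAndDedupeRunner {
      // Manages the final tables and runs T+D over them.
      void prepareFinalTables() throws Exception;
      void typeAndDedupe(String originalNamespace, String originalName, boolean mustRun) throws Exception;
      void commitFinalTables() throws Exception;
    }
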
@@ -1,15 +1,72 @@
package io.airbyte.integrations.base.destination.typing_deduping

import com.google.common.collect.Streams
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
import io.airbyte.commons.concurrency.CompletableFutures
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutorService

/**
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
* exist in the Destination Database.
*/
fun prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) {
val rawSchema = parsedCatalog.streams.mapNotNull { it.id.rawNamespace }
val finalSchema = parsedCatalog.streams.mapNotNull { it.id.finalNamespace }
val createAllSchemasSql = rawSchema.union(finalSchema)
.map { sqlGenerator.createSchema(it) }
.toList()
destinationHandler.execute(Sql.concat(createAllSchemasSql))

class TyperDeduperUtil {
companion object {

@JvmStatic
fun executeRawTableMigrations(
executorService: ExecutorService,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
v1V2Migrator: DestinationV1V2Migrator,
v2TableMigrator: V2TableMigrator,
parsedCatalog: ParsedCatalog
) {
// TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables.
// unify the logic
// with current state of raw tables & final tables. This is done first before gather initial state
// to avoid recreating
// final tables later again.
val runMigrationsResult =
CompletableFutures.allOf(parsedCatalog.streams().stream()
.map { streamConfig -> runMigrationsAsync(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, streamConfig) }
.toList()).toCompletableFuture().join()
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult)
}

/**
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
* exist in the Destination Database.
*/
@JvmStatic
fun prepareSchemas(
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
parsedCatalog: ParsedCatalog) {
val rawSchema = parsedCatalog.streams.stream().map { it.id.rawNamespace }
val finalSchema = parsedCatalog.streams.stream().map { it.id.finalNamespace }
val createAllSchemasSql = Streams.concat<String>(rawSchema, finalSchema)
.filter(Objects::nonNull)
.distinct()
.map(sqlGenerator::createSchema)
.toList()
destinationHandler.execute(Sql.concat(createAllSchemasSql))
}

private fun runMigrationsAsync(
executorService: ExecutorService,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
v1V2Migrator: DestinationV1V2Migrator,
v2TableMigrator: V2TableMigrator,
streamConfig: StreamConfig): CompletionStage<Void> {
return CompletableFuture.runAsync({
try {
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig)
v2TableMigrator.migrateIfNecessary(streamConfig)
} catch (e: java.lang.Exception) {
throw RuntimeException(e)
}
}, executorService)
}
}
}