Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations CDK: Refactor T+D to gather required world state upfront #35342

Merged
merged 15 commits into from
Feb 22, 2024
191 changes: 96 additions & 95 deletions airbyte-cdk/java/airbyte-cdk/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.functional.Either;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.Collection;
Expand Down Expand Up @@ -85,6 +86,17 @@ public static <T extends Throwable> void logAllAndThrowFirst(final String initia
}
}

public static <T extends Throwable, Result> List<Result> getResultsOrLogAndThrowFirst(final String initialMessage,
final List<Either<? extends T, Result>> eithers)
throws T {
List<? extends T> throwables = eithers.stream().filter(Either::isLeft).map(Either::getLeft).toList();
if (!throwables.isEmpty()) {
logAllAndThrowFirst(initialMessage, throwables);
}
// No need to filter on isRight since isLeft will throw before reaching this line.
return eithers.stream().map(Either::getRight).toList();
}

private static boolean isConfigErrorException(Throwable e) {
return e instanceof ConfigErrorException;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.22.1
version=0.23.0
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new JdbcDestinationHandler(databaseName, database);
}
protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -318,14 +316,14 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
}
return typerDeduper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import java.sql.SQLType;

public record ColumnDefinition(String name, String type, SQLType sqlType, int columnSize) {
/**
* Jdbc destination column definition representation
*
* @param name
* @param type
* @param columnSize
*/
public record ColumnDefinition(String name, String type, int columnSize, boolean isNullable) {

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.LinkedHashMap;

/**
* Jdbc destination table definition representation
* Jdbc destination table definition representation with a map of column names to column definitions
*
* @param columns
*/
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading