Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DV2 destinations: Build DestinationState / Migration framework #35303

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.10 | 2024-03-01 | [\#35303](https://github.com/airbytehq/airbyte/pull/35303) | various improvements for tests TestDataHolder |
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.8 | 2024-02-28 | [\#35529](https://github.com/airbytehq/airbyte/pull/35529) | Refactor on state iterators |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.9
version=0.23.10
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;

Expand All @@ -17,7 +18,6 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
Expand All @@ -37,6 +37,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand All @@ -45,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
Expand Down Expand Up @@ -93,7 +95,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
attemptTableOperations(outputSchema, database, namingResolver, sqlOperations, false);
if (TypingAndDedupingFlag.isDestinationV2()) {
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
destinationSpecificTableOperations(database);
}
Expand Down Expand Up @@ -252,7 +254,9 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -309,21 +313,23 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
*/
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE);
final ParsedCatalog parsedCatalog = rawNamespaceOverride
.map(override -> new CatalogParser(sqlGenerator, override))
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
}
return typerDeduper;
}
Expand Down
Loading
Loading