From ba3bdb19d02a2e793f5976acb7bd785c9d6b4501 Mon Sep 17 00:00:00 2001 From: Joe Bell Date: Tue, 9 Apr 2024 21:37:25 -0700 Subject: [PATCH] Update CDK for Raw Only Dv2 destinations (#36047) --- .../io/airbyte/cdk/db/jdbc/JdbcUtils.kt | 2 + .../integrations/base/JavaBaseConstants.kt | 11 +- .../src/main/resources/version.properties | 2 +- .../jdbc/AbstractJdbcDestination.kt | 124 ++++++++++++------ .../NoOpJdbcDestinationHandler.kt | 50 +++++++ .../destination/DestinationAcceptanceTest.kt | 6 +- 6 files changed, 145 insertions(+), 50 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt index e5041eff61a3..ee829ba6d9ec 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt @@ -64,6 +64,7 @@ object JdbcUtils { ) @JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations() + @JvmStatic val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT) @JvmStatic @@ -85,6 +86,7 @@ object JdbcUtils { } } + @JvmStatic @JvmOverloads fun parseJdbcParameters( jdbcPropertiesString: String, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt index d8ab8d738334..3c888d64142f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt @@ -3,6 +3,13 @@ */ package io.airbyte.cdk.integrations.base +import java.util.* +import org.apache.commons.lang3.StringUtils + +fun upperQuoted(column: String): String { + return StringUtils.wrap(column.uppercase(Locale.getDefault()), "\"") +} + object JavaBaseConstants { const val ARGS_CONFIG_KEY: String = "config" const val ARGS_CATALOG_KEY: String = "catalog" @@ -33,7 +40,7 @@ object JavaBaseConstants { COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, - COLUMN_NAME_DATA + COLUMN_NAME_DATA, ) @JvmField val V2_RAW_TABLE_COLUMN_NAMES: List = @@ -42,7 +49,7 @@ object JavaBaseConstants { COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, COLUMN_NAME_DATA, - COLUMN_NAME_AB_META + COLUMN_NAME_AB_META, ) @JvmField val V2_FINAL_TABLE_METADATA_COLUMNS: List = diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 22e430d83820..3ec407fa2186 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.29.8 +version=0.29.9 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index 5787a97c97c7..1ec35452e4de 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -56,6 +56,16 @@ abstract class AbstractJdbcDestination CatalogParser(sqlGenerator, override) } .orElse(CatalogParser(sqlGenerator)) .parseCatalog(catalog!!) + val typerDeduper: TyperDeduper = + buildTyperDeduper( + config, + database, + parsedCatalog, + ) + + return JdbcBufferedConsumerFactory.createAsync( + outputRecordCollector, + database, + sqlOperations, + namingResolver, + config, + catalog, + defaultNamespace, + typerDeduper, + getDataTransformer(parsedCatalog, defaultNamespace), + ) + } + + private fun buildTyperDeduper( + config: JsonNode, + database: JdbcDatabase, + parsedCatalog: ParsedCatalog, + ): TyperDeduper { val databaseName = getDatabaseName(config) - val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) val v2TableMigrator = NoopV2TableMigrator() + val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) val destinationHandler: DestinationHandler = getDestinationHandler( databaseName, database, - rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE) + getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE) + .orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), ) - val disableTypeDedupe = - config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false) - val typerDeduper: TyperDeduper + val disableTypeDedupe = isTypeDedupeDisabled(config) val migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler) - typerDeduper = - if (disableTypeDedupe) { - NoOpTyperDeduperWithV1V2Migrations( - sqlGenerator, - destinationHandler, - parsedCatalog, - migrator, - v2TableMigrator, - migrations - ) - } else { + + val typerDeduper: TyperDeduper + if (disableTypeDedupe) { + typerDeduper = + if (migrations.isEmpty()) { + NoopTyperDeduper() + } else { + NoOpTyperDeduperWithV1V2Migrations( + sqlGenerator, + destinationHandler, + parsedCatalog, + migrator, + v2TableMigrator, + migrations, + ) + } + } else { + typerDeduper = DefaultTyperDeduper( sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, - migrations + migrations, ) - } - - return JdbcBufferedConsumerFactory.createAsync( - outputRecordCollector, - database, - sqlOperations, - namingResolver, - config, - catalog, - defaultNamespace, - typerDeduper, - getDataTransformer(parsedCatalog, defaultNamespace) - ) + } + return typerDeduper } companion object { @@ -361,7 +401,7 @@ abstract class AbstractJdbcDestination conn.metaData.catalogs }, { queryContext: ResultSet? -> JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!) - } + }, ) // verify we have write permissions on the target schema by creating a table with a @@ -370,7 +410,7 @@ abstract class AbstractJdbcDestination( + databaseName: String, + jdbcDatabase: JdbcDatabase, + rawTableSchemaName: String, + sqlDialect: SQLDialect +) : + JdbcDestinationHandler( + databaseName, + jdbcDatabase, + rawTableSchemaName, + sqlDialect + ) { + + override fun execute(sql: Sql) { + throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") + } + + override fun gatherInitialState( + streamConfigs: List + ): List> { + throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") + } + + override fun commitDestinationStates(destinationStates: Map) { + throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") + } + + override fun toDestinationState(json: JsonNode?): DestinationState { + throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") + } + + override fun toJdbcTypeName(airbyteType: AirbyteType?): String { + throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index fd727e916ebe..6d6bf28fc915 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -25,7 +25,6 @@ import io.airbyte.configoss.JobGetSpecConfig import io.airbyte.configoss.OperatorDbt import io.airbyte.configoss.StandardCheckConnectionInput import io.airbyte.configoss.StandardCheckConnectionOutput -import io.airbyte.configoss.StandardCheckConnectionOutput.Status import io.airbyte.configoss.WorkerDestinationConfig import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType @@ -64,9 +63,6 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.function.Consumer import java.util.stream.Collectors import java.util.stream.Stream -import kotlin.Comparator -import kotlin.collections.ArrayList -import kotlin.collections.HashSet import kotlin.test.assertNotNull import org.junit.jupiter.api.* import org.junit.jupiter.api.extension.ExtensionContext @@ -345,7 +341,7 @@ abstract class DestinationAcceptanceTest { """This method is moved to the AdvancedTestDataComparator. Please move your destination implementation of the method to your comparator implementation.""" ) - protected fun resolveIdentifier(identifier: String?): List { + protected open fun resolveIdentifier(identifier: String?): List { return java.util.List.of(identifier) }