diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 2f7367405478..ae11dbbe2f7b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,8 +174,8 @@ corresponds to that version. | Version | Date | Pull Request | Subject | | :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 0.31.8 | 2024-05-03 | [\#36932](https://github.com/airbytehq/airbyte/pull/36932) | CDK changes on resumable full refresh | -| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake | +| 0.32.0 | 2024-05-03 | [\#36929](https://github.com/airbytehq/airbyte/pull/36929) | Destinations: Assorted DV2 changes for mysql | +| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake | | 0.31.6 | 2024-05-02 | [\#37746](https://github.com/airbytehq/airbyte/pull/37746) | debuggability improvements. | | 0.31.5 | 2024-04-30 | [\#37758](https://github.com/airbytehq/airbyte/pull/37758) | Set debezium max retries to zero | | 0.31.4 | 2024-04-30 | [\#37754](https://github.com/airbytehq/airbyte/pull/37754) | Add DebeziumEngine notification log | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt index 45a93269b6ef..e06ae860e189 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.kt @@ -14,6 +14,8 @@ import io.airbyte.protocol.models.v0.ConnectorSpecification import java.util.function.Consumer abstract class SpecModifyingDestination(private val destination: Destination) : Destination { + override val isV2Destination: Boolean = destination.isV2Destination + @Throws(Exception::class) abstract fun modifySpec(originalSpec: ConnectorSpecification): ConnectorSpecification diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 836bf3375adc..a730574df844 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.31.8 +version=0.32.0 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt index bffd92f31cf9..d061ccff8808 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -3,7 +3,7 @@ */ package io.airbyte.cdk.testutils -import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap import io.airbyte.cdk.db.ContextQueryFunction import io.airbyte.cdk.db.Database @@ -55,16 +55,16 @@ protected constructor(val container: C) : AutoCloseable { @JvmField protected val databaseId: Int = nextDatabaseId.getAndIncrement() @JvmField protected val containerId: Int = - containerUidToId!!.computeIfAbsent(container.containerId) { _: String? -> - nextContainerId!!.getAndIncrement() + containerUidToId.computeIfAbsent(container.containerId) { _: String? -> + nextContainerId.getAndIncrement() }!! private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") init { - LOGGER!!.info(formatLogLine("creating database " + databaseName)) + LOGGER!!.info(formatLogLine("creating database $databaseName")) } - protected fun formatLogLine(logLine: String?): String? { + protected fun formatLogLine(logLine: String?): String { val retVal = "TestDatabase databaseId=$databaseId, containerId=$containerId - $logLine" return retVal } @@ -100,7 +100,7 @@ protected constructor(val container: C) : AutoCloseable { * object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes * the [DataSource] and [DSLContext] owned by this object. */ - open fun initialized(): T? { + open fun initialized(): T { inContainerBootstrapCmd().forEach { cmds: Stream -> this.execInContainer(cmds) } this.dataSource = DataSourceFactory.create( @@ -165,12 +165,12 @@ protected constructor(val container: C) : AutoCloseable { databaseName ) - val database: Database? + val database: Database get() = Database(getDslContext()) protected fun execSQL(sql: Stream) { try { - database!!.query { ctx: DSLContext? -> + database.query { ctx: DSLContext? -> sql.forEach { statement: String? -> LOGGER!!.info("executing SQL statement {}", statement) ctx!!.execute(statement) @@ -228,12 +228,12 @@ protected constructor(val container: C) : AutoCloseable { @Throws(SQLException::class) fun query(transform: ContextQueryFunction): X? { - return database!!.query(transform) + return database.query(transform) } @Throws(SQLException::class) fun transaction(transform: ContextQueryFunction): X? { - return database!!.transaction(transform) + return database.transaction(transform) } /** Returns a builder for the connector config object. */ @@ -245,7 +245,7 @@ protected constructor(val container: C) : AutoCloseable { return configBuilder().withHostAndPort().withCredentials().withDatabase() } - fun integrationTestConfigBuilder(): B? { + fun integrationTestConfigBuilder(): B { return configBuilder().withResolvedHostAndPort().withCredentials().withDatabase() } @@ -260,8 +260,8 @@ protected constructor(val container: C) : AutoCloseable { ) { protected val builder: ImmutableMap.Builder = ImmutableMap.builder() - fun build(): JsonNode { - return Jsons.jsonNode(builder.build()) + fun build(): ObjectNode { + return Jsons.jsonNode(builder.build()) as ObjectNode } @Suppress("UNCHECKED_CAST") @@ -314,7 +314,7 @@ protected constructor(val container: C) : AutoCloseable { private val nextDatabaseId: AtomicInteger = AtomicInteger(0) - private val nextContainerId: AtomicInteger? = AtomicInteger(0) - private val containerUidToId: MutableMap? = ConcurrentHashMap() + private val nextContainerId: AtomicInteger = AtomicInteger(0) + private val containerUidToId: MutableMap = ConcurrentHashMap() } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index ca13097cee90..501c7a9b42a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -65,8 +65,7 @@ abstract class AbstractJdbcDestination + protected open fun getV1V2Migrator( + database: JdbcDatabase, + databaseName: String + ): DestinationV1V2Migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) + /** * Provide any migrations that the destination needs to run. Most destinations will need to * provide an instande of @@ -306,6 +310,7 @@ abstract class AbstractJdbcDestination = getDestinationHandler( databaseName, @@ -434,7 +439,7 @@ abstract class AbstractJdbcDestination, database: JdbcDatabase, @@ -65,9 +68,16 @@ object JdbcBufferedConsumerFactory { typerDeduper: TyperDeduper, dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(), optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH, + parsedCatalog: ParsedCatalog? = null ): SerializedAirbyteMessageConsumer { val writeConfigs = - createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired) + createWriteConfigs( + namingResolver, + config, + catalog, + sqlOperations.isSchemaRequired, + parsedCatalog + ) return AsyncStreamConsumer( outputRecordCollector, onStartFunction(database, sqlOperations, writeConfigs, typerDeduper), @@ -89,7 +99,8 @@ object JdbcBufferedConsumerFactory { namingResolver: NamingConventionTransformer, config: JsonNode, catalog: ConfiguredAirbyteCatalog?, - schemaRequired: Boolean + schemaRequired: Boolean, + parsedCatalog: ParsedCatalog? ): List { if (schemaRequired) { Preconditions.checkState( @@ -97,11 +108,19 @@ object JdbcBufferedConsumerFactory { "jdbc destinations must specify a schema." ) } - return catalog!! - .streams - .stream() - .map(toWriteConfig(namingResolver, config, schemaRequired)) - .collect(Collectors.toList()) + return if (parsedCatalog == null) { + catalog!! + .streams + .stream() + .map(toWriteConfig(namingResolver, config, schemaRequired)) + .collect(Collectors.toList()) + } else { + // we should switch this to kotlin-style list processing, but meh for now + parsedCatalog.streams + .stream() + .map(parsedStreamToWriteConfig(namingResolver)) + .collect(Collectors.toList()) + } } private fun toWriteConfig( @@ -150,6 +169,27 @@ object JdbcBufferedConsumerFactory { } } + private fun parsedStreamToWriteConfig( + namingResolver: NamingConventionTransformer + ): Function { + return Function { streamConfig: StreamConfig -> + // TODO We should probably replace WriteConfig with StreamConfig? + // The only thing I'm not sure about is the tmpTableName thing, + // but otherwise it's a strict improvement (avoids people accidentally + // recomputing the table names, instead of just treating the output of + // CatalogParser as canonical). + WriteConfig( + streamConfig.id.originalName, + streamConfig.id.originalNamespace, + streamConfig.id.rawNamespace, + @Suppress("deprecation") + namingResolver.getTmpTableName(streamConfig.id.rawNamespace), + streamConfig.id.rawName, + streamConfig.destinationSyncMode, + ) + } + } + /** * Defer to the [AirbyteStream]'s namespace. If this is not set, use the destination's default * schema. This namespace is source-provided, and can be potentially empty. @@ -160,7 +200,7 @@ object JdbcBufferedConsumerFactory { private fun getOutputSchema( stream: AirbyteStream, defaultDestSchema: String, - namingResolver: NamingConventionTransformer + namingResolver: NamingConventionTransformer, ): String { return if (isDestinationV2) { namingResolver.getNamespace( @@ -252,8 +292,10 @@ object JdbcBufferedConsumerFactory { records: List -> require(pairToWriteConfig.containsKey(pair)) { String.format( - "Message contained record from a stream that was not in the catalog. \ncatalog: %s", - Jsons.serialize(catalog) + "Message contained record from a stream that was not in the catalog. \ncatalog: %s, \nstream identifier: %s\nkeys: %s", + Jsons.serialize(catalog), + pair, + pairToWriteConfig.keys ) } val writeConfig = pairToWriteConfig.getValue(pair) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt index 6a1a29f45099..5b01c38fd68b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt @@ -144,7 +144,12 @@ abstract class JdbcSqlOperations : SqlOperations { val uuid = UUID.randomUUID().toString() val jsonData = record.serialized - val airbyteMeta = Jsons.serialize(record.record!!.meta) + val airbyteMeta = + if (record.record!!.meta == null) { + "{\"changes\":[]}" + } else { + Jsons.serialize(record.record!!.meta) + } val extractedAt = Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt)) if (isDestinationV2) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 3e642da540ed..70b79abacb97 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -22,12 +22,12 @@ import java.time.Instant import java.time.OffsetDateTime import java.time.temporal.ChronoUnit import java.util.* -import java.util.HashMap import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.function.Predicate import org.jooq.Condition import org.jooq.DSLContext +import org.jooq.DataType import org.jooq.SQLDialect import org.jooq.conf.ParamType import org.jooq.impl.DSL @@ -39,19 +39,33 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory abstract class JdbcDestinationHandler( - protected val databaseName: String, + // JDBC's "catalog name" refers to e.g. the Postgres/Mysql database. + // This is nullable because Mysql doesn't provide "schemas" within databses, + // unlike Postgres. + // For Postgres (and other systems supporting the database.schema.table + // layers), set this to a nonnull value. We will create ALL tables in this + // database. + // For Mysql (and other systems with just a database.table layering), + // set this to null and override the relevant methods to treat namespaces + // as databases. + protected val catalogName: String?, protected val jdbcDatabase: JdbcDatabase, - protected val rawTableSchemaName: String, + protected val rawTableNamespace: String, private val dialect: SQLDialect ) : DestinationHandler { protected val dslContext: DSLContext get() = DSL.using(dialect) + protected open val stateTableUpdatedAtType: DataType<*> = SQLDataType.TIMESTAMPWITHTIMEZONE + @Throws(Exception::class) - private fun findExistingTable(id: StreamId): Optional { - return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace, id.finalName) + protected open fun findExistingTable(id: StreamId): Optional { + return findExistingTable(jdbcDatabase, catalogName, id.finalNamespace, id.finalName) } + protected open fun getTableFromMetadata(dbmetadata: DatabaseMetaData, id: StreamId): ResultSet = + dbmetadata.getTables(catalogName, id.rawNamespace, id.rawName, null) + @Throws(Exception::class) private fun isFinalTableEmpty(id: StreamId): Boolean { return !jdbcDatabase.queryBoolean( @@ -73,13 +87,12 @@ abstract class JdbcDestinationHandler( jdbcDatabase.executeMetadataQuery { dbmetadata: DatabaseMetaData? -> LOGGER.info( "Retrieving table from Db metadata: {} {} {}", - databaseName, + catalogName, id.rawNamespace, id.rawName ) try { - dbmetadata!!.getTables(databaseName, id.rawNamespace, id.rawName, null).use { - table -> + getTableFromMetadata(dbmetadata!!, id).use { table -> return@executeMetadataQuery table.next() } } catch (e: SQLException) { @@ -211,7 +224,7 @@ abstract class JdbcDestinationHandler( jdbcDatabase.execute( dslContext .createTableIfNotExists( - quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME), + quotedName(rawTableNamespace, DESTINATION_STATE_TABLE_NAME), ) .column(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME), SQLDataType.VARCHAR) .column( @@ -227,7 +240,7 @@ abstract class JdbcDestinationHandler( ) // Add an updated_at field. We don't actually need it yet, but it can't hurt! .column( quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT), - SQLDataType.TIMESTAMPWITHTIMEZONE, + stateTableUpdatedAtType, ) .getSQL(ParamType.INLINED), ) @@ -244,7 +257,7 @@ abstract class JdbcDestinationHandler( field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE)), field(quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT)), ) - .from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)) + .from(quotedName(rawTableNamespace, DESTINATION_STATE_TABLE_NAME)) .sql, ) .map { recordJson: JsonNode -> @@ -338,22 +351,28 @@ abstract class JdbcDestinationHandler( } } - private fun isAirbyteRawIdColumnMatch(existingTable: TableDefinition): Boolean { - return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) && - toJdbcTypeName(AirbyteProtocolType.STRING) == - existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_RAW_ID]!!.type + protected open fun isAirbyteRawIdColumnMatch(existingTable: TableDefinition): Boolean { + return toJdbcTypeName(AirbyteProtocolType.STRING) + .equals( + existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type, + ignoreCase = true, + ) } - private fun isAirbyteExtractedAtColumnMatch(existingTable: TableDefinition): Boolean { - return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) && - toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) == - existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT]!!.type + protected open fun isAirbyteExtractedAtColumnMatch(existingTable: TableDefinition): Boolean { + return toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) + .equals( + existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type, + ignoreCase = true, + ) } - private fun isAirbyteMetaColumnMatch(existingTable: TableDefinition): Boolean { - return existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) && - toJdbcTypeName(Struct(LinkedHashMap())) == - existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META]!!.type + protected open fun isAirbyteMetaColumnMatch(existingTable: TableDefinition): Boolean { + return toJdbcTypeName(Struct(LinkedHashMap())) + .equals( + existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_META).type, + ignoreCase = true, + ) } open protected fun existingSchemaMatchesStreamConfig( @@ -362,9 +381,13 @@ abstract class JdbcDestinationHandler( ): Boolean { // Check that the columns match, with special handling for the metadata columns. if ( - !isAirbyteRawIdColumnMatch(existingTable) || - !isAirbyteExtractedAtColumnMatch(existingTable) || - !isAirbyteMetaColumnMatch(existingTable) + !(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) && + isAirbyteRawIdColumnMatch(existingTable)) || + !(existingTable.columns.containsKey( + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT + ) && isAirbyteExtractedAtColumnMatch(existingTable)) || + !(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) && + isAirbyteMetaColumnMatch(existingTable)) ) { // Missing AB meta columns from final table, we need them to do proper T+D so trigger // soft-reset @@ -372,7 +395,7 @@ abstract class JdbcDestinationHandler( } val intendedColumns = LinkedHashMap( - stream!!.columns!!.entries.associate { it.key.name to toJdbcTypeName(it.value) } + stream!!.columns.entries.associate { it.key.name to toJdbcTypeName(it.value) } ) // Filter out Meta columns since they don't exist in stream config. @@ -390,7 +413,7 @@ abstract class JdbcDestinationHandler( { map: LinkedHashMap, column: Map.Entry -> - map[column.key] = column.value.type + map[column.key] = column.value.type.lowercase() }, { obj: LinkedHashMap, m: LinkedHashMap? -> obj.putAll(m!!) @@ -404,7 +427,7 @@ abstract class JdbcDestinationHandler( destinationStates: Map ): String { return dslContext - .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) + .deleteFrom(table(quotedName(rawTableNamespace, DESTINATION_STATE_TABLE_NAME))) .where( destinationStates.keys .stream() @@ -436,7 +459,7 @@ abstract class JdbcDestinationHandler( // Reinsert all of our states var insertStatesStep = dslContext - .insertInto(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) + .insertInto(table(quotedName(rawTableNamespace, DESTINATION_STATE_TABLE_NAME))) .columns( field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME), String::class.java), field( @@ -483,9 +506,9 @@ abstract class JdbcDestinationHandler( * @param airbyteType * @return */ - protected abstract fun toJdbcTypeName(airbyteType: AirbyteType?): String + protected abstract fun toJdbcTypeName(airbyteType: AirbyteType): String - protected abstract fun toDestinationState(json: JsonNode?): DestinationState + protected abstract fun toDestinationState(json: JsonNode): DestinationState companion object { private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDestinationHandler::class.java) @@ -498,7 +521,7 @@ abstract class JdbcDestinationHandler( @Throws(SQLException::class) fun findExistingTable( jdbcDatabase: JdbcDatabase, - databaseName: String?, + catalogName: String?, schemaName: String?, tableName: String? ): Optional { @@ -511,12 +534,12 @@ abstract class JdbcDestinationHandler( val columnDefinitions = LinkedHashMap() LOGGER.info( "Retrieving existing columns for {}.{}.{}", - databaseName, + catalogName, schemaName, tableName ) try { - dbMetadata!!.getColumns(databaseName, schemaName, tableName, null).use { + dbMetadata!!.getColumns(catalogName, schemaName, tableName, null).use { columns -> while (columns.next()) { val columnName = columns.getString("COLUMN_NAME") @@ -535,7 +558,7 @@ abstract class JdbcDestinationHandler( } catch (e: SQLException) { LOGGER.error( "Failed to retrieve column info for {}.{}.{}", - databaseName, + catalogName, schemaName, tableName, e diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index 59820026c359..5194940c498e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -6,24 +6,36 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.destination.NamingConventionTransformer -import io.airbyte.integrations.base.destination.typing_deduping.* +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType import io.airbyte.integrations.base.destination.typing_deduping.Array +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId +import io.airbyte.integrations.base.destination.typing_deduping.Sql import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.of import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.transactionally +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName +import io.airbyte.integrations.base.destination.typing_deduping.Struct +import io.airbyte.integrations.base.destination.typing_deduping.Union +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf import io.airbyte.protocol.models.v0.DestinationSyncMode import java.sql.Timestamp import java.time.Instant -import java.util.* +import java.util.Locale +import java.util.Optional import java.util.stream.Collectors -import java.util.stream.Stream -import kotlin.Any -import kotlin.Boolean -import kotlin.IllegalArgumentException import kotlin.Int -import kotlin.String -import kotlin.plus -import org.jooq.* +import org.jooq.Condition +import org.jooq.DSLContext +import org.jooq.DataType +import org.jooq.Field +import org.jooq.InsertValuesStepN +import org.jooq.Name +import org.jooq.Record +import org.jooq.SQLDialect +import org.jooq.SelectConditionStep import org.jooq.conf.ParamType import org.jooq.impl.DSL import org.jooq.impl.SQLDataType @@ -47,7 +59,7 @@ constructor( namingTransformer.getNamespace(rawNamespaceOverride), namingTransformer.convertStreamName(concatenateRawTableName(namespace, name)), namespace, - name + name, ) } @@ -56,7 +68,7 @@ constructor( return ColumnId( namingTransformer.getIdentifier(nameWithSuffix), name, - namingTransformer.getIdentifier(nameWithSuffix) + namingTransformer.getIdentifier(nameWithSuffix), ) } @@ -67,7 +79,7 @@ constructor( return when (type.typeName) { Struct.TYPE, UnsupportedOneOf.TYPE -> structType - Array.TYPE -> arrayType!! + Array.TYPE -> arrayType Union.TYPE -> toDialectType((type as Union).chooseType()) else -> throw IllegalArgumentException("Unsupported AirbyteType: $type") } @@ -85,21 +97,21 @@ constructor( AirbyteProtocolType.TIME_WITH_TIMEZONE -> SQLDataType.TIMEWITHTIMEZONE AirbyteProtocolType.TIME_WITHOUT_TIMEZONE -> SQLDataType.TIME AirbyteProtocolType.DATE -> SQLDataType.DATE - AirbyteProtocolType.UNKNOWN -> widestType!! + AirbyteProtocolType.UNKNOWN -> widestType } } protected abstract val structType: DataType<*> - protected abstract val arrayType: DataType<*>? + protected abstract val arrayType: DataType<*> @get:VisibleForTesting val timestampWithTimeZoneType: DataType<*> get() = toDialectType(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) - protected abstract val widestType: DataType<*>? + protected abstract val widestType: DataType<*> - protected abstract val dialect: SQLDialect? + protected abstract val dialect: SQLDialect /** * @param columns from the schema to be extracted from _airbyte_data column. Use the destination @@ -120,7 +132,7 @@ constructor( */ protected abstract fun buildAirbyteMetaColumn( columns: LinkedHashMap - ): Field<*>? + ): Field<*> /** * Get the cdc_deleted_at column condition for append_dedup mode by extracting it from @@ -128,7 +140,7 @@ constructor( * * @return */ - protected abstract fun cdcDeletedAtNotNullCondition(): Condition? + protected abstract fun cdcDeletedAtNotNullCondition(): Condition /** * Get the window step function row_number() over (partition by primary_key order by @@ -139,7 +151,7 @@ constructor( * @return */ protected abstract fun getRowNumber( - primaryKey: List?, + primaryKey: List, cursorField: Optional ): Field @@ -156,7 +168,7 @@ constructor( @VisibleForTesting fun buildFinalTableFields( columns: LinkedHashMap, - metaColumns: Map?> + metaColumns: Map> ): List> { val fields = metaColumns.entries @@ -182,8 +194,10 @@ constructor( * @param includeMetaColumn * @return */ - fun getFinalTableMetaColumns(includeMetaColumn: Boolean): LinkedHashMap?> { - val metaColumns = LinkedHashMap?>() + open fun getFinalTableMetaColumns( + includeMetaColumn: Boolean + ): LinkedHashMap> { + val metaColumns = LinkedHashMap>() metaColumns[JavaBaseConstants.COLUMN_NAME_AB_RAW_ID] = SQLDataType.VARCHAR(36).nullable(false) metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] = @@ -204,7 +218,7 @@ constructor( @VisibleForTesting fun buildRawTableSelectFields( columns: LinkedHashMap, - metaColumns: Map?>, + metaColumns: Map>, useExpensiveSaferCasting: Boolean ): List> { val fields = @@ -237,13 +251,13 @@ constructor( condition = condition.and( DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) - .gt(minRawTimestamp.get().toString()) + .gt(formatTimestampLiteral(minRawTimestamp.get())), ) } return condition } - override fun createSchema(schema: String?): Sql { + override fun createSchema(schema: String): Sql { return of(createSchemaSql(schema)) } @@ -251,40 +265,26 @@ constructor( // TODO: Use Naming transformer to sanitize these strings with redshift restrictions. val finalTableIdentifier = stream.id.finalName + suffix.lowercase(Locale.getDefault()) if (!force) { - return transactionally( - Stream.concat( - Stream.of( - createTableSql( - stream.id.finalNamespace, - finalTableIdentifier, - stream.columns!! - ) - ), - createIndexSql(stream, suffix).stream() - ) - .toList() + return of( + createTableSql(stream.id.finalNamespace, finalTableIdentifier, stream.columns) ) } val dropTableStep = - DSL.dropTableIfExists(DSL.quotedName(stream.id.finalNamespace, finalTableIdentifier)) + dslContext.dropTableIfExists( + DSL.quotedName(stream.id.finalNamespace, finalTableIdentifier) + ) if (cascadeDrop) { dropTableStep.cascade() } return transactionally( - Stream.concat( - Stream.of( - dropTableStep.getSQL(ParamType.INLINED), - createTableSql( - stream.id.finalNamespace, - finalTableIdentifier, - stream.columns!! - ) - ), - createIndexSql(stream, suffix).stream() - ) - .toList() + dropTableStep.getSQL(ParamType.INLINED), + createTableSql( + stream.id.finalNamespace, + finalTableIdentifier, + stream.columns, + ), ) } @@ -300,84 +300,95 @@ constructor( stream, finalSuffix, minRawTimestamp, - useExpensiveSaferCasting + useExpensiveSaferCasting, ) } + protected open fun renameTable(schema: String, originalName: String, newName: String): String = + dslContext.alterTable(DSL.name(schema, originalName)).renameTo(DSL.name(newName)).sql + override fun overwriteFinalTable(stream: StreamId, finalSuffix: String): Sql { - val dropTableStep = DSL.dropTableIfExists(DSL.name(stream.finalNamespace, stream.finalName)) + val dropTableStep = + dslContext.dropTableIfExists(DSL.name(stream.finalNamespace, stream.finalName)) if (cascadeDrop) { dropTableStep.cascade() } return transactionally( dropTableStep.getSQL(ParamType.INLINED), - DSL.alterTable(DSL.name(stream.finalNamespace, stream.finalName + finalSuffix)) - .renameTo(DSL.name(stream.finalName)) - .sql + renameTable(stream.finalNamespace, stream.finalName + finalSuffix, stream.finalName) ) } override fun migrateFromV1toV2( streamId: StreamId, - namespace: String?, - tableName: String? + namespace: String, + tableName: String, ): Sql { val rawTableName = DSL.name(streamId.rawNamespace, streamId.rawName) val dsl = dslContext return transactionally( dsl.createSchemaIfNotExists(streamId.rawNamespace).sql, dsl.dropTableIfExists(rawTableName).sql, - DSL.createTable(rawTableName) - .column( - JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, - SQLDataType.VARCHAR(36).nullable(false) - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - timestampWithTimeZoneType.nullable(false) - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, - timestampWithTimeZoneType.nullable(true) - ) - .column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false)) - .column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true)) - .`as`( - DSL.select( - DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), - DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), - DSL.cast(null, timestampWithTimeZoneType) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), - DSL.field(JavaBaseConstants.COLUMN_NAME_DATA) - .`as`(JavaBaseConstants.COLUMN_NAME_DATA), - DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META) - ) - .from(DSL.table(DSL.name(namespace, tableName))) - ) - .getSQL(ParamType.INLINED) + createV2RawTableFromV1Table(rawTableName, namespace, tableName), ) } + protected open fun createV2RawTableFromV1Table( + rawTableName: Name, + namespace: String, + tableName: String + ) = + dslContext + .createTable(rawTableName) + .column( + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, + SQLDataType.VARCHAR(36).nullable(false), + ) + .column( + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + timestampWithTimeZoneType.nullable(false), + ) + .column( + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + timestampWithTimeZoneType.nullable(true), + ) + .column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false)) + .column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true)) + .`as`( + DSL.select( + DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), + DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), + DSL.cast(null, timestampWithTimeZoneType) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), + DSL.field(JavaBaseConstants.COLUMN_NAME_DATA) + .`as`(JavaBaseConstants.COLUMN_NAME_DATA), + DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META), + ) + .from(DSL.table(DSL.name(namespace, tableName))), + ) + .getSQL(ParamType.INLINED) + override fun clearLoadedAt(streamId: StreamId): Sql { return of( - DSL.update(DSL.table(DSL.name(streamId.rawNamespace, streamId.rawName))) - .set( + dslContext + .update(DSL.table(DSL.name(streamId.rawNamespace, streamId.rawName))) + .set( DSL.field(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), - DSL.inline(null as String?) + DSL.inline(null as String?), ) - .sql + .sql, ) } @VisibleForTesting fun selectFromRawTable( - schemaName: String?, - tableName: String?, + schemaName: String, + tableName: String, columns: LinkedHashMap, - metaColumns: Map?>, - condition: Condition?, + metaColumns: Map>, + condition: Condition, useExpensiveSaferCasting: Boolean ): SelectConditionStep { val dsl = dslContext @@ -389,10 +400,10 @@ constructor( @VisibleForTesting fun insertIntoFinalTable( - schemaName: String?, - tableName: String?, + schemaName: String, + tableName: String, columns: LinkedHashMap, - metaFields: Map?> + metaFields: Map> ): InsertValuesStepN { val dsl = dslContext return dsl.insertInto(DSL.table(DSL.quotedName(schemaName, tableName))) @@ -420,19 +431,19 @@ constructor( selectFromRawTable( rawSchema, rawTable, - streamConfig.columns!!, + streamConfig.columns, getFinalTableMetaColumns(false), rawTableCondition( - streamConfig.destinationSyncMode!!, - streamConfig.columns!!.containsKey(cdcDeletedAtColumn), - minRawTimestamp + streamConfig.destinationSyncMode, + streamConfig.columns.containsKey(cdcDeletedAtColumn), + minRawTimestamp, ), - useExpensiveSaferCasting - ) + useExpensiveSaferCasting, + ), ) val finalTableFields = - buildFinalTableFields(streamConfig.columns!!, getFinalTableMetaColumns(true)) - val rowNumber = getRowNumber(streamConfig.primaryKey, streamConfig.cursor!!) + buildFinalTableFields(streamConfig.columns, getFinalTableMetaColumns(true)) + val rowNumber = getRowNumber(streamConfig.primaryKey, streamConfig.cursor) val filteredRows = DSL.name(NUMBERED_ROWS_CTE_ALIAS) .`as`(DSL.select(DSL.asterisk(), rowNumber).from(rawTableRowsWithCast)) @@ -442,8 +453,8 @@ constructor( insertIntoFinalTable( finalSchema, finalTable, - streamConfig.columns!!, - getFinalTableMetaColumns(true) + streamConfig.columns, + getFinalTableMetaColumns(true), ) .select( DSL.with(rawTableRowsWithCast) @@ -451,8 +462,8 @@ constructor( .select(finalTableFields) .from(filteredRows) .where( - DSL.field(DSL.name(ROW_NUMBER_COLUMN_NAME), Int::class.java).eq(1) - ) // Can refer by CTE.field but no use since we don't strongly type + DSL.field(DSL.name(ROW_NUMBER_COLUMN_NAME), Int::class.java).eq(1), + ), // Can refer by CTE.field but no use since we don't strongly type // them. ) .getSQL(ParamType.INLINED) @@ -462,24 +473,24 @@ constructor( insertIntoFinalTable( finalSchema, finalTable, - streamConfig.columns!!, - getFinalTableMetaColumns(true) + streamConfig.columns, + getFinalTableMetaColumns(true), ) .select( DSL.with(rawTableRowsWithCast) .select(finalTableFields) - .from(rawTableRowsWithCast) + .from(rawTableRowsWithCast), ) .getSQL(ParamType.INLINED) val deleteStmt = deleteFromFinalTable( finalSchema, finalTable, - streamConfig.primaryKey!!, - streamConfig.cursor!! + streamConfig.primaryKey, + streamConfig.cursor, ) val deleteCdcDeletesStmt = - if (streamConfig.columns!!.containsKey(cdcDeletedAtColumn)) + if (streamConfig.columns.containsKey(cdcDeletedAtColumn)) deleteFromFinalTableCdcDeletes(finalSchema, finalTable) else "" val checkpointStmt = checkpointRawTable(rawSchema, rawTable, minRawTimestamp) @@ -493,19 +504,19 @@ constructor( insertStmtWithDedupe, deleteStmt, deleteCdcDeletesStmt, - checkpointStmt + checkpointStmt, ) } - protected fun createSchemaSql(namespace: String?): String { + protected fun createSchemaSql(namespace: String): String { val dsl = dslContext val createSchemaSql = dsl.createSchemaIfNotExists(DSL.quotedName(namespace)) return createSchemaSql.sql } protected fun createTableSql( - namespace: String?, - tableName: String?, + namespace: String, + tableName: String, columns: LinkedHashMap ): String { val dsl = dslContext @@ -515,15 +526,6 @@ constructor( return createTableSql.sql } - /** - * Subclasses may override this method to add additional indexes after their CREATE TABLE - * statement. This is useful if the destination's CREATE TABLE statement does not accept an - * index definition. - */ - protected open fun createIndexSql(stream: StreamConfig?, suffix: String?): List { - return emptyList() - } - protected fun beginTransaction(): String { return "BEGIN" } @@ -537,9 +539,9 @@ constructor( } private fun deleteFromFinalTable( - schemaName: String?, + schemaName: String, tableName: String, - primaryKeys: List, + primaryKeys: List, cursor: Optional ): String { val dsl = dslContext @@ -554,15 +556,15 @@ constructor( .from( DSL.select(airbyteRawId, rowNumber) .from(DSL.table(DSL.quotedName(schemaName, tableName))) - .asTable("airbyte_ids") + .asTable("airbyte_ids"), ) - .where(DSL.field(DSL.name(ROW_NUMBER_COLUMN_NAME)).ne(1)) - ) + .where(DSL.field(DSL.name(ROW_NUMBER_COLUMN_NAME)).ne(1)), + ), ) .getSQL(ParamType.INLINED) } - private fun deleteFromFinalTableCdcDeletes(schema: String?, tableName: String): String { + private fun deleteFromFinalTableCdcDeletes(schema: String, tableName: String): String { val dsl = dslContext return dsl.deleteFrom(DSL.table(DSL.quotedName(schema, tableName))) .where(DSL.field(DSL.quotedName(cdcDeletedAtColumn.name)).isNotNull()) @@ -570,8 +572,8 @@ constructor( } private fun checkpointRawTable( - schemaName: String?, - tableName: String?, + schemaName: String, + tableName: String, minRawTimestamp: Optional ): String { val dsl = dslContext @@ -580,13 +582,13 @@ constructor( extractedAtCondition = extractedAtCondition.and( DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) - .gt(minRawTimestamp.get().toString()) + .gt(formatTimestampLiteral(minRawTimestamp.get())), ) } return dsl.update(DSL.table(DSL.quotedName(schemaName, tableName))) .set( DSL.field(DSL.quotedName(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)), - currentTimestamp() + currentTimestamp(), ) .where(DSL.field(DSL.quotedName(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)).isNull()) .and(extractedAtCondition) @@ -594,28 +596,26 @@ constructor( } protected open fun castedField( - field: Field<*>?, + field: Field<*>, type: AirbyteType, - alias: String?, useExpensiveSaferCasting: Boolean ): Field<*> { if (type is AirbyteProtocolType) { - return castedField(field, type, useExpensiveSaferCasting).`as`(DSL.quotedName(alias)) + return castedField(field, type, useExpensiveSaferCasting) } // Redshift SUPER can silently cast an array type to struct and vice versa. return when (type.typeName) { Struct.TYPE, - UnsupportedOneOf.TYPE -> DSL.cast(field, structType).`as`(DSL.quotedName(alias)) - Array.TYPE -> DSL.cast(field, arrayType).`as`(DSL.quotedName(alias)) - Union.TYPE -> - castedField(field, (type as Union).chooseType(), alias, useExpensiveSaferCasting) + UnsupportedOneOf.TYPE -> DSL.cast(field, structType) + Array.TYPE -> DSL.cast(field, arrayType) + Union.TYPE -> castedField(field, (type as Union).chooseType(), useExpensiveSaferCasting) else -> throw IllegalArgumentException("Unsupported AirbyteType: $type") } } protected open fun castedField( - field: Field<*>?, + field: Field<*>, type: AirbyteProtocolType, useExpensiveSaferCasting: Boolean ): Field<*> { @@ -626,8 +626,16 @@ constructor( return DSL.currentTimestamp() } + /** + * Some destinations (mysql) can't handle timestamps in ISO8601 format with 'Z' suffix. This + * method allows subclasses to format timestamps according to destination-specific needs. + */ + protected open fun formatTimestampLiteral(instant: Instant): String { + return instant.toString() + } + companion object { - protected const val ROW_NUMBER_COLUMN_NAME: String = "row_number" + const val ROW_NUMBER_COLUMN_NAME: String = "row_number" private const val TYPING_CTE_ALIAS = "intermediate_data" private const val NUMBERED_ROWS_CTE_ALIAS = "numbered_rows" } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.kt index f8db19595928..fa8fb0aec1d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.kt @@ -18,10 +18,10 @@ import java.util.* * Largely based on * [io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator]. */ -class JdbcV1V2Migrator( - private val namingConventionTransformer: NamingConventionTransformer, - private val database: JdbcDatabase, - private val databaseName: String +open class JdbcV1V2Migrator( + protected val namingConventionTransformer: NamingConventionTransformer, + protected val database: JdbcDatabase, + protected val databaseName: String? ) : BaseDestinationV1V2Migrator() { override fun doesAirbyteInternalNamespaceExist(streamConfig: StreamConfig?): Boolean { val retrievedSchema = @@ -67,9 +67,9 @@ class JdbcV1V2Migrator( override fun convertToV1RawName(streamConfig: StreamConfig): NamespacedTableName { @Suppress("deprecation") - val tableName = namingConventionTransformer.getRawTableName(streamConfig.id.originalName!!) + val tableName = namingConventionTransformer.getRawTableName(streamConfig.id.originalName) return NamespacedTableName( - namingConventionTransformer.getIdentifier(streamConfig.id.originalNamespace!!), + namingConventionTransformer.getIdentifier(streamConfig.id.originalNamespace), tableName ) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt index bfa63ffb0758..8a90a62af3c1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt @@ -14,7 +14,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId import org.jooq.SQLDialect class NoOpJdbcDestinationHandler( - databaseName: String, + databaseName: String?, jdbcDatabase: JdbcDatabase, rawTableSchemaName: String, sqlDialect: SQLDialect @@ -40,11 +40,11 @@ class NoOpJdbcDestinationHandler( throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") } - override fun toDestinationState(json: JsonNode?): DestinationState { + override fun toDestinationState(json: JsonNode): DestinationState { throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") } - override fun toJdbcTypeName(airbyteType: AirbyteType?): String { + override fun toJdbcTypeName(airbyteType: AirbyteType): String { throw NotImplementedError("This JDBC Destination Handler does not support typing deduping") } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt index 84b4dc6cb17b..e120fe05bf85 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt @@ -19,24 +19,24 @@ import org.jooq.SQLDialect * TyperDeduper classes. This implementation appeases that requirement but does not implement any * "final" table operations. */ -class RawOnlySqlGenerator(namingTransformer: NamingConventionTransformer) : +open class RawOnlySqlGenerator(namingTransformer: NamingConventionTransformer) : JdbcSqlGenerator(namingTransformer) { override val structType: DataType<*> get() { throw NotImplementedError("This Destination does not support final tables") } - override val arrayType: DataType<*>? + override val arrayType: DataType<*> get() { throw NotImplementedError("This Destination does not support final tables") } - override val widestType: DataType<*>? + override val widestType: DataType<*> get() { throw NotImplementedError("This Destination does not support final tables") } - override val dialect: SQLDialect? + override val dialect: SQLDialect get() { throw NotImplementedError("This Destination does not support final tables") } @@ -48,16 +48,16 @@ class RawOnlySqlGenerator(namingTransformer: NamingConventionTransformer) : throw NotImplementedError("This Destination does not support final tables") } - override fun buildAirbyteMetaColumn(columns: LinkedHashMap): Field<*>? { + override fun buildAirbyteMetaColumn(columns: LinkedHashMap): Field<*> { throw NotImplementedError("This Destination does not support final tables") } - override fun cdcDeletedAtNotNullCondition(): Condition? { + override fun cdcDeletedAtNotNullCondition(): Condition { throw NotImplementedError("This Destination does not support final tables") } override fun getRowNumber( - primaryKey: List?, + primaryKey: List, cursorField: Optional, ): Field { throw NotImplementedError("This Destination does not support final tables") diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt index 93980e30051d..3d9d2ef97271 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt @@ -150,7 +150,7 @@ open class SerialStagingConsumerFactory { val tableName: String? if (useDestinationsV2Columns) { val streamId = parsedCatalog.getStream(abStream.namespace, streamName).id - outputSchema = streamId.rawNamespace!! + outputSchema = streamId.rawNamespace tableName = streamId.rawName } else { outputSchema = diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index 30a0270345c6..0d4d25867b70 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -29,22 +29,14 @@ import org.jooq.impl.SQLDataType abstract class JdbcSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest() { protected abstract val database: JdbcDatabase - get - protected abstract val structType: DataType<*> - get - private val timestampWithTimeZoneType: DataType<*> // TODO - can we move this class into db_destinations/testFixtures? - get() = sqlGenerator!!.toDialectType(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) - + get() = sqlGenerator.toDialectType(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE) abstract override val sqlGenerator: JdbcSqlGenerator - get - protected abstract val sqlDialect: SQLDialect? - get - private val dslContext: DSLContext + val dslContext: DSLContext get() = DSL.using(sqlDialect) /** @@ -98,7 +90,7 @@ abstract class JdbcSqlGeneratorIntegrationTest { val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id - outputSchema = streamId.rawNamespace!! - tableName = streamId.rawName!! + outputSchema = streamId.rawNamespace + tableName = streamId.rawName } JavaBaseConstants.DestinationColumns.LEGACY -> { outputSchema = diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index a515b912c945..4ac3e5e4d6f0 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -66,7 +66,17 @@ constructor( } else { actualStreamConfig = originalStreamConfig } - streamConfigs.add(actualStreamConfig) + streamConfigs.add( + actualStreamConfig.copy( + // If we had collisions, we modified the stream name. + // Revert those changes. + id = + actualStreamConfig.id.copy( + originalName = stream.stream.name, + originalNamespace = stream.stream.namespace, + ), + ), + ) // Populate some interesting strings into the exception handler string deinterpolator addStringForDeinterpolation(actualStreamConfig.id.rawNamespace) @@ -75,29 +85,28 @@ constructor( addStringForDeinterpolation(actualStreamConfig.id.finalName) addStringForDeinterpolation(actualStreamConfig.id.originalNamespace) addStringForDeinterpolation(actualStreamConfig.id.originalName) - actualStreamConfig.columns!! - .keys - .forEach( - Consumer { columnId: ColumnId? -> - addStringForDeinterpolation(columnId!!.name) - addStringForDeinterpolation(columnId.originalName) - } - ) + actualStreamConfig.columns.keys.forEach( + Consumer { columnId: ColumnId? -> + addStringForDeinterpolation(columnId!!.name) + addStringForDeinterpolation(columnId.originalName) + } + ) // It's (unfortunately) possible for a cursor/PK to be declared that don't actually // exist in the // schema. // Add their strings explicitly. - actualStreamConfig.cursor!!.ifPresent { cursor: ColumnId -> + actualStreamConfig.cursor.ifPresent { cursor: ColumnId -> addStringForDeinterpolation(cursor.name) addStringForDeinterpolation(cursor.originalName) } - actualStreamConfig.primaryKey!!.forEach( + actualStreamConfig.primaryKey.forEach( Consumer { pk: ColumnId -> addStringForDeinterpolation(pk.name) addStringForDeinterpolation(pk.originalName) } ) } + LOGGER.info("Running sync with stream configs: $streamConfigs") return ParsedCatalog(streamConfigs) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt index 2e82ac554efa..c37b25926467 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt @@ -34,7 +34,7 @@ interface SqlGenerator { * @param schema the schema to create * @return SQL to create the schema if it does not exist */ - fun createSchema(schema: String?): Sql + fun createSchema(schema: String): Sql /** * Generate a SQL statement to copy new data from the raw table into the final table. @@ -87,7 +87,7 @@ interface SqlGenerator { * @param tableName name of the v2 raw table * @return a string containing the necessary sql to migrate */ - fun migrateFromV1toV2(streamId: StreamId, namespace: String?, tableName: String?): Sql + fun migrateFromV1toV2(streamId: StreamId, namespace: String, tableName: String): Sql /** * Typically we need to create a soft reset temporary table and clear loaded at values diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt index 8f7d2876c447..c0fc5f7ce4a7 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt @@ -10,9 +10,9 @@ import kotlin.collections.LinkedHashMap data class StreamConfig( val id: StreamId, - val syncMode: SyncMode?, - val destinationSyncMode: DestinationSyncMode?, - val primaryKey: List?, - val cursor: Optional?, - val columns: LinkedHashMap? + val syncMode: SyncMode, + val destinationSyncMode: DestinationSyncMode, + val primaryKey: List, + val cursor: Optional, + val columns: LinkedHashMap, ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt index 1afb6199b436..011431c3d9c8 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt @@ -23,12 +23,12 @@ import kotlin.math.max * airbyte namespace. */ data class StreamId( - val finalNamespace: String?, - val finalName: String?, - val rawNamespace: String?, - val rawName: String?, - val originalNamespace: String?, - val originalName: String? + val finalNamespace: String, + val finalName: String, + val rawNamespace: String, + val rawName: String, + val originalNamespace: String, + val originalName: String, ) { /** * Most databases/warehouses use a `schema.name` syntax to identify tables. This is a diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/migrators/MinimumDestinationState.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/migrators/MinimumDestinationState.kt index ff9b8c8ac15e..77a482820172 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/migrators/MinimumDestinationState.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/migrators/MinimumDestinationState.kt @@ -42,7 +42,7 @@ interface MinimumDestinationState { @Suppress("UNCHECKED_CAST") override fun withSoftReset(needsSoftReset: Boolean): T { - return Impl(needsSoftReset = true) as T + return Impl(needsSoftReset = needsSoftReset) as T } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 563daa146ecb..537044ac53cc 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -8,6 +8,8 @@ import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.DestinationSyncMode +import io.airbyte.protocol.models.v0.SyncMode import java.util.List import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertAll @@ -57,7 +59,7 @@ internal class CatalogParserTest { invocation: InvocationOnMock -> val originalNamespace = invocation.getArgument(0) val originalName = (invocation.getArgument(1)) - val originalRawNamespace = (invocation.getArgument(1)) + val originalRawNamespace = (invocation.getArgument(2)) // emulate quoting logic that causes a name collision val quotedName = originalName.replace("bar".toRegex(), "") @@ -77,15 +79,25 @@ internal class CatalogParserTest { val parsedCatalog = parser.parseCatalog(catalog) assertAll( - { Assertions.assertEquals("a_abab_foofoo", parsedCatalog.streams.get(0).id.rawName) }, - { Assertions.assertEquals("foofoo", parsedCatalog.streams.get(0).id.finalName) }, { Assertions.assertEquals( - "a_abab_foofoo_3fd", - parsedCatalog.streams.get(1).id.rawName + StreamId("a", "foofoo", "airbyte_internal", "a_abab_foofoo", "a", "foobarfoo"), + parsedCatalog.streams[0].id, + ) + }, + { + Assertions.assertEquals( + StreamId( + "a", + "foofoo_3fd", + "airbyte_internal", + "a_abab_foofoo_3fd", + "a", + "foofoo" + ), + parsedCatalog.streams[1].id, ) }, - { Assertions.assertEquals("foofoo_3fd", parsedCatalog.streams.get(1).id.finalName) } ) } @@ -186,6 +198,8 @@ internal class CatalogParserTest { .withStream( AirbyteStream().withNamespace(namespace).withName(name).withJsonSchema(schema) ) + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index f0985e2670c8..e5d0a5a3029f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -932,11 +932,11 @@ class DefaultTyperDeduperTest { "overwrite_ns", "overwrite_stream" ), - null, + mock(), DestinationSyncMode.OVERWRITE, - null, - null, - null + mock(), + mock(), + mock() ) private val APPEND_STREAM_CONFIG = StreamConfig( @@ -948,11 +948,11 @@ class DefaultTyperDeduperTest { "append_ns", "append_stream" ), - null, + mock(), DestinationSyncMode.APPEND, - null, - null, - null + mock(), + mock(), + mock() ) private val DEDUPE_STREAM_CONFIG = StreamConfig( @@ -964,11 +964,11 @@ class DefaultTyperDeduperTest { "dedup_ns", "dedup_stream" ), - null, + mock(), DestinationSyncMode.APPEND_DEDUP, - null, - null, - null + mock(), + mock(), + mock() ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt index 3e8509d6a984..646e32363baa 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest { migrator: BaseDestinationV1V2Migrator<*>, expected: Boolean ) { - val config = StreamConfig(STREAM_ID, null, destinationSyncMode, null, null, null) + val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock()) val actual = migrator.shouldMigrate(config) Assertions.assertEquals(expected, actual) } @@ -86,7 +86,14 @@ class DestinationV1V2MigratorTest { @Throws(Exception::class) fun testMismatchedSchemaThrowsException() { val config = - StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null) + StreamConfig( + STREAM_ID, + mock(), + DestinationSyncMode.APPEND_DEDUP, + mock(), + mock(), + mock() + ) val migrator = makeMockMigrator(true, true, false, false, false) val exception = Assertions.assertThrows(UnexpectedSchemaException::class.java) { @@ -103,7 +110,14 @@ class DestinationV1V2MigratorTest { fun testMigrate() { val sqlGenerator = MockSqlGenerator() val stream = - StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null) + StreamConfig( + STREAM_ID, + mock(), + DestinationSyncMode.APPEND_DEDUP, + mock(), + mock(), + mock() + ) val handler = Mockito.mock(DestinationHandler::class.java) val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table") // All is well @@ -123,7 +137,7 @@ class DestinationV1V2MigratorTest { } companion object { - private val STREAM_ID = StreamId("final", "final_table", "raw", "raw_table", null, null) + private val STREAM_ID = StreamId("final", "final_table", "raw", "raw_table", "fake", "fake") @Throws(Exception::class) fun makeMockMigrator( diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.kt index ac25371b61a8..a634e8fd6465 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.kt @@ -22,7 +22,7 @@ internal class MockSqlGenerator : SqlGenerator { throw RuntimeException() } - override fun createSchema(schema: String?): Sql { + override fun createSchema(schema: String): Sql { return of("CREATE SCHEMA $schema") } @@ -57,11 +57,7 @@ internal class MockSqlGenerator : SqlGenerator { ) } - override fun migrateFromV1toV2( - streamId: StreamId, - namespace: String?, - tableName: String? - ): Sql { + override fun migrateFromV1toV2(streamId: StreamId, namespace: String, tableName: String): Sql { return of( "MIGRATE TABLE " + java.lang.String.join(".", namespace, tableName) + diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 33ff1ae5e982..ac58a8433c95 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -99,7 +99,7 @@ abstract class BaseSqlGeneratorIntegrationTest /** Identical to [BaseTypingDedupingTest.getRawMetadataColumnNames]. */ @@ -368,7 +368,7 @@ abstract class BaseSqlGeneratorIntegrationTest repeatList(n: Int, list: List): List { return Collections.nCopies(n, list) .stream()