Skip to content

Commit

Permalink
Java CDK updates for mysql (#36929)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored May 3, 2024
1 parent b9dc205 commit 2c0cd6c
Show file tree
Hide file tree
Showing 27 changed files with 550 additions and 352 deletions.
4 changes: 2 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.31.8 | 2024-05-03 | [\#36932](https://github.com/airbytehq/airbyte/pull/36932) | CDK changes on resumable full refresh |
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
| 0.32.0 | 2024-05-03 | [\#36929](https://github.com/airbytehq/airbyte/pull/36929) | Destinations: Assorted DV2 changes for mysql |
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
| 0.31.6 | 2024-05-02 | [\#37746](https://github.com/airbytehq/airbyte/pull/37746) | debuggability improvements. |
| 0.31.5 | 2024-04-30 | [\#37758](https://github.com/airbytehq/airbyte/pull/37758) | Set debezium max retries to zero |
| 0.31.4 | 2024-04-30 | [\#37754](https://github.com/airbytehq/airbyte/pull/37754) | Add DebeziumEngine notification log |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import io.airbyte.protocol.models.v0.ConnectorSpecification
import java.util.function.Consumer

abstract class SpecModifyingDestination(private val destination: Destination) : Destination {
override val isV2Destination: Boolean = destination.isV2Destination

@Throws(Exception::class)
abstract fun modifySpec(originalSpec: ConnectorSpecification): ConnectorSpecification

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.31.8
version=0.32.0
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/
package io.airbyte.cdk.testutils

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.db.ContextQueryFunction
import io.airbyte.cdk.db.Database
Expand Down Expand Up @@ -55,16 +55,16 @@ protected constructor(val container: C) : AutoCloseable {
@JvmField protected val databaseId: Int = nextDatabaseId.getAndIncrement()
@JvmField
protected val containerId: Int =
containerUidToId!!.computeIfAbsent(container.containerId) { _: String? ->
nextContainerId!!.getAndIncrement()
containerUidToId.computeIfAbsent(container.containerId) { _: String? ->
nextContainerId.getAndIncrement()
}!!
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")

init {
LOGGER!!.info(formatLogLine("creating database " + databaseName))
LOGGER!!.info(formatLogLine("creating database $databaseName"))
}

protected fun formatLogLine(logLine: String?): String? {
protected fun formatLogLine(logLine: String?): String {
val retVal = "TestDatabase databaseId=$databaseId, containerId=$containerId - $logLine"
return retVal
}
Expand Down Expand Up @@ -100,7 +100,7 @@ protected constructor(val container: C) : AutoCloseable {
* object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes
* the [DataSource] and [DSLContext] owned by this object.
*/
open fun initialized(): T? {
open fun initialized(): T {
inContainerBootstrapCmd().forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
this.dataSource =
DataSourceFactory.create(
Expand Down Expand Up @@ -165,12 +165,12 @@ protected constructor(val container: C) : AutoCloseable {
databaseName
)

val database: Database?
val database: Database
get() = Database(getDslContext())

protected fun execSQL(sql: Stream<String>) {
try {
database!!.query<Any?> { ctx: DSLContext? ->
database.query<Any?> { ctx: DSLContext? ->
sql.forEach { statement: String? ->
LOGGER!!.info("executing SQL statement {}", statement)
ctx!!.execute(statement)
Expand Down Expand Up @@ -228,12 +228,12 @@ protected constructor(val container: C) : AutoCloseable {

@Throws(SQLException::class)
fun <X> query(transform: ContextQueryFunction<X>): X? {
return database!!.query(transform)
return database.query(transform)
}

@Throws(SQLException::class)
fun <X> transaction(transform: ContextQueryFunction<X>): X? {
return database!!.transaction(transform)
return database.transaction(transform)
}

/** Returns a builder for the connector config object. */
Expand All @@ -245,7 +245,7 @@ protected constructor(val container: C) : AutoCloseable {
return configBuilder().withHostAndPort().withCredentials().withDatabase()
}

fun integrationTestConfigBuilder(): B? {
fun integrationTestConfigBuilder(): B {
return configBuilder().withResolvedHostAndPort().withCredentials().withDatabase()
}

Expand All @@ -260,8 +260,8 @@ protected constructor(val container: C) : AutoCloseable {
) {
protected val builder: ImmutableMap.Builder<Any, Any> = ImmutableMap.builder()

fun build(): JsonNode {
return Jsons.jsonNode(builder.build())
fun build(): ObjectNode {
return Jsons.jsonNode(builder.build()) as ObjectNode
}

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -314,7 +314,7 @@ protected constructor(val container: C) : AutoCloseable {

private val nextDatabaseId: AtomicInteger = AtomicInteger(0)

private val nextContainerId: AtomicInteger? = AtomicInteger(0)
private val containerUidToId: MutableMap<String?, Int?>? = ConcurrentHashMap()
private val nextContainerId: AtomicInteger = AtomicInteger(0)
private val containerUidToId: MutableMap<String?, Int?> = ConcurrentHashMap()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
namingResolver,
sqlOperations
)
protected val configSchemaKey: String
get() = "schema"
protected open val configSchemaKey: String = "schema"

/**
* If the destination should always disable type dedupe, override this method to return true. We
Expand Down Expand Up @@ -196,6 +195,11 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
rawTableSchema: String
): JdbcDestinationHandler<DestinationState>

/**
 * Creates the V1-to-V2 migrator for this destination.
 *
 * The default implementation returns a [JdbcV1V2Migrator] built from this destination's
 * `namingResolver` together with the supplied [database] and [databaseName]. The function is
 * `open`, so a subclass may return a different [DestinationV1V2Migrator].
 *
 * @param database the JDBC database handle passed through to the migrator
 * @param databaseName the database name passed through to the migrator
 * @return the migrator instance to use
 */
protected open fun getV1V2Migrator(
    database: JdbcDatabase,
    databaseName: String
): DestinationV1V2Migrator {
    return JdbcV1V2Migrator(namingResolver, database, databaseName)
}

/**
* Provide any migrations that the destination needs to run. Most destinations will need to
* provide an instance of
Expand Down Expand Up @@ -306,6 +310,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace),
optimalBatchSizeBytes,
parsedCatalog,
)
}

Expand All @@ -317,7 +322,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
val sqlGenerator = getSqlGenerator(config)
val databaseName = getDatabaseName(config)
val v2TableMigrator = NoopV2TableMigrator()
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val migrator = getV1V2Migrator(database, databaseName)
val destinationHandler: DestinationHandler<DestinationState> =
getDestinationHandler(
databaseName,
Expand Down Expand Up @@ -434,7 +439,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
if (attemptInsert) {
sqlOps.insertRecords(
database,
java.util.List.of(dummyRecord),
listOf(dummyRecord),
outputSchema,
outputTableName,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.*
Expand Down Expand Up @@ -54,6 +56,7 @@ object JdbcBufferedConsumerFactory {

const val DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 25 * 1024 * 1024L

/** @param parsedCatalog Nullable for v1 destinations. Required for v2 destinations. */
fun createAsync(
outputRecordCollector: Consumer<AirbyteMessage>,
database: JdbcDatabase,
Expand All @@ -65,9 +68,16 @@ object JdbcBufferedConsumerFactory {
typerDeduper: TyperDeduper,
dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH,
parsedCatalog: ParsedCatalog? = null
): SerializedAirbyteMessageConsumer {
val writeConfigs =
createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired)
createWriteConfigs(
namingResolver,
config,
catalog,
sqlOperations.isSchemaRequired,
parsedCatalog
)
return AsyncStreamConsumer(
outputRecordCollector,
onStartFunction(database, sqlOperations, writeConfigs, typerDeduper),
Expand All @@ -89,19 +99,28 @@ object JdbcBufferedConsumerFactory {
namingResolver: NamingConventionTransformer,
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
schemaRequired: Boolean
schemaRequired: Boolean,
parsedCatalog: ParsedCatalog?
): List<WriteConfig> {
if (schemaRequired) {
Preconditions.checkState(
config.has("schema"),
"jdbc destinations must specify a schema."
)
}
return catalog!!
.streams
.stream()
.map(toWriteConfig(namingResolver, config, schemaRequired))
.collect(Collectors.toList())
return if (parsedCatalog == null) {
catalog!!
.streams
.stream()
.map(toWriteConfig(namingResolver, config, schemaRequired))
.collect(Collectors.toList())
} else {
// TODO: switch this to Kotlin-style list processing instead of Java streams
parsedCatalog.streams
.stream()
.map(parsedStreamToWriteConfig(namingResolver))
.collect(Collectors.toList())
}
}

private fun toWriteConfig(
Expand Down Expand Up @@ -150,6 +169,27 @@ object JdbcBufferedConsumerFactory {
}
}

/**
 * Returns a mapper that converts an already-parsed [StreamConfig] into a [WriteConfig].
 *
 * The original stream name/namespace, the raw namespace, the raw table name and the sync mode
 * are copied from the [StreamConfig] as-is, rather than being recomputed from the catalog.
 *
 * NOTE(review): the temporary table name is derived from the stream's raw namespace (not from
 * the stream name) — confirm this is intended.
 *
 * @param namingResolver used only to produce the (deprecated) temporary table name
 */
private fun parsedStreamToWriteConfig(
    namingResolver: NamingConventionTransformer
): Function<StreamConfig, WriteConfig> {
    // TODO We should probably replace WriteConfig with StreamConfig?
    // The only thing I'm not sure about is the tmpTableName thing,
    // but otherwise it's a strict improvement (avoids people accidentally
    // recomputing the table names, instead of just treating the output of
    // CatalogParser as canonical).
    return Function<StreamConfig, WriteConfig> { parsedStream ->
        val streamId = parsedStream.id
        @Suppress("deprecation")
        val tmpTableName = namingResolver.getTmpTableName(streamId.rawNamespace)
        WriteConfig(
            streamId.originalName,
            streamId.originalNamespace,
            streamId.rawNamespace,
            tmpTableName,
            streamId.rawName,
            parsedStream.destinationSyncMode,
        )
    }
}

/**
* Defer to the [AirbyteStream]'s namespace. If this is not set, use the destination's default
* schema. This namespace is source-provided, and can be potentially empty.
Expand All @@ -160,7 +200,7 @@ object JdbcBufferedConsumerFactory {
private fun getOutputSchema(
stream: AirbyteStream,
defaultDestSchema: String,
namingResolver: NamingConventionTransformer
namingResolver: NamingConventionTransformer,
): String {
return if (isDestinationV2) {
namingResolver.getNamespace(
Expand Down Expand Up @@ -252,8 +292,10 @@ object JdbcBufferedConsumerFactory {
records: List<PartialAirbyteMessage> ->
require(pairToWriteConfig.containsKey(pair)) {
String.format(
"Message contained record from a stream that was not in the catalog. \ncatalog: %s",
Jsons.serialize(catalog)
"Message contained record from a stream that was not in the catalog. \ncatalog: %s, \nstream identifier: %s\nkeys: %s",
Jsons.serialize(catalog),
pair,
pairToWriteConfig.keys
)
}
val writeConfig = pairToWriteConfig.getValue(pair)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ abstract class JdbcSqlOperations : SqlOperations {
val uuid = UUID.randomUUID().toString()

val jsonData = record.serialized
val airbyteMeta = Jsons.serialize(record.record!!.meta)
val airbyteMeta =
if (record.record!!.meta == null) {
"{\"changes\":[]}"
} else {
Jsons.serialize(record.record!!.meta)
}
val extractedAt =
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
if (isDestinationV2) {
Expand Down
Loading

0 comments on commit 2c0cd6c

Please sign in to comment.