Skip to content

Commit

Permalink
source-mysql: normalize class names (#49939)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Dec 19, 2024
1 parent 62b63a7 commit cc383cf
Show file tree
Hide file tree
Showing 35 changed files with 657 additions and 1,390 deletions.
6 changes: 2 additions & 4 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource'
mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource'
}

airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = 'local'
cdk = '0.226'
}

dependencies {
implementation 'com.mysql:mysql-connector-j:9.1.0'
implementation 'org.codehaus.plexus:plexus-utils:4.0.0'
implementation 'io.debezium:debezium-connector-mysql'

testImplementation platform('org.testcontainers:testcontainers-bom:1.20.2')
testImplementation 'org.testcontainers:mysql'
testImplementation("io.mockk:mockk:1.12.0")
}
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
data:
ab_internal:
ql: 200
sl: 0
ql: 400
sl: 300
allowedHosts:
hosts:
- ${host}
- ${tunnel_method.tunnel_host}
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.3
dockerImageTag: 3.9.4
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.AirbyteSourceRunner

object MysqlSource {
object MySqlSource {
@JvmStatic
fun main(args: Array<String>) {
AirbyteSourceRunner.run(*args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import org.apache.kafka.connect.data.SchemaBuilder

class MySQLBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}

private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons

data class MysqlCdcInitialSnapshotStateValue(
data class MySqlSourceCdcInitialSnapshotStateValue(
@JsonProperty("pk_val") val pkVal: String? = null,
@JsonProperty("pk_name") val pkName: String? = null,
@JsonProperty("version") val version: Int? = null,
Expand All @@ -25,7 +25,7 @@ data class MysqlCdcInitialSnapshotStateValue(
/** Value representing the completion of a FULL_REFRESH snapshot. */
fun getSnapshotCompletedState(stream: Stream): OpaqueStateValue =
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
streamName = stream.name,
cursorField = listOf(),
streamNamespace = stream.namespace
Expand All @@ -42,7 +42,7 @@ data class MysqlCdcInitialSnapshotStateValue(
true -> Jsons.nullNode()
false ->
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
pkName = primaryKeyField.id,
pkVal = primaryKeyCheckpoint.first().asText(),
stateType = "primary_key",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.airbyte.cdk.discover.CdcStringMetaFieldType
import io.airbyte.cdk.discover.FieldType
import io.airbyte.cdk.discover.MetaField

enum class MysqlCdcMetaFields(
enum class MySqlSourceCdcMetaFields(
override val type: FieldType,
) : MetaField {
CDC_CURSOR(CdcIntegerMetaFieldType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import org.apache.kafka.connect.data.SchemaBuilder

class MySQLNumericConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcNumericConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}

private val NUMERIC_TYPES = arrayOf("FLOAT", "DOUBLE", "DECIMAL")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc
package io.airbyte.integrations.source.mysql

import kotlin.io.path.Path
import kotlin.io.path.extension

/** WAL position datum for MySQL. */
data class MySqlPosition(val fileName: String, val position: Long) : Comparable<MySqlPosition> {
data class MySqlSourceCdcPosition(val fileName: String, val position: Long) :
Comparable<MySqlSourceCdcPosition> {

/**
* Numerical value encoded in the extension of the binlog file name.
Expand All @@ -31,5 +32,6 @@ data class MySqlPosition(val fileName: String, val position: Long) : Comparable<
val cursorValue: Long
get() = (fileExtension.toLong() shl Int.SIZE_BITS) or position

override fun compareTo(other: MySqlPosition): Int = cursorValue.compareTo(other.cursorValue)
override fun compareTo(other: MySqlSourceCdcPosition): Int =
cursorValue.compareTo(other.cursorValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.jdbc.converters.DateTimeConverter
import io.debezium.spi.converter.CustomConverter
Expand All @@ -24,7 +24,7 @@ import org.apache.kafka.connect.data.SchemaBuilder
* MySqlCdcProperties#commonProperties(JdbcDatabase)} (If you don't rename, a test would still fail
* but it might be tricky to figure out where to change the property name)
*/
class MySQLDateTimeConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcTemporalConverter : CustomConverter<SchemaBuilder, RelationalColumn> {

private val DATE_TYPES = arrayOf("DATE", "DATETIME", "TIME", "TIMESTAMP")
override fun configure(props: Properties?) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.time.Duration

private val log = KotlinLogging.logger {}

/** Mysql-specific implementation of [SourceConfiguration] */
data class MysqlSourceConfiguration(
/** MySQL-specific implementation of [SourceConfiguration] */
data class MySqlSourceConfiguration(
override val realHost: String,
override val realPort: Int,
override val sshTunnel: SshTunnelMethodConfiguration?,
Expand All @@ -41,16 +41,16 @@ data class MysqlSourceConfiguration(
) : JdbcSourceConfiguration, CdcSourceConfiguration {
override val global = incrementalConfiguration is CdcIncrementalConfiguration

/** Required to inject [MysqlSourceConfiguration] directly. */
/** Required to inject [MySqlSourceConfiguration] directly. */
@Factory
private class MicronautFactory {
@Singleton
fun mysqlSourceConfig(
factory:
SourceConfigurationFactory<
MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MysqlSourceConfigurationSpecification>,
): MysqlSourceConfiguration = factory.make(supplier.get())
MySqlSourceConfigurationSpecification, MySqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MySqlSourceConfigurationSpecification>,
): MySqlSourceConfiguration = factory.make(supplier.get())
}
}

Expand All @@ -71,14 +71,14 @@ enum class InvalidCdcCursorPositionBehavior {
}

@Singleton
class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration> {
class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MySqlSourceConfigurationSpecification, MySqlSourceConfiguration> {

constructor() : this(emptySet())

override fun makeWithoutExceptionHandling(
pojo: MysqlSourceConfigurationSpecification,
): MysqlSourceConfiguration {
pojo: MySqlSourceConfigurationSpecification,
): MySqlSourceConfiguration {
val realHost: String = pojo.host
val realPort: Int = pojo.port
val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
Expand Down Expand Up @@ -115,20 +115,21 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
"SSL encryption or an SSH tunnel."
)
}
MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED)
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED)
}
is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED)
is EncryptionRequired ->
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED)
is SslVerifyCertificate ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_CA,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
clientKeyPassword = encryption.sslClientPassword
)
is SslVerifyIdentity ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_IDENTITY,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
Expand Down Expand Up @@ -178,7 +179,7 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
},
)
}
return MysqlSourceConfiguration(
return MySqlSourceConfiguration(
realHost = realHost,
realPort = realPort,
sshTunnel = sshTunnel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import io.micronaut.context.annotation.ConfigurationProperties
import jakarta.inject.Singleton

/**
* The object which is mapped to the Mysql source configuration JSON.
* The object which is mapped to the MySQL source configuration JSON.
*
* Use [MysqlSourceConfiguration] instead wherever possible. This object also allows injecting
* Use [MySqlSourceConfiguration] instead wherever possible. This object also allows injecting
* values through Micronaut properties, this is made possible by the classes named
* `MicronautPropertiesFriendly.*`.
*/
@JsonSchemaTitle("Mysql Source Spec")
@JsonSchemaTitle("MySQL Source Spec")
@JsonPropertyOrder(
value = ["host", "port", "database", "username", "replication_method"],
)
@Singleton
@ConfigurationProperties(CONNECTOR_CONFIG_PREFIX)
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
class MysqlSourceConfigurationSpecification : ConfigurationSpecification() {
class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
@JsonProperty("host")
@JsonSchemaTitle("Host")
@JsonSchemaInject(json = """{"order":1}""")
Expand Down Expand Up @@ -320,7 +320,7 @@ data object UserDefinedCursor : CursorMethodConfiguration
@JsonSchemaTitle("Read Changes using Change Data Capture (CDC)")
@JsonSchemaDescription(
"<i>Recommended</i> - " +
"Incrementally reads new inserts, updates, and deletes using Mysql's <a href=" +
"Incrementally reads new inserts, updates, and deletes using MySQL's <a href=" +
"\"https://docs.airbyte.com/integrations/sources/mssql/#change-data-capture-cdc\"" +
"> change data capture feature</a>. This must be enabled on your database.",
)
Expand Down
Loading

0 comments on commit cc383cf

Please sign in to comment.