Skip to content

Commit

Permalink
[source-mysql-v2] 9903: add cdc options (#45866)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Sep 25, 2024
1 parent ab73310 commit cb493e6
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.8
dockerImageTag: 0.0.9
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,79 @@ data object UserDefinedCursor : CursorConfiguration
"\"https://docs.airbyte.com/integrations/sources/mssql/#change-data-capture-cdc\"" +
"> change data capture feature</a>. This must be enabled on your database.",
)
data object CdcCursor : CursorConfiguration
class CdcCursor : CursorConfiguration {
@JsonProperty("initial_waiting_seconds")
@JsonSchemaTitle("Initial Waiting Time in Seconds (Advanced)")
@JsonSchemaDefault("300")
@JsonPropertyDescription(
"The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\" +\n" +
" \"\\\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\\\"\" +\n" +
" \"> initial waiting time</a>.",
)
@JsonSchemaInject(json = """{"order":1, "max": 1200, "min": 120, "always_show": true}""")
var initialWaitTimeInSeconds: Int? = 300

@JsonProperty("server_timezone")
@JsonSchemaTitle("Configured server timezone for the MySQL source (Advanced)")
@JsonPropertyDescription(
"Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
)
@JsonSchemaInject(json = """{"order":2,"always_show":true}""")
var serverTimezone: String? = null

@JsonIgnore
@ConfigurationBuilder(configurationPrefix = "invalid_cdc_behavior")
var invalidCdcBehavior = MicronautPropertiesFriendlyInvalidCdcBehaviorConfiguration()

@JsonIgnore var invalidCdcPositionBehaviorJson: InvalidCdcPositionBehavior? = null

@JsonSetter("invalid_cdc_behavior")
fun setInvalidCdcBehaviorValue(value: InvalidCdcPositionBehavior) {
invalidCdcPositionBehaviorJson = value
}

@JsonGetter("invalid_cdc_behavior")
@JsonSchemaTitle("Invalid CDC position behavior (Advanced)")
@JsonPropertyDescription(
"Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
)
@JsonSchemaInject(json = """{"order":3, "always_show": true}""")
fun getInvalidCdcBehaviorValue(): InvalidCdcPositionBehavior? =
invalidCdcPositionBehaviorJson ?: invalidCdcBehavior.asInvalidCdcPositionBehavior()

@JsonProperty("initial_load_timeout_hours")
@JsonSchemaTitle("Initial Load Timeout in Hours (Advanced)")
@JsonPropertyDescription(
"The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
)
@JsonSchemaDefault("8")
@JsonSchemaInject(json = """{"order":4, "max": 24, "min": 4,"always_show": true}""")
var initialLoadTimeoutHours: Int? = 8
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "invalid_cdc_cursor_position_behavior")
@JsonSubTypes(
JsonSubTypes.Type(value = FailSync::class, name = "fail_sync"),
JsonSubTypes.Type(value = ResyncData::class, name = "resync")
)
@JsonSchemaTitle("Update Method")
sealed interface InvalidCdcPositionBehavior

@JsonSchemaTitle("Fail sync") data object FailSync : InvalidCdcPositionBehavior

@JsonSchemaTitle("Re-sync data") data object ResyncData : InvalidCdcPositionBehavior

@ConfigurationProperties("$CONNECTOR_CONFIG_PREFIX.invalid_cdc_cursor_position_behavior")
class MicronautPropertiesFriendlyInvalidCdcBehaviorConfiguration {
var invalidCdcBehavior: String = "fail_sync"

fun asInvalidCdcPositionBehavior(): InvalidCdcPositionBehavior =
when (invalidCdcBehavior) {
"fail_sync" -> FailSync
"resync" -> ResyncData
else -> throw ConfigErrorException("invalid value $invalidCdcBehavior")
}
}

@ConfigurationProperties("$CONNECTOR_CONFIG_PREFIX.cursor")
class MicronautPropertiesFriendlyCursorConfiguration {
Expand All @@ -345,7 +417,7 @@ class MicronautPropertiesFriendlyCursorConfiguration {
fun asCursorConfiguration(): CursorConfiguration =
when (cursorMethod) {
"user_defined" -> UserDefinedCursor
"cdc" -> CdcCursor
"cdc" -> CdcCursor()
else -> throw ConfigErrorException("invalid value $cursorMethod")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MysqlCdcIntegrationTest {
lateinit var dbContainer: MySQLContainer<*>

fun config(): MysqlSourceConfigurationJsonObject =
MysqlContainerFactory.config(dbContainer).apply { setCursorMethodValue(CdcCursor) }
MysqlContainerFactory.config(dbContainer).apply { setCursorMethodValue(CdcCursor()) }

val connectionFactory: JdbcConnectionFactory by lazy {
JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config()))
Expand Down
Loading

0 comments on commit cb493e6

Please sign in to comment.