Skip to content

Commit

Permalink
[source-mysql] 7084 source mysql regression in 39x date parsing errors (
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Dec 19, 2024
1 parent f0d6f9f commit 8819bd1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 13 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.2
dockerImageTag: 3.9.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -350,15 +353,29 @@ class MysqlJdbcPartitionFactory(
}
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE -> {
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
val formatter =
DateTimeFormatterBuilder()
.appendPattern(timestampInStatePattern)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
.optionalEnd()
.optionalStart()
.optionalStart()
.appendLiteral(' ')
.optionalEnd()
.appendOffset("+HH:mm", "Z")
.optionalEnd()
.toFormatter()

try {
val formatter: DateTimeFormatter =
DateTimeFormatter.ofPattern(timestampInStatePattern)
Jsons.valueToTree(
LocalDateTime.parse(stateValue, formatter)
.minusDays(1)
.atOffset(java.time.ZoneOffset.UTC)
.format(OffsetDateTimeCodec.formatter)
)
val offsetDateTime =
try {
OffsetDateTime.parse(stateValue, formatter)
} catch (_: DateTimeParseException) {
// if no offset exists, we assume it's UTC
LocalDateTime.parse(stateValue, formatter).atOffset(UTC)
}
Jsons.valueToTree(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} catch (_: RuntimeException) {
// Resolve to use the new format.
Jsons.valueToTree<JsonNode>(stateValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import kotlin.test.assertNull
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class MysqlJdbcPartitionFactoryTest {
companion object {
Expand Down Expand Up @@ -219,13 +221,28 @@ class MysqlJdbcPartitionFactoryTest {
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)
}

@Test
fun testResumeFromCompletedCursorBasedReadTimestamp() {
@ParameterizedTest
@CsvSource(
"'2025-09-03T05:23:35', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.0', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.1', '2025-09-03T05:23:35.100000Z'",
"'2025-09-03T05:23:35.123', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35.123456789', '2025-09-03T05:23:35.123456Z'",
"'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35Z', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35 Z', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.12345 +12:34', '2025-09-03T05:23:35.123450+12:34'",
)
fun testResumeFromCompletedCursorBasedReadTimestamp(
cursorVal: String,
expectedLowerBound: String
) {
val incomingStateValue: OpaqueStateValue =
Jsons.readTree(
"""
{
"cursor": "2025-09-03T05:23:35",
"cursor": "$cursorVal",
"version": 2,
"state_type": "cursor_based",
"stream_name": "stream2",
Expand All @@ -245,7 +262,7 @@ class MysqlJdbcPartitionFactoryTest {
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)

assertEquals(
Jsons.valueToTree("2025-09-02T05:23:35.000000Z"),
Jsons.valueToTree("$expectedLowerBound"),
(jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound
)
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.9.3 | 2024-12-18 | [49932](https://github.com/airbytehq/airbyte/pull/49932) | Backward compatibility for saved states with timestamp that include timezone offset. |
| 3.9.2 | 2024-12-16 | [49830](https://github.com/airbytehq/airbyte/pull/49830) | Fixes an issue with auto generated tinyint columns |
| 3.9.1 | 2024-12-12 | [49456](https://github.com/airbytehq/airbyte/pull/49456) | Bump version to re-relase |
| 3.9.0 | 2024-12-12 | [49423](https://github.com/airbytehq/airbyte/pull/49423) | Promoting release candidate 3.9.0-rc.27 to a main version. |
Expand Down

0 comments on commit 8819bd1

Please sign in to comment.