diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 18297abf395a..1664f0159176 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.2 + dockerImageTag: 3.9.3 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt index 43235cae3996..cc94ea2840af 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt @@ -28,8 +28,11 @@ import io.airbyte.cdk.util.Jsons import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton import java.time.LocalDateTime +import java.time.OffsetDateTime +import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatterBuilder +import java.time.format.DateTimeParseException import java.time.temporal.ChronoField import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -350,15 +353,29 @@ class MysqlJdbcPartitionFactory( } LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE -> { val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss" + val formatter = + DateTimeFormatterBuilder() + .appendPattern(timestampInStatePattern) + .optionalStart() + .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true) + .optionalEnd() + .optionalStart() + .optionalStart() + .appendLiteral(' ') + .optionalEnd() + .appendOffset("+HH:mm", "Z") + .optionalEnd() + .toFormatter() + try { - val formatter: DateTimeFormatter = - DateTimeFormatter.ofPattern(timestampInStatePattern) - Jsons.valueToTree( - LocalDateTime.parse(stateValue, formatter) - .minusDays(1) - .atOffset(java.time.ZoneOffset.UTC) - .format(OffsetDateTimeCodec.formatter) - ) + val offsetDateTime = + try { + OffsetDateTime.parse(stateValue, formatter) + } catch (_: DateTimeParseException) { + // if no offset exists, we assume it's UTC + LocalDateTime.parse(stateValue, formatter).atOffset(UTC) + } + Jsons.valueToTree(offsetDateTime.format(OffsetDateTimeCodec.formatter)) } catch (_: RuntimeException) { // Resolve to use the new format. Jsons.valueToTree(stateValue) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt index 4a3e7b16cb4c..a363e99f42f1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt @@ -37,6 +37,8 @@ import kotlin.test.assertNull import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource class MysqlJdbcPartitionFactoryTest { companion object { @@ -219,13 +221,28 @@ class MysqlJdbcPartitionFactoryTest { assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) } - @Test - fun testResumeFromCompletedCursorBasedReadTimestamp() { + @ParameterizedTest + @CsvSource( + "'2025-09-03T05:23:35', '2025-09-03T05:23:35.000000Z'", + "'2025-09-03T05:23:35.0', '2025-09-03T05:23:35.000000Z'", + "'2025-09-03T05:23:35.1', '2025-09-03T05:23:35.100000Z'", + "'2025-09-03T05:23:35.123', '2025-09-03T05:23:35.123000Z'", + "'2025-09-03T05:23:35.123456789', '2025-09-03T05:23:35.123456Z'", + "'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'", + "'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'", + "'2025-09-03T05:23:35Z', '2025-09-03T05:23:35.000000Z'", + "'2025-09-03T05:23:35 Z', '2025-09-03T05:23:35.000000Z'", + "'2025-09-03T05:23:35.12345 +12:34', '2025-09-03T05:23:35.123450+12:34'", + ) + fun testResumeFromCompletedCursorBasedReadTimestamp( + cursorVal: String, + expectedLowerBound: String + ) { val incomingStateValue: OpaqueStateValue = Jsons.readTree( """ { - "cursor": "2025-09-03T05:23:35", + "cursor": "$cursorVal", "version": 2, "state_type": "cursor_based", "stream_name": "stream2", @@ -245,7 +262,7 @@ class MysqlJdbcPartitionFactoryTest { assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) assertEquals( - Jsons.valueToTree("2025-09-02T05:23:35.000000Z"), + Jsons.valueToTree("$expectedLowerBound"), (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound ) } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 9c61f0aaf679..0817f2c8793e 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.9.3 | 2024-12-18 | [49932](https://github.com/airbytehq/airbyte/pull/49932) | Backward compatibility for saved states with timestamp that include timezone offset. | | 3.9.2 | 2024-12-16 | [49830](https://github.com/airbytehq/airbyte/pull/49830) | Fixes an issue with auto generated tinyint columns | | 3.9.1 | 2024-12-12 | [49456](https://github.com/airbytehq/airbyte/pull/49456) | Bump version to re-relase | | 3.9.0 | 2024-12-12 | [49423](https://github.com/airbytehq/airbyte/pull/49423) | Promoting release candidate 3.9.0-rc.27 to a main version. |