diff --git a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml index d0253829145b..9e87d8079025 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c - dockerImageTag: 0.0.7 + dockerImageTag: 0.0.8 dockerRepository: airbyte/source-mysql-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql-v2 diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt index 48a91db7a074..6be0e5297b27 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt @@ -1,30 +1,29 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.source.mysql +import com.mysql.cj.MysqlType import io.airbyte.cdk.discover.Field import io.airbyte.cdk.discover.FieldType import io.airbyte.cdk.discover.JdbcMetadataQuerier import io.airbyte.cdk.discover.SystemType -import io.airbyte.cdk.discover.UserDefinedArray -import io.airbyte.cdk.discover.UserDefinedType -import io.airbyte.cdk.jdbc.ArrayFieldType import io.airbyte.cdk.jdbc.BigDecimalFieldType import io.airbyte.cdk.jdbc.BigIntegerFieldType import io.airbyte.cdk.jdbc.BinaryStreamFieldType import io.airbyte.cdk.jdbc.BooleanFieldType -import io.airbyte.cdk.jdbc.ClobFieldType +import io.airbyte.cdk.jdbc.ByteFieldType import io.airbyte.cdk.jdbc.DoubleFieldType import io.airbyte.cdk.jdbc.FloatFieldType +import io.airbyte.cdk.jdbc.IntFieldType import io.airbyte.cdk.jdbc.JdbcFieldType -import io.airbyte.cdk.jdbc.JsonStringFieldType import io.airbyte.cdk.jdbc.LocalDateFieldType import io.airbyte.cdk.jdbc.LocalDateTimeFieldType +import io.airbyte.cdk.jdbc.LocalTimeFieldType import io.airbyte.cdk.jdbc.LongFieldType import io.airbyte.cdk.jdbc.LosslessJdbcFieldType -import io.airbyte.cdk.jdbc.NClobFieldType -import io.airbyte.cdk.jdbc.NStringFieldType +import io.airbyte.cdk.jdbc.NullFieldType import io.airbyte.cdk.jdbc.OffsetDateTimeFieldType import io.airbyte.cdk.jdbc.PokemonFieldType +import io.airbyte.cdk.jdbc.ShortFieldType import io.airbyte.cdk.jdbc.StringFieldType import io.airbyte.cdk.read.And import io.airbyte.cdk.read.Equal @@ -63,81 +62,66 @@ import jakarta.inject.Singleton class MysqlSourceOperations : JdbcMetadataQuerier.FieldTypeMapper, SelectQueryGenerator { override fun toFieldType(c: JdbcMetadataQuerier.ColumnMetadata): FieldType = when (val type = c.type) { - is SystemType -> leafType(c.type.typeName, type.scale != 0) - is UserDefinedArray -> ArrayFieldType(recursiveArrayType(type)) - is UserDefinedType -> PokemonFieldType + is SystemType -> leafType(type) + else -> PokemonFieldType } - private fun recursiveArrayType(type: UserDefinedArray): JdbcFieldType<*> = - when (val elementType = type.elementType) { - is SystemType -> { - val leafType: JdbcFieldType<*> = - leafType(elementType.typeName, elementType.scale != 0) - if (leafType == OffsetDateTimeFieldType) { - // Mysql's JDBC driver has a bug which prevents object conversions in - // ArrayDataResultSet instances. Fall back to strings instead. - PokemonFieldType + private fun leafType(type: SystemType): JdbcFieldType<*> { + return when (MysqlType.getByName(type.typeName)) { + MysqlType.BIT -> { + if (type.precision!! > 1) { + ByteFieldType } else { - leafType + BooleanFieldType } } - is UserDefinedArray -> ArrayFieldType(recursiveArrayType(elementType)) - is UserDefinedType -> PokemonFieldType - } - - private fun leafType( - typeName: String?, - notInteger: Boolean, - ): JdbcFieldType<*> = - // TODO: https://github.com/airbytehq/airbyte-internal-issues/issues/9670 - when (typeName) { - "BINARY_FLOAT" -> FloatFieldType - "BINARY_DOUBLE" -> DoubleFieldType - "FLOAT", - "DOUBLE PRECISION", - "REAL", -> BigDecimalFieldType - "NUMBER", - "NUMERIC", - "DECIMAL", - "DEC", -> if (notInteger) BigDecimalFieldType else BigIntegerFieldType - "INTEGER", - "INT", - "SMALLINT", -> BigIntegerFieldType - "BOOLEAN", - "BOOL", -> BooleanFieldType - "CHAR", - "VARCHAR2", - "VARCHAR", - "CHARACTER", - "CHARACTER VARYING", - "CHAR VARYING", -> StringFieldType - "NCHAR", - "NVARCHAR2", - "NCHAR VARYING", - "NATIONAL CHARACTER VARYING", - "NATIONAL CHARACTER", - "NATIONAL CHAR VARYING", - "NATIONAL CHAR", -> NStringFieldType - "BLOB" -> BinaryStreamFieldType - "CLOB" -> ClobFieldType - "NCLOB" -> NClobFieldType - "BFILE" -> BinaryStreamFieldType - "DATE" -> LocalDateFieldType - "INTERVALDS", - "INTERVAL DAY TO SECOND", - "INTERVALYM", - "INTERVAL YEAR TO MONTH", -> StringFieldType - "JSON" -> JsonStringFieldType - "LONG", - "LONG RAW", - "RAW", -> BinaryStreamFieldType - "TIMESTAMP", - "TIMESTAMP WITH LOCAL TIME ZONE", - "TIMESTAMP WITH LOCAL TZ", -> LocalDateTimeFieldType - "TIMESTAMP WITH TIME ZONE", - "TIMESTAMP WITH TZ", -> OffsetDateTimeFieldType - else -> PokemonFieldType + MysqlType.BOOLEAN -> BooleanFieldType + MysqlType.TINYINT, + MysqlType.TINYINT_UNSIGNED, + MysqlType.YEAR -> ShortFieldType + MysqlType.SMALLINT, + MysqlType.SMALLINT_UNSIGNED, + MysqlType.MEDIUMINT, + MysqlType.MEDIUMINT_UNSIGNED, + MysqlType.INT -> IntFieldType + MysqlType.INT_UNSIGNED, + MysqlType.BIGINT, + MysqlType.BIGINT_UNSIGNED -> BigIntegerFieldType + MysqlType.FLOAT, + MysqlType.FLOAT_UNSIGNED -> FloatFieldType + MysqlType.DOUBLE, + MysqlType.DOUBLE_UNSIGNED -> DoubleFieldType + MysqlType.DECIMAL, + MysqlType.DECIMAL_UNSIGNED -> { + if (type.scale == 0) BigIntegerFieldType else BigDecimalFieldType + } + MysqlType.DATE -> LocalDateFieldType + MysqlType.DATETIME -> LocalDateTimeFieldType + MysqlType.TIMESTAMP -> OffsetDateTimeFieldType + MysqlType.TIME -> LocalTimeFieldType + MysqlType.CHAR, + MysqlType.VARCHAR, + MysqlType.TINYTEXT, + MysqlType.TEXT, + MysqlType.MEDIUMTEXT, + MysqlType.LONGTEXT, + MysqlType.JSON, + MysqlType.ENUM, + MysqlType.SET -> StringFieldType + MysqlType.TINYBLOB, + MysqlType.BLOB, + MysqlType.MEDIUMBLOB, + MysqlType.LONGBLOB, + MysqlType.BINARY, + MysqlType.VARBINARY, + MysqlType.GEOMETRY -> BinaryStreamFieldType + MysqlType.NULL -> NullFieldType + else -> { + print("test debug: unrecognized type: ${type.typeName}") + PokemonFieldType + } } + } override fun generate(ast: SelectQuerySpec): SelectQuery = SelectQuery(ast.sql(), ast.select.columns, ast.bindings()) diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt index 8ce1085414fa..3f340c335783 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceDatatypeIntegrationTest.kt @@ -161,6 +161,17 @@ class MysqlSourceDatatypeIntegrationTest { JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config())) } + val bitValues = + mapOf( + "b'1'" to "true", + "b'0'" to "false", + ) + + val longBitValues = + mapOf( + "b'10101010'" to """-86""", + ) + val stringValues = mapOf( "'abcdef'" to """"abcdef"""", @@ -168,9 +179,149 @@ class MysqlSourceDatatypeIntegrationTest { "'OXBEEF'" to """"OXBEEF"""", ) + val jsonValues = mapOf("""'{"col1": "v1"}'""" to """"{\"col1\": \"v1\"}"""") + + val yearValues = + mapOf( + "1992" to """1992""", + "2002" to """2002""", + "70" to """1970""", + ) + + val decimalValues = + mapOf( + "0.2" to """0.2""", + ) + + val zeroPrecisionDecimalValues = + mapOf( + "2" to """2""", + ) + + val tinyintValues = + mapOf( + "10" to "10", + "4" to "4", + "2" to "2", + ) + + val intValues = + mapOf( + "10" to "10", + "100000000" to "100000000", + "200000000" to "200000000", + ) + + val dateValues = + mapOf( + "'2022-01-01'" to """"2022-01-01"""", + ) + + val timeValues = + mapOf( + "'14:30:00'" to """"14:30:00.000000"""", + ) + + val dateTimeValues = + mapOf( + "'2024-09-13 14:30:00'" to """"2024-09-13T14:30:00.000000"""", + "'2024-09-13T14:40:00+00:00'" to """"2024-09-13T14:40:00.000000"""" + ) + + val timestampValues = + mapOf( + "'2024-09-12 14:30:00'" to """"2024-09-12T14:30:00.000000Z"""", + "CONVERT_TZ('2024-09-12 14:30:00', 'America/Los_Angeles', 'UTC')" to + """"2024-09-12T21:30:00.000000Z"""", + ) + + val booleanValues = + mapOf( + "TRUE" to "true", + "FALSE" to "false", + ) + + val enumValues = + mapOf( + "'a'" to """"a"""", + "'b'" to """"b"""", + "'c'" to """"c"""", + ) + + // Encoded into base64 + val binaryValues = + mapOf( + "X'89504E470D0A1A0A0000000D49484452'" to """"iVBORw0KGgoAAAANSUhEUg=="""", + ) + val testCases: List = listOf( - TestCase("VARCHAR(10)", stringValues), + TestCase( + "BOOLEAN", + booleanValues, + airbyteType = LeafAirbyteType.BOOLEAN, + cursor = false + ), + TestCase("VARCHAR(10)", stringValues, airbyteType = LeafAirbyteType.STRING), + TestCase("DECIMAL(10,2)", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase( + "DECIMAL(10,2) UNSIGNED", + decimalValues, + airbyteType = LeafAirbyteType.NUMBER + ), + TestCase( + "DECIMAL UNSIGNED", + zeroPrecisionDecimalValues, + airbyteType = LeafAirbyteType.INTEGER + ), + TestCase("FLOAT", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase("FLOAT(7,4)", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase("FLOAT(53,8)", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase("DOUBLE", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase("DOUBLE UNSIGNED", decimalValues, airbyteType = LeafAirbyteType.NUMBER), + TestCase("TINYINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("TINYINT UNSIGNED", tinyintValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("SMALLINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("MEDIUMINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("BIGINT", intValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("SMALLINT UNSIGNED", tinyintValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase( + "MEDIUMINT UNSIGNED", + tinyintValues, + airbyteType = LeafAirbyteType.INTEGER + ), + TestCase("BIGINT UNSIGNED", intValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("INT", intValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("INT UNSIGNED", intValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("DATE", dateValues, airbyteType = LeafAirbyteType.DATE), + TestCase( + "TIMESTAMP", + timestampValues, + airbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE + ), + TestCase( + "DATETIME", + dateTimeValues, + airbyteType = LeafAirbyteType.TIMESTAMP_WITHOUT_TIMEZONE + ), + TestCase("TIME", timeValues, airbyteType = LeafAirbyteType.TIME_WITHOUT_TIMEZONE), + TestCase("YEAR", yearValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase( + "VARBINARY(255)", + binaryValues, + airbyteType = LeafAirbyteType.BINARY, + cursor = false, + noPK = true + ), + TestCase("BIT", bitValues, airbyteType = LeafAirbyteType.BOOLEAN, cursor = false), + TestCase("BIT(8)", longBitValues, airbyteType = LeafAirbyteType.INTEGER), + TestCase("JSON", jsonValues, airbyteType = LeafAirbyteType.STRING, noPK = true), + TestCase( + "ENUM('a', 'b', 'c')", + enumValues, + airbyteType = LeafAirbyteType.STRING, + noPK = true + ), ) val allStreamNamesAndRecordData: Map> =