Skip to content

Commit

Permalink
[source-mysql-v2] Complete datatypes (#45465)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Sep 25, 2024
1 parent f7aaadd commit ab73310
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.7
dockerImageTag: 0.0.8
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.integrations.source.mysql

import com.mysql.cj.MysqlType
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldType
import io.airbyte.cdk.discover.JdbcMetadataQuerier
import io.airbyte.cdk.discover.SystemType
import io.airbyte.cdk.discover.UserDefinedArray
import io.airbyte.cdk.discover.UserDefinedType
import io.airbyte.cdk.jdbc.ArrayFieldType
import io.airbyte.cdk.jdbc.BigDecimalFieldType
import io.airbyte.cdk.jdbc.BigIntegerFieldType
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
import io.airbyte.cdk.jdbc.BooleanFieldType
import io.airbyte.cdk.jdbc.ClobFieldType
import io.airbyte.cdk.jdbc.ByteFieldType
import io.airbyte.cdk.jdbc.DoubleFieldType
import io.airbyte.cdk.jdbc.FloatFieldType
import io.airbyte.cdk.jdbc.IntFieldType
import io.airbyte.cdk.jdbc.JdbcFieldType
import io.airbyte.cdk.jdbc.JsonStringFieldType
import io.airbyte.cdk.jdbc.LocalDateFieldType
import io.airbyte.cdk.jdbc.LocalDateTimeFieldType
import io.airbyte.cdk.jdbc.LocalTimeFieldType
import io.airbyte.cdk.jdbc.LongFieldType
import io.airbyte.cdk.jdbc.LosslessJdbcFieldType
import io.airbyte.cdk.jdbc.NClobFieldType
import io.airbyte.cdk.jdbc.NStringFieldType
import io.airbyte.cdk.jdbc.NullFieldType
import io.airbyte.cdk.jdbc.OffsetDateTimeFieldType
import io.airbyte.cdk.jdbc.PokemonFieldType
import io.airbyte.cdk.jdbc.ShortFieldType
import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.read.And
import io.airbyte.cdk.read.Equal
Expand Down Expand Up @@ -63,81 +62,66 @@ import jakarta.inject.Singleton
class MysqlSourceOperations : JdbcMetadataQuerier.FieldTypeMapper, SelectQueryGenerator {
override fun toFieldType(c: JdbcMetadataQuerier.ColumnMetadata): FieldType =
when (val type = c.type) {
is SystemType -> leafType(c.type.typeName, type.scale != 0)
is UserDefinedArray -> ArrayFieldType(recursiveArrayType(type))
is UserDefinedType -> PokemonFieldType
is SystemType -> leafType(type)
else -> PokemonFieldType
}

private fun recursiveArrayType(type: UserDefinedArray): JdbcFieldType<*> =
when (val elementType = type.elementType) {
is SystemType -> {
val leafType: JdbcFieldType<*> =
leafType(elementType.typeName, elementType.scale != 0)
if (leafType == OffsetDateTimeFieldType) {
// Mysql's JDBC driver has a bug which prevents object conversions in
// ArrayDataResultSet instances. Fall back to strings instead.
PokemonFieldType
private fun leafType(type: SystemType): JdbcFieldType<*> {
return when (MysqlType.getByName(type.typeName)) {
MysqlType.BIT -> {
if (type.precision!! > 1) {
ByteFieldType
} else {
leafType
BooleanFieldType
}
}
is UserDefinedArray -> ArrayFieldType(recursiveArrayType(elementType))
is UserDefinedType -> PokemonFieldType
}

private fun leafType(
typeName: String?,
notInteger: Boolean,
): JdbcFieldType<*> =
// TODO: https://github.com/airbytehq/airbyte-internal-issues/issues/9670
when (typeName) {
"BINARY_FLOAT" -> FloatFieldType
"BINARY_DOUBLE" -> DoubleFieldType
"FLOAT",
"DOUBLE PRECISION",
"REAL", -> BigDecimalFieldType
"NUMBER",
"NUMERIC",
"DECIMAL",
"DEC", -> if (notInteger) BigDecimalFieldType else BigIntegerFieldType
"INTEGER",
"INT",
"SMALLINT", -> BigIntegerFieldType
"BOOLEAN",
"BOOL", -> BooleanFieldType
"CHAR",
"VARCHAR2",
"VARCHAR",
"CHARACTER",
"CHARACTER VARYING",
"CHAR VARYING", -> StringFieldType
"NCHAR",
"NVARCHAR2",
"NCHAR VARYING",
"NATIONAL CHARACTER VARYING",
"NATIONAL CHARACTER",
"NATIONAL CHAR VARYING",
"NATIONAL CHAR", -> NStringFieldType
"BLOB" -> BinaryStreamFieldType
"CLOB" -> ClobFieldType
"NCLOB" -> NClobFieldType
"BFILE" -> BinaryStreamFieldType
"DATE" -> LocalDateFieldType
"INTERVALDS",
"INTERVAL DAY TO SECOND",
"INTERVALYM",
"INTERVAL YEAR TO MONTH", -> StringFieldType
"JSON" -> JsonStringFieldType
"LONG",
"LONG RAW",
"RAW", -> BinaryStreamFieldType
"TIMESTAMP",
"TIMESTAMP WITH LOCAL TIME ZONE",
"TIMESTAMP WITH LOCAL TZ", -> LocalDateTimeFieldType
"TIMESTAMP WITH TIME ZONE",
"TIMESTAMP WITH TZ", -> OffsetDateTimeFieldType
else -> PokemonFieldType
MysqlType.BOOLEAN -> BooleanFieldType
MysqlType.TINYINT,
MysqlType.TINYINT_UNSIGNED,
MysqlType.YEAR -> ShortFieldType
MysqlType.SMALLINT,
MysqlType.SMALLINT_UNSIGNED,
MysqlType.MEDIUMINT,
MysqlType.MEDIUMINT_UNSIGNED,
MysqlType.INT -> IntFieldType
MysqlType.INT_UNSIGNED,
MysqlType.BIGINT,
MysqlType.BIGINT_UNSIGNED -> BigIntegerFieldType
MysqlType.FLOAT,
MysqlType.FLOAT_UNSIGNED -> FloatFieldType
MysqlType.DOUBLE,
MysqlType.DOUBLE_UNSIGNED -> DoubleFieldType
MysqlType.DECIMAL,
MysqlType.DECIMAL_UNSIGNED -> {
if (type.scale == 0) BigIntegerFieldType else BigDecimalFieldType
}
MysqlType.DATE -> LocalDateFieldType
MysqlType.DATETIME -> LocalDateTimeFieldType
MysqlType.TIMESTAMP -> OffsetDateTimeFieldType
MysqlType.TIME -> LocalTimeFieldType
MysqlType.CHAR,
MysqlType.VARCHAR,
MysqlType.TINYTEXT,
MysqlType.TEXT,
MysqlType.MEDIUMTEXT,
MysqlType.LONGTEXT,
MysqlType.JSON,
MysqlType.ENUM,
MysqlType.SET -> StringFieldType
MysqlType.TINYBLOB,
MysqlType.BLOB,
MysqlType.MEDIUMBLOB,
MysqlType.LONGBLOB,
MysqlType.BINARY,
MysqlType.VARBINARY,
MysqlType.GEOMETRY -> BinaryStreamFieldType
MysqlType.NULL -> NullFieldType
else -> {
print("test debug: unrecognized type: ${type.typeName}")
PokemonFieldType
}
}
}

override fun generate(ast: SelectQuerySpec): SelectQuery =
SelectQuery(ast.sql(), ast.select.columns, ast.bindings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,167 @@ class MysqlSourceDatatypeIntegrationTest {
JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config()))
}

val bitValues =
mapOf(
"b'1'" to "true",
"b'0'" to "false",
)

val longBitValues =
mapOf(
"b'10101010'" to """-86""",
)

val stringValues =
mapOf(
"'abcdef'" to """"abcdef"""",
"'ABCD'" to """"ABCD"""",
"'OXBEEF'" to """"OXBEEF"""",
)

val jsonValues = mapOf("""'{"col1": "v1"}'""" to """"{\"col1\": \"v1\"}"""")

val yearValues =
mapOf(
"1992" to """1992""",
"2002" to """2002""",
"70" to """1970""",
)

val decimalValues =
mapOf(
"0.2" to """0.2""",
)

val zeroPrecisionDecimalValues =
mapOf(
"2" to """2""",
)

val tinyintValues =
mapOf(
"10" to "10",
"4" to "4",
"2" to "2",
)

val intValues =
mapOf(
"10" to "10",
"100000000" to "100000000",
"200000000" to "200000000",
)

val dateValues =
mapOf(
"'2022-01-01'" to """"2022-01-01"""",
)

val timeValues =
mapOf(
"'14:30:00'" to """"14:30:00.000000"""",
)

val dateTimeValues =
mapOf(
"'2024-09-13 14:30:00'" to """"2024-09-13T14:30:00.000000"""",
"'2024-09-13T14:40:00+00:00'" to """"2024-09-13T14:40:00.000000""""
)

val timestampValues =
mapOf(
"'2024-09-12 14:30:00'" to """"2024-09-12T14:30:00.000000Z"""",
"CONVERT_TZ('2024-09-12 14:30:00', 'America/Los_Angeles', 'UTC')" to
""""2024-09-12T21:30:00.000000Z"""",
)

val booleanValues =
mapOf(
"TRUE" to "true",
"FALSE" to "false",
)

val enumValues =
mapOf(
"'a'" to """"a"""",
"'b'" to """"b"""",
"'c'" to """"c"""",
)

// Encoded into base64
val binaryValues =
mapOf(
"X'89504E470D0A1A0A0000000D49484452'" to """"iVBORw0KGgoAAAANSUhEUg=="""",
)

val testCases: List<TestCase> =
listOf(
TestCase("VARCHAR(10)", stringValues),
TestCase(
"BOOLEAN",
booleanValues,
airbyteType = LeafAirbyteType.BOOLEAN,
cursor = false
),
TestCase("VARCHAR(10)", stringValues, airbyteType = LeafAirbyteType.STRING),
TestCase("DECIMAL(10,2)", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase(
"DECIMAL(10,2) UNSIGNED",
decimalValues,
airbyteType = LeafAirbyteType.NUMBER
),
TestCase(
"DECIMAL UNSIGNED",
zeroPrecisionDecimalValues,
airbyteType = LeafAirbyteType.INTEGER
),
TestCase("FLOAT", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase("FLOAT(7,4)", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase("FLOAT(53,8)", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase("DOUBLE", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase("DOUBLE UNSIGNED", decimalValues, airbyteType = LeafAirbyteType.NUMBER),
TestCase("TINYINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("TINYINT UNSIGNED", tinyintValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("SMALLINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("MEDIUMINT", tinyintValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("BIGINT", intValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("SMALLINT UNSIGNED", tinyintValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase(
"MEDIUMINT UNSIGNED",
tinyintValues,
airbyteType = LeafAirbyteType.INTEGER
),
TestCase("BIGINT UNSIGNED", intValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("INT", intValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("INT UNSIGNED", intValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("DATE", dateValues, airbyteType = LeafAirbyteType.DATE),
TestCase(
"TIMESTAMP",
timestampValues,
airbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE
),
TestCase(
"DATETIME",
dateTimeValues,
airbyteType = LeafAirbyteType.TIMESTAMP_WITHOUT_TIMEZONE
),
TestCase("TIME", timeValues, airbyteType = LeafAirbyteType.TIME_WITHOUT_TIMEZONE),
TestCase("YEAR", yearValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase(
"VARBINARY(255)",
binaryValues,
airbyteType = LeafAirbyteType.BINARY,
cursor = false,
noPK = true
),
TestCase("BIT", bitValues, airbyteType = LeafAirbyteType.BOOLEAN, cursor = false),
TestCase("BIT(8)", longBitValues, airbyteType = LeafAirbyteType.INTEGER),
TestCase("JSON", jsonValues, airbyteType = LeafAirbyteType.STRING, noPK = true),
TestCase(
"ENUM('a', 'b', 'c')",
enumValues,
airbyteType = LeafAirbyteType.STRING,
noPK = true
),
)

val allStreamNamesAndRecordData: Map<String, List<JsonNode>> =
Expand Down

0 comments on commit ab73310

Please sign in to comment.