From bae63044cca6ab550e1aa0efd5b952b5d497102c Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 16 Apr 2024 14:25:27 -0700 Subject: [PATCH] [Source-mysql/mssql] : Populate null values (#37111) --- .../AbstractJdbcCompatibleSourceOperations.kt | 9 -- .../src/main/resources/version.properties | 2 +- .../connectors/source-mssql/build.gradle | 2 +- .../connectors/source-mssql/metadata.yaml | 2 +- .../source/mssql/MssqlSourceOperations.java | 10 +- .../mssql/CdcMssqlSourceAcceptanceTest.java | 7 +- .../mssql/MssqlSourceAcceptanceTest.java | 4 +- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MySqlSourceOperations.java | 93 ++++++++++--------- .../sources/CdcMySqlSourceAcceptanceTest.java | 8 +- .../sources/MySqlSourceAcceptanceTest.java | 9 +- docs/integrations/sources/mssql.md | 3 +- docs/integrations/sources/mysql.md | 1 + 14 files changed, 80 insertions(+), 74 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt index 0ae1e42c4f1b..db78b4ab3b8b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt @@ -43,15 +43,6 @@ abstract class AbstractJdbcCompatibleSourceOperations : val columnName = queryContext.metaData.getColumnName(i) val columnTypeName = queryContext.metaData.getColumnTypeName(i) try { - // attempt to access the column. this allows us to know if it is null before we do - // type-specific - // parsing. if it is null, we can move on. while awkward, this seems to be the - // agreed upon way of checking for null values with jdbc. - queryContext.getObject(i) - if (queryContext.wasNull()) { - continue - } - // convert to java types that will convert into reasonable json. copyToJsonField(queryContext, i, jsonNode) } catch (e: java.lang.Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 7870af4b1f3b..3df70a91eac5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.30.4 +version=0.30.5 diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 4a4109889b0e..98c2e01f8878 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.30.4' + cdkVersionRequired = '0.30.5' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index b691d147a0af..b0b22bf89bb6 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.0.12 + dockerImageTag: 4.0.13 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java index 08860768aeb8..e301bbb95c46 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java @@ -71,14 +71,20 @@ public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet q @Override public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { - final SQLServerResultSetMetaData metadata = (SQLServerResultSetMetaData) resultSet .getMetaData(); final String columnName = metadata.getColumnName(colIndex); final String columnTypeName = metadata.getColumnTypeName(colIndex); final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex)); - if (columnTypeName.equalsIgnoreCase("time")) { + // Attempt to access the column. this allows us to know if it is null before we do + // type-specific parsing. If the column is null, we will populate the null value and skip attempting + // to + // parse the column value. + resultSet.getObject(colIndex); + if (resultSet.wasNull()) { + json.putNull(columnName); + } else if (columnTypeName.equalsIgnoreCase("time")) { putTime(json, columnName, resultSet, colIndex); } else if (columnTypeName.equalsIgnoreCase("geometry")) { putGeometry(json, columnName, resultSet, colIndex); diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java index 3294edd1eedb..e9179d2b8b95 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java @@ -215,7 +215,7 @@ protected void testNullValueConversion() throws Exception { ObjectMapper mapper = new ObjectMapper(); assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals( - mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}"))); // when we run incremental sync again there should be no new records. Run a sync with the latest // state message and assert no records were emitted. @@ -229,9 +229,8 @@ protected void testNullValueConversion() throws Exception { assertFalse( secondSyncRecords.isEmpty(), "Expected the second incremental sync to produce records."); - assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals( - mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"))); - + assertEquals(cdcFieldsOmitted(secondSyncRecords.get(0).getData()), + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")); } private JsonNode cdcFieldsOmitted(final JsonNode node) { diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java index 84fea93058ff..bb48874fdb10 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java @@ -187,7 +187,7 @@ protected void testNullValueConversion() throws Exception { ObjectMapper mapper = new ObjectMapper(); assertTrue(recordMessages.get(0).getData().equals( - mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}}"))); // when we run incremental sync again there should be no new records. Run a sync with the latest // state message and assert no records were emitted. @@ -203,7 +203,7 @@ protected void testNullValueConversion() throws Exception { secondSyncRecords.isEmpty(), "Expected the second incremental sync to produce records."); assertTrue(secondSyncRecords.get(0).getData().equals( - mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}"))); + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}}"))); } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 10d03411dc9c..b491ec5d8350 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.30.1' + cdkVersionRequired = '0.30.5' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 6cf2ddd3f27d..e7376ed33951 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.19 + dockerImageTag: 3.3.20 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index c716a26ffcf4..c322ebb2ed60 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -102,53 +102,62 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final final String columnName = field.getName(); final MysqlType columnType = field.getMysqlType(); - // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html - switch (columnType) { - case BIT -> { - if (field.getLength() == 1L) { - // BIT(1) is boolean - putBoolean(json, columnName, resultSet, colIndex); - } else { - putBinary(json, columnName, resultSet, colIndex); + // Attempt to access the column. this allows us to know if it is null before we do + // type-specific parsing. If the column is null, we will populate the null value and skip attempting + // to + // parse the column value. + resultSet.getObject(colIndex); + if (resultSet.wasNull()) { + json.putNull(columnName); + } else { + // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html + switch (columnType) { + case BIT -> { + if (field.getLength() == 1L) { + // BIT(1) is boolean + putBoolean(json, columnName, resultSet, colIndex); + } else { + putBinary(json, columnName, resultSet, colIndex); + } } - } - case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); - case TINYINT -> { - if (field.getLength() == 1L) { - // TINYINT(1) is boolean - putBoolean(json, columnName, resultSet, colIndex); - } else { - putShortInt(json, columnName, resultSet, colIndex); + case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); + case TINYINT -> { + if (field.getLength() == 1L) { + // TINYINT(1) is boolean + putBoolean(json, columnName, resultSet, colIndex); + } else { + putShortInt(json, columnName, resultSet, colIndex); + } } - } - case TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex); - case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex); - case INT, INT_UNSIGNED -> { - if (field.isUnsigned()) { - putBigInt(json, columnName, resultSet, colIndex); - } else { - putInteger(json, columnName, resultSet, colIndex); + case TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex); + case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex); + case INT, INT_UNSIGNED -> { + if (field.isUnsigned()) { + putBigInt(json, columnName, resultSet, colIndex); + } else { + putInteger(json, columnName, resultSet, colIndex); + } } - } - case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex); - case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex); - case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex); - case DECIMAL, DECIMAL_UNSIGNED -> { - if (field.getDecimals() == 0) { - putBigInt(json, columnName, resultSet, colIndex); - } else { - putBigDecimal(json, columnName, resultSet, colIndex); + case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex); + case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex); + case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex); + case DECIMAL, DECIMAL_UNSIGNED -> { + if (field.getDecimals() == 0) { + putBigInt(json, columnName, resultSet, colIndex); + } else { + putBigDecimal(json, columnName, resultSet, colIndex); + } } + case DATE -> putDate(json, columnName, resultSet, colIndex); + case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex); + case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + case TIME -> putTime(json, columnName, resultSet, colIndex); + case CHAR, VARCHAR -> putString(json, columnName, resultSet, colIndex); + case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex); + case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex); + case NULL -> json.set(columnName, NullNode.instance); + default -> putDefault(json, columnName, resultSet, colIndex); } - case DATE -> putDate(json, columnName, resultSet, colIndex); - case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex); - case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); - case TIME -> putTime(json, columnName, resultSet, colIndex); - case CHAR, VARCHAR -> putString(json, columnName, resultSet, colIndex); - case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex); - case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex); - case NULL -> json.set(columnName, NullNode.instance); - default -> putDefault(json, columnName, resultSet, colIndex); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java index 9c3d0e3a82c1..8286c3087991 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java @@ -218,8 +218,8 @@ protected void testNullValueConversion() throws Exception { assertFalse(stateMessages.isEmpty(), "Reason"); ObjectMapper mapper = new ObjectMapper(); - assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals( - mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + assertEquals(cdcFieldsOmitted(recordMessages.get(0).getData()), + mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}")); // when we run incremental sync again there should be no new records. Run a sync with the latest // state message and assert no records were emitted. @@ -234,8 +234,8 @@ protected void testNullValueConversion() throws Exception { assertFalse( secondSyncRecords.isEmpty(), "Expected the second incremental sync to produce records."); - assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals( - mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"))); + assertEquals(cdcFieldsOmitted(secondSyncRecords.get(0).getData()), + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java index 39796b510807..6044c66cf9cb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java @@ -7,7 +7,6 @@ import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -138,8 +137,8 @@ protected void testNullValueConversion() throws Exception { assertFalse(stateMessages.isEmpty(), "Reason"); ObjectMapper mapper = new ObjectMapper(); - assertTrue(recordMessages.get(0).getData().equals( - mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + assertEquals(recordMessages.get(0).getData(), + mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}")); // when we run incremental sync again there should be no new records. Run a sync with the latest // state message and assert no records were emitted. @@ -154,8 +153,8 @@ protected void testNullValueConversion() throws Exception { assertFalse( secondSyncRecords.isEmpty(), "Expected the second incremental sync to produce records."); - assertTrue(secondSyncRecords.get(0).getData().equals( - mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}"))); + assertEquals(secondSyncRecords.get(0).getData(), + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")); } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 8633863ea786..7368a78ea9ef 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -416,7 +416,8 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :---------------------------------------------------------------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.0.13 | 2024-04-16 | [37111](https://github.com/airbytehq/airbyte/pull/37111) | Populate null values in record message. | | 4.0.12 | 2024-04-15 | [37326](https://github.com/airbytehq/airbyte/pull/37326) | Allow up to 60 minutes of wait for the an initial CDC record. | | 4.0.11 | 2024-04-15 | [37325](https://github.com/airbytehq/airbyte/pull/37325) | Populate airbyte_meta.changes + error handling. | | 4.0.10 | 2024-04-15 | [37110](https://github.com/airbytehq/airbyte/pull/37110) | Internal cleanup. | diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index c3343beab04a..9c151ff4024e 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.20 | 2024-04-16 | [37111](https://github.com/airbytehq/airbyte/pull/37111) | Populate null values in record message. | | 3.3.19 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes | | 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. | | 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. |