[Source-mysql/mssql]: Populate null values (#37111)
akashkulk authored Apr 16, 2024
1 parent 70afb9d commit bae6304
Showing 14 changed files with 80 additions and 74 deletions.
@@ -43,15 +43,6 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
val columnName = queryContext.metaData.getColumnName(i)
val columnTypeName = queryContext.metaData.getColumnTypeName(i)
try {
-        // attempt to access the column. this allows us to know if it is null before we do
-        // type-specific parsing. if it is null, we can move on. while awkward, this seems
-        // to be the agreed upon way of checking for null values with jdbc.
-        queryContext.getObject(i)
-        if (queryContext.wasNull()) {
-            continue
-        }
-
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
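Note on the removed CDK block: JDBC only signals SQL NULL through wasNull(), and wasNull() is only defined after the column has been read once, which is why the code touched the column with getObject before checking. A minimal, self-contained sketch of that idiom follows (illustration only, not CDK code; it assumes an in-memory H2 database is on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class WasNullDemo {
  public static void main(String[] args) throws SQLException {
    // Any JDBC URL works; an in-memory H2 database is assumed here.
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
        Statement stmt = conn.createStatement()) {
      stmt.execute("CREATE TABLE t (id INT, userid INT)");
      stmt.execute("INSERT INTO t VALUES (4, NULL)");
      try (ResultSet rs = stmt.executeQuery("SELECT id, userid FROM t")) {
        while (rs.next()) {
          rs.getObject(2);          // touch the column first...
          if (rs.wasNull()) {       // ...only then is wasNull() meaningful
            System.out.println("userid is NULL");
          }
        }
      }
    }
  }
}

With this commit, the skip is removed from the CDK so each connector can decide how to represent the null, as the connector-level changes below show.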
@@ -1 +1 @@
-version=0.30.4
+version=0.30.5
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
-    cdkVersionRequired = '0.30.4'
+    cdkVersionRequired = '0.30.5'
features = ['db-sources']
useLocalCdk = false
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
-  dockerImageTag: 4.0.12
+  dockerImageTag: 4.0.13
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
@@ -71,14 +71,20 @@ public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet q
@Override
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json)
throws SQLException {

final SQLServerResultSetMetaData metadata = (SQLServerResultSetMetaData) resultSet
.getMetaData();
final String columnName = metadata.getColumnName(colIndex);
final String columnTypeName = metadata.getColumnTypeName(colIndex);
final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));

-    if (columnTypeName.equalsIgnoreCase("time")) {
+    // Attempt to access the column. This allows us to know if it is null before we do
+    // type-specific parsing. If the column is null, we populate the null value and skip
+    // attempting to parse the column value.
+    resultSet.getObject(colIndex);
+    if (resultSet.wasNull()) {
+      json.putNull(columnName);
+    } else if (columnTypeName.equalsIgnoreCase("time")) {
putTime(json, columnName, resultSet, colIndex);
} else if (columnTypeName.equalsIgnoreCase("geometry")) {
putGeometry(json, columnName, resultSet, colIndex);
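The effect of this change is that a SQL NULL column now appears in the emitted record as an explicit JSON null instead of a missing key. A sketch of the resulting record shape (rowToJson is a hypothetical helper for illustration, not the connector's actual method; real connectors do type-specific conversion where putPOJO appears):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.ResultSet;
import java.sql.SQLException;

public class NullPopulationSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static ObjectNode rowToJson(ResultSet rs) throws SQLException {
    ObjectNode json = MAPPER.createObjectNode();
    int columns = rs.getMetaData().getColumnCount();
    for (int i = 1; i <= columns; i++) {
      String name = rs.getMetaData().getColumnName(i);
      Object value = rs.getObject(i); // read once so wasNull() is meaningful
      if (rs.wasNull()) {
        json.putNull(name);           // {"userid": null} instead of a missing key
      } else {
        json.putPOJO(name, value);    // placeholder for type-specific parsing
      }
    }
    return json;
  }
}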
@@ -215,7 +215,7 @@ protected void testNullValueConversion() throws Exception {
ObjectMapper mapper = new ObjectMapper();

assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}")));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
@@ -229,9 +229,8 @@ protected void testNullValueConversion() throws Exception {
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
-    assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals(
-        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")));
-
+    assertEquals(cdcFieldsOmitted(secondSyncRecords.get(0).getData()),
+        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"));
}

private JsonNode cdcFieldsOmitted(final JsonNode node) {
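Aside on the assertion swap: replacing assertTrue(a.equals(b)) with assertEquals(a, b) gives a readable expected-vs-actual diff on failure instead of a bare "expected true". It also pins down the point of this commit, because Jackson's JsonNode equality is structural and treats an explicit null field as different from a missing one. A minimal JUnit 5 sketch (illustration only, not part of the commit):

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

class JsonAssertionSketch {
  private final ObjectMapper mapper = new ObjectMapper();

  @Test
  void nullFieldIsPartOfEquality() throws Exception {
    JsonNode withNull = mapper.readTree("{\"id\":5, \"userid\":null}");
    // An explicit null field is NOT equal to a missing field, which is
    // exactly what these updated expectations assert.
    assertEquals(withNull, mapper.readTree("{\"id\":5, \"userid\":null}"));
    // assertEquals(withNull, mapper.readTree("{\"id\":5}")) would fail
    // with a diff showing the extra "userid": null entry.
  }
}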
@@ -187,7 +187,7 @@ protected void testNullValueConversion() throws Exception {
ObjectMapper mapper = new ObjectMapper();

assertTrue(recordMessages.get(0).getData().equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}}")));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
@@ -203,7 +203,7 @@ protected void testNullValueConversion() throws Exception {
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
assertTrue(secondSyncRecords.get(0).getData().equals(
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}")));
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}}")));

}

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
@@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
-    cdkVersionRequired = '0.30.1'
+    cdkVersionRequired = '0.30.5'
features = ['db-sources']
useLocalCdk = false
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
-  dockerImageTag: 3.3.19
+  dockerImageTag: 3.3.20
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
@@ -102,53 +102,62 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json)
final String columnName = field.getName();
final MysqlType columnType = field.getMysqlType();

-    // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html
-    switch (columnType) {
-      case BIT -> {
-        if (field.getLength() == 1L) {
-          // BIT(1) is boolean
-          putBoolean(json, columnName, resultSet, colIndex);
-        } else {
-          putBinary(json, columnName, resultSet, colIndex);
-        }
-      }
-      case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
-      case TINYINT -> {
-        if (field.getLength() == 1L) {
-          // TINYINT(1) is boolean
-          putBoolean(json, columnName, resultSet, colIndex);
-        } else {
-          putShortInt(json, columnName, resultSet, colIndex);
-        }
-      }
-      case TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex);
-      case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex);
-      case INT, INT_UNSIGNED -> {
-        if (field.isUnsigned()) {
-          putBigInt(json, columnName, resultSet, colIndex);
-        } else {
-          putInteger(json, columnName, resultSet, colIndex);
-        }
-      }
-      case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex);
-      case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex);
-      case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex);
-      case DECIMAL, DECIMAL_UNSIGNED -> {
-        if (field.getDecimals() == 0) {
-          putBigInt(json, columnName, resultSet, colIndex);
-        } else {
-          putBigDecimal(json, columnName, resultSet, colIndex);
-        }
-      }
-      case DATE -> putDate(json, columnName, resultSet, colIndex);
-      case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex);
-      case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
-      case TIME -> putTime(json, columnName, resultSet, colIndex);
-      case CHAR, VARCHAR -> putString(json, columnName, resultSet, colIndex);
-      case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex);
-      case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex);
-      case NULL -> json.set(columnName, NullNode.instance);
-      default -> putDefault(json, columnName, resultSet, colIndex);
-    }
+    // Attempt to access the column. This allows us to know if it is null before we do
+    // type-specific parsing. If the column is null, we populate the null value and skip
+    // attempting to parse the column value.
+    resultSet.getObject(colIndex);
+    if (resultSet.wasNull()) {
+      json.putNull(columnName);
+    } else {
+      // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html
+      switch (columnType) {
+        case BIT -> {
+          if (field.getLength() == 1L) {
+            // BIT(1) is boolean
+            putBoolean(json, columnName, resultSet, colIndex);
+          } else {
+            putBinary(json, columnName, resultSet, colIndex);
+          }
+        }
+        case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
+        case TINYINT -> {
+          if (field.getLength() == 1L) {
+            // TINYINT(1) is boolean
+            putBoolean(json, columnName, resultSet, colIndex);
+          } else {
+            putShortInt(json, columnName, resultSet, colIndex);
+          }
+        }
+        case TINYINT_UNSIGNED, YEAR -> putShortInt(json, columnName, resultSet, colIndex);
+        case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex);
+        case INT, INT_UNSIGNED -> {
+          if (field.isUnsigned()) {
+            putBigInt(json, columnName, resultSet, colIndex);
+          } else {
+            putInteger(json, columnName, resultSet, colIndex);
+          }
+        }
+        case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex);
+        case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex);
+        case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex);
+        case DECIMAL, DECIMAL_UNSIGNED -> {
+          if (field.getDecimals() == 0) {
+            putBigInt(json, columnName, resultSet, colIndex);
+          } else {
+            putBigDecimal(json, columnName, resultSet, colIndex);
+          }
+        }
+        case DATE -> putDate(json, columnName, resultSet, colIndex);
+        case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex);
+        case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
+        case TIME -> putTime(json, columnName, resultSet, colIndex);
+        case CHAR, VARCHAR -> putString(json, columnName, resultSet, colIndex);
+        case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex);
+        case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex);
+        case NULL -> json.set(columnName, NullNode.instance);
+        default -> putDefault(json, columnName, resultSet, colIndex);
+      }
+    }
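The switch above encodes MySQL Connector/J conventions such as BIT(1) and TINYINT(1) surfacing as booleans, since MySQL has no distinct BOOLEAN storage type. A small illustrative sketch of that convention (jsonTypeFor is a hypothetical helper, not connector code; it assumes mysql-connector-j is on the classpath for the MysqlType enum):

import com.mysql.cj.MysqlType;

public class MysqlBooleanConvention {

  // Returns the JSON type a column would be emitted as (illustrative only).
  static String jsonTypeFor(MysqlType type, long displayLength) {
    return switch (type) {
      case BIT -> displayLength == 1L ? "boolean" : "binary";
      case TINYINT -> displayLength == 1L ? "boolean" : "number";
      case BOOLEAN -> "boolean";
      default -> "other";
    };
  }

  public static void main(String[] args) {
    System.out.println(jsonTypeFor(MysqlType.BIT, 1));      // boolean
    System.out.println(jsonTypeFor(MysqlType.BIT, 8));      // binary
    System.out.println(jsonTypeFor(MysqlType.TINYINT, 1));  // boolean
    System.out.println(jsonTypeFor(MysqlType.TINYINT, 4));  // number
  }
}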

@@ -218,8 +218,8 @@ protected void testNullValueConversion() throws Exception {
assertFalse(stateMessages.isEmpty(), "Reason");
ObjectMapper mapper = new ObjectMapper();

-    assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals(
-        mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
+    assertEquals(cdcFieldsOmitted(recordMessages.get(0).getData()),
+        mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}"));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
@@ -234,8 +234,8 @@ protected void testNullValueConversion() throws Exception {
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
-    assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals(
-        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")));
+    assertEquals(cdcFieldsOmitted(secondSyncRecords.get(0).getData()),
+        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"));

}

@@ -7,7 +7,6 @@
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -138,8 +137,8 @@ protected void testNullValueConversion() throws Exception {
assertFalse(stateMessages.isEmpty(), "Reason");
ObjectMapper mapper = new ObjectMapper();

-    assertTrue(recordMessages.get(0).getData().equals(
-        mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));
+    assertEquals(recordMessages.get(0).getData(),
+        mapper.readTree("{\"id\":4, \"name\":\"voyager\", \"userid\":null}"));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
Expand All @@ -154,8 +153,8 @@ protected void testNullValueConversion() throws Exception {
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
-    assertTrue(secondSyncRecords.get(0).getData().equals(
-        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}")));
+    assertEquals(secondSyncRecords.get(0).getData(),
+        mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"));

}

3 changes: 2 additions & 1 deletion docs/integrations/sources/mssql.md
@@ -416,7 +416,8 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
## Changelog

| Version | Date | Pull Request | Subject |
-| :------ | :--------- | :---------------------------------------------------------------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- |
+|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
+| 4.0.13 | 2024-04-16 | [37111](https://github.com/airbytehq/airbyte/pull/37111) | Populate null values in record message. |
| 4.0.12 | 2024-04-15 | [37326](https://github.com/airbytehq/airbyte/pull/37326) | Allow up to 60 minutes of wait for an initial CDC record. |
| 4.0.11 | 2024-04-15 | [37325](https://github.com/airbytehq/airbyte/pull/37325) | Populate airbyte_meta.changes + error handling. |
| 4.0.10 | 2024-04-15 | [37110](https://github.com/airbytehq/airbyte/pull/37110) | Internal cleanup. |
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
@@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.3.20 | 2024-04-16 | [37111](https://github.com/airbytehq/airbyte/pull/37111) | Populate null values in record message. |
| 3.3.19 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes |
| 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. |
| 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. |
