Skip to content

Commit

Permalink
Prevent conversion of null fields (#36919)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Apr 12, 2024
1 parent e1443c7 commit 1e9ee1d
Show file tree
Hide file tree
Showing 17 changed files with 343 additions and 50 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.30.1 | 2024-04-11 | [\#36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix regression in sources conversion of null values |
| 0.30.0 | 2024-04-11 | [\#36974](https://github.com/airbytehq/airbyte/pull/36974) | Destinations: Pass config to jdbc sqlgenerator; allow cascade drop |
| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. |
| 0.29.12 | 2024-04-10 | [\#36973](https://github.com/airbytehq/airbyte/pull/36973) | Destinations: Make flush batch size configurable for JdbcInsertFlush |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
val columnName = queryContext.metaData.getColumnName(i)
val columnTypeName = queryContext.metaData.getColumnTypeName(i)
try {
// attempt to access the column. this allows us to know if it is null before we do
// type-specific parsing. if it is null, we can move on. while awkward, this seems to be
// the agreed upon way of checking for null values with jdbc.
queryContext.getObject(i)
if (queryContext.wasNull()) {
continue
}

// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.0
version=0.30.1
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory
* type system.
*/
abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() {
protected val testDataHolders: MutableList<TestDataHolder> = ArrayList()
protected var database: Database? = null
@JvmField protected val testDataHolders: MutableList<TestDataHolder> = ArrayList()
@JvmField protected var database: Database? = null

protected val idColumnName: String
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,5 +454,23 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
.map { obj: AirbyteMessage -> obj.record }
.collect(Collectors.toList())
}

@JvmStatic
public fun extractLatestState(stateMessages: List<AirbyteStateMessage>): JsonNode? {
var latestState: JsonNode? = null
for (stateMessage in stateMessages) {
if (stateMessage.type == AirbyteStateMessage.AirbyteStateType.STREAM) {
latestState = Jsons.jsonNode(stateMessages)
break
} else if (stateMessage.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) {
latestState =
Jsons.jsonNode(java.util.List.of(Iterables.getLast(stateMessages)))
break
} else {
throw RuntimeException("Unknown state type " + stateMessage.type)
}
}
return latestState
}
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.1'
cdkVersionRequired = '0.30.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
6 changes: 1 addition & 5 deletions airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.8
dockerImageTag: 4.0.9
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand All @@ -19,12 +19,8 @@ data:
name: Microsoft SQL Server (MSSQL)
registries:
cloud:
# A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055
dockerImageTag: 4.0.6
enabled: true
oss:
# A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055
dockerImageTag: 4.0.6
enabled: true
releaseStage: generally_available
supportLevel: certified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

package io.airbyte.integrations.source.mssql;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH;
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
Expand All @@ -28,7 +32,9 @@
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
Expand All @@ -43,6 +49,7 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String CDC_ROLE_NAME = "cdc_selector";
private static final String STREAM_NAME3 = "stream3";

private MsSQLTestDatabase testdb;

Expand Down Expand Up @@ -75,8 +82,7 @@ protected List<ConfiguredAirbyteStream> getConfiguredAirbyteStreams() {
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME),
String.format("%s", SCHEMA_NAME),
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
Expand All @@ -87,8 +93,7 @@ protected List<ConfiguredAirbyteStream> getConfiguredAirbyteStreams() {
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME2),
String.format("%s", SCHEMA_NAME),
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
Expand All @@ -111,12 +116,15 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
// create tables
.with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME)
.with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME2)
.with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3)
// populate tables
.with("INSERT INTO %s.%s (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", SCHEMA_NAME, STREAM_NAME)
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');", SCHEMA_NAME, STREAM_NAME2)
.with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3)
// enable cdc on tables for designated role
.withCdcForTable(SCHEMA_NAME, STREAM_NAME, CDC_ROLE_NAME)
.withCdcForTable(SCHEMA_NAME, STREAM_NAME2, CDC_ROLE_NAME)
.withCdcForTable(SCHEMA_NAME, STREAM_NAME3, CDC_ROLE_NAME)
// revoke user permissions
.with("REVOKE ALL FROM %s CASCADE;", testdb.getUserName())
.with("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO %s;\"", testdb.getUserName())
Expand Down Expand Up @@ -177,4 +185,64 @@ private List<AirbyteStateMessage> filterStateMessages(final List<AirbyteMessage>
.collect(Collectors.toList());
}

@Test
protected void testNullValueConversion() throws Exception {
final List<ConfiguredAirbyteStream> configuredAirbyteStreams =
Lists.newArrayList(new ConfiguredAirbyteStream()
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME3,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("userid", JsonSchemaType.NUMBER))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(Lists.newArrayList(FULL_REFRESH, INCREMENTAL))));

final ConfiguredAirbyteCatalog configuredCatalogWithOneStream =
new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0)));

final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalogWithOneStream, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
Assert.assertEquals(recordMessages.size(), 1);
assertFalse(stateMessages.isEmpty(), "Reason");
ObjectMapper mapper = new ObjectMapper();

assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
JsonNode latestState = extractLatestState(stateMessages);

testdb.getDatabase().query(c -> c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine')".formatted(SCHEMA_NAME, STREAM_NAME3)))
.execute();

assert Objects.nonNull(latestState);
final List<AirbyteRecordMessage> secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState));
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals(
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}")));

}

private JsonNode cdcFieldsOmitted(final JsonNode node) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode object = mapper.createObjectNode();
node.fieldNames().forEachRemaining(name -> {
if (!name.toLowerCase().startsWith("_ab_cdc_")) {
object.put(name, node.get(name));
}
});
return object;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
"(1,'picard', '2124-03-04T01:01:01Z'), " +
"(2, 'crusher', '2124-03-04T01:01:01Z'), " +
"(3, 'vash', '2124-03-04T01:01:01Z');")
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2);
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2)
.with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3)
.with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

package io.airbyte.integrations.source.mssql;

import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
Expand All @@ -29,6 +33,7 @@
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

Expand All @@ -37,19 +42,23 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {
protected static final String SCHEMA_NAME = "dbo";
protected static final String STREAM_NAME = "id_and_name";
protected static final String STREAM_NAME2 = "starships";
protected static final String STREAM_NAME3 = "stream3";

protected MsSQLTestDatabase testdb;

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws SQLException {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022)
.with("CREATE TABLE id_and_name (id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));")
.with("CREATE TABLE %s.%s (id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));", SCHEMA_NAME, STREAM_NAME)
.with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME2)
.with("INSERT INTO id_and_name (id, name, born) VALUES " +
"(1, 'picard', '2124-03-04T01:01:01Z'), " +
"(2, 'crusher', '2124-03-04T01:01:01Z'), " +
"(3, 'vash', '2124-03-04T01:01:01Z');")
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2);
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2)
.with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3)
.with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3);

}

@Override
Expand All @@ -76,12 +85,25 @@ protected JsonNode getConfig() {

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
STREAM_NAME,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING));
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}

@Override
Expand All @@ -97,14 +119,14 @@ protected void testAddNewStreamToExistingSync() throws Exception {
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withCursorField(List.of("id")),
CatalogHelpers.createConfiguredAirbyteStream(STREAM_NAME2,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withCursorField(List.of("id")));
final ConfiguredAirbyteCatalog configuredCatalogWithOneStream =
new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0)));
Expand Down Expand Up @@ -139,6 +161,52 @@ protected void testAddNewStreamToExistingSync() throws Exception {
assertEquals(SCHEMA_NAME, stateMessages2.get(1).getStream().getStreamDescriptor().getNamespace());
}

@Test
protected void testNullValueConversion() throws Exception {
final List<ConfiguredAirbyteStream> configuredAirbyteStreams =
Lists.newArrayList(CatalogHelpers.createConfiguredAirbyteStream(STREAM_NAME3,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("userid", JsonSchemaType.NUMBER))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(INCREMENTAL)
.withCursorField(List.of("id")));
final ConfiguredAirbyteCatalog configuredCatalogWithOneStream =
new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0)));

final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalogWithOneStream, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
assertEquals(recordMessages.size(), 1);
assertFalse(stateMessages.isEmpty(), "Reason");
ObjectMapper mapper = new ObjectMapper();

assertTrue(recordMessages.get(0).getData().equals(
mapper.readTree("{\"id\":4, \"name\":\"voyager\"}")));

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
JsonNode latestState = extractLatestState(stateMessages);

testdb.getDatabase().query(c -> {
return c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine');".formatted(SCHEMA_NAME, STREAM_NAME3));
}).execute();

assert Objects.nonNull(latestState);
final List<AirbyteRecordMessage> secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState));
assertFalse(
secondSyncRecords.isEmpty(),
"Expected the second incremental sync to produce records.");
assertTrue(secondSyncRecords.get(0).getData().equals(
mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}")));

}

private List<AirbyteStateMessage> filterStateMessages(final List<AirbyteMessage> messages) {
return messages.stream().filter(r -> r.getType() == AirbyteMessage.Type.STATE).map(AirbyteMessage::getState)
.collect(Collectors.toList());
Expand Down
Loading

0 comments on commit 1e9ee1d

Please sign in to comment.