diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 91cdf20d9da2..ef61c4f073a7 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.30.1 | 2024-04-11 | [\#36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix regression in sources conversion of null values | | 0.30.0 | 2024-04-11 | [\#36974](https://github.com/airbytehq/airbyte/pull/36974) | Destinations: Pass config to jdbc sqlgenerator; allow cascade drop | | 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. | | 0.29.12 | 2024-04-10 | [\#36973](https://github.com/airbytehq/airbyte/pull/36973) | Destinations: Make flush batch size configurable for JdbcInsertFlush | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt index db78b4ab3b8b..0ae1e42c4f1b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt @@ -43,6 +43,15 @@ abstract class AbstractJdbcCompatibleSourceOperations : val columnName = queryContext.metaData.getColumnName(i) val columnTypeName = queryContext.metaData.getColumnTypeName(i) try { + // attempt to access the column. 
this allows us to know if it is null before we do + // type-specific + // parsing. if it is null, we can move on. while awkward, this seems to be the + // agreed upon way of checking for null values with jdbc. + queryContext.getObject(i) + if (queryContext.wasNull()) { + continue + } + // convert to java types that will convert into reasonable json. copyToJsonField(queryContext, i, jsonNode) } catch (e: java.lang.Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 902c55fe61a8..594d8441fb73 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.30.0 +version=0.30.1 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index cb6cead0a7a7..d5dc3959aed6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory * type system. */ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { - protected val testDataHolders: MutableList = ArrayList() - protected var database: Database? = null + @JvmField protected val testDataHolders: MutableList = ArrayList() + @JvmField protected var database: Database? 
= null protected val idColumnName: String /** diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt index f4af59769fbd..66b1dc8a1fa7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt @@ -454,5 +454,23 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { .map { obj: AirbyteMessage -> obj.record } .collect(Collectors.toList()) } + + @JvmStatic + public fun extractLatestState(stateMessages: List): JsonNode? { + var latestState: JsonNode? = null + for (stateMessage in stateMessages) { + if (stateMessage.type == AirbyteStateMessage.AirbyteStateType.STREAM) { + latestState = Jsons.jsonNode(stateMessages) + break + } else if (stateMessage.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) { + latestState = + Jsons.jsonNode(java.util.List.of(Iterables.getLast(stateMessages))) + break + } else { + throw RuntimeException("Unknown state type " + stateMessage.type) + } + } + return latestState + } } } diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 8e94be23fb0c..aeadb10194b1 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.29.1' + cdkVersionRequired = '0.30.1' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index 
8624804e3c2b..12053f2609c5 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.0.8 + dockerImageTag: 4.0.9 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql @@ -19,12 +19,8 @@ data: name: Microsoft SQL Server (MSSQL) registries: cloud: - # A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055 - dockerImageTag: 4.0.6 enabled: true oss: - # A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055 - dockerImageTag: 4.0.6 enabled: true releaseStage: generally_available supportLevel: certified diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java index 0db3f1eb31cb..3294edd1eedb 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java @@ -4,9 +4,13 @@ package io.airbyte.integrations.source.mssql; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH; +import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; +import static org.junit.jupiter.api.Assertions.*; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import 
com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.airbyte.cdk.integrations.base.ssh.SshHelpers; @@ -28,7 +32,9 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; +import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; @@ -43,6 +49,7 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest { private static final String STREAM_NAME = "id_and_name"; private static final String STREAM_NAME2 = "starships"; private static final String CDC_ROLE_NAME = "cdc_selector"; + private static final String STREAM_NAME3 = "stream3"; private MsSQLTestDatabase testdb; @@ -75,8 +82,7 @@ protected List getConfiguredAirbyteStreams() { .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME), - String.format("%s", SCHEMA_NAME), + STREAM_NAME, SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) @@ -87,8 +93,7 @@ protected List getConfiguredAirbyteStreams() { .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME2), - String.format("%s", SCHEMA_NAME), + STREAM_NAME2, SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) @@ -111,12 +116,15 @@ protected void setupEnvironment(final TestDestinationEnv environment) { // create tables .with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME) .with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME2) + 
.with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3) // populate tables .with("INSERT INTO %s.%s (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", SCHEMA_NAME, STREAM_NAME) .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');", SCHEMA_NAME, STREAM_NAME2) + .with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3) // enable cdc on tables for designated role .withCdcForTable(SCHEMA_NAME, STREAM_NAME, CDC_ROLE_NAME) .withCdcForTable(SCHEMA_NAME, STREAM_NAME2, CDC_ROLE_NAME) + .withCdcForTable(SCHEMA_NAME, STREAM_NAME3, CDC_ROLE_NAME) // revoke user permissions .with("REVOKE ALL FROM %s CASCADE;", testdb.getUserName()) .with("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO %s;\"", testdb.getUserName()) @@ -177,4 +185,64 @@ private List filterStateMessages(final List .collect(Collectors.toList()); } + @Test + protected void testNullValueConversion() throws Exception { + final List configuredAirbyteStreams = + Lists.newArrayList(new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME3, + SCHEMA_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING), + Field.of("userid", JsonSchemaType.NUMBER)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes(Lists.newArrayList(FULL_REFRESH, INCREMENTAL)))); + + final ConfiguredAirbyteCatalog configuredCatalogWithOneStream = + new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0))); + + final List airbyteMessages = runRead(configuredCatalogWithOneStream, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + 
.map(AirbyteMessage::getState) + .collect(Collectors.toList()); + Assert.assertEquals(recordMessages.size(), 1); + assertFalse(stateMessages.isEmpty(), "Reason"); + ObjectMapper mapper = new ObjectMapper(); + + assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals( + mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + + // insert a new row, then run the incremental sync again from the latest state message and + // assert that the new row is emitted (with its unset userid column reported as null). + JsonNode latestState = extractLatestState(stateMessages); + + testdb.getDatabase().query(c -> c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine')".formatted(SCHEMA_NAME, STREAM_NAME3))) + .execute(); + + assert Objects.nonNull(latestState); + final List secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState)); + assertFalse( + secondSyncRecords.isEmpty(), + "Expected the second incremental sync to produce records."); + assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals( + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"))); + + } + + private JsonNode cdcFieldsOmitted(final JsonNode node) { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode object = mapper.createObjectNode(); + node.fieldNames().forEachRemaining(name -> { + if (!name.toLowerCase().startsWith("_ab_cdc_")) { + object.put(name, node.get(name)); + } + }); + return object; + } + } diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CloudDeploymentSslEnabledMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CloudDeploymentSslEnabledMssqlSourceAcceptanceTest.java index f2a311d6b455..608971be54ad 100644 --- 
a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CloudDeploymentSslEnabledMssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CloudDeploymentSslEnabledMssqlSourceAcceptanceTest.java @@ -26,7 +26,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) { "(1,'picard', '2124-03-04T01:01:01Z'), " + "(2, 'crusher', '2124-03-04T01:01:01Z'), " + "(3, 'vash', '2124-03-04T01:01:01Z');") - .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2); + .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2) + .with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3) + .with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3); + } @Override diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java index 4bdc5cecf61a..84fea93058ff 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java @@ -4,9 +4,13 @@ package io.airbyte.integrations.source.mssql; +import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import 
com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.airbyte.cdk.integrations.base.ssh.SshHelpers; @@ -29,6 +33,7 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -37,19 +42,23 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest { protected static final String SCHEMA_NAME = "dbo"; protected static final String STREAM_NAME = "id_and_name"; protected static final String STREAM_NAME2 = "starships"; + protected static final String STREAM_NAME3 = "stream3"; protected MsSQLTestDatabase testdb; @Override protected void setupEnvironment(final TestDestinationEnv environment) throws SQLException { testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022) - .with("CREATE TABLE id_and_name (id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));") + .with("CREATE TABLE %s.%s (id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));", SCHEMA_NAME, STREAM_NAME) .with("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));", SCHEMA_NAME, STREAM_NAME2) .with("INSERT INTO id_and_name (id, name, born) VALUES " + "(1, 'picard', '2124-03-04T01:01:01Z'), " + "(2, 'crusher', '2124-03-04T01:01:01Z'), " + "(3, 'vash', '2124-03-04T01:01:01Z');") - .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2); + .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2) + .with("CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3) + .with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3); + } @Override @@ -76,12 +85,25 @@ protected JsonNode getConfig() { 
@Override protected ConfiguredAirbyteCatalog getConfiguredCatalog() { - return CatalogHelpers.createConfiguredAirbyteCatalog( - STREAM_NAME, - SCHEMA_NAME, - Field.of("id", JsonSchemaType.NUMBER), - Field.of("name", JsonSchemaType.STRING), - Field.of("born", JsonSchemaType.STRING)); + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME, SCHEMA_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME2, SCHEMA_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))))); } @Override @@ -97,14 +119,14 @@ protected void testAddNewStreamToExistingSync() throws Exception { Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withSyncMode(SyncMode.INCREMENTAL) + .withSyncMode(INCREMENTAL) .withCursorField(List.of("id")), CatalogHelpers.createConfiguredAirbyteStream(STREAM_NAME2, SCHEMA_NAME, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withSyncMode(SyncMode.INCREMENTAL) + .withSyncMode(INCREMENTAL) .withCursorField(List.of("id"))); final ConfiguredAirbyteCatalog configuredCatalogWithOneStream = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0))); @@ -139,6 +161,52 @@ protected void 
testAddNewStreamToExistingSync() throws Exception { assertEquals(SCHEMA_NAME, stateMessages2.get(1).getStream().getStreamDescriptor().getNamespace()); } + @Test + protected void testNullValueConversion() throws Exception { + final List configuredAirbyteStreams = + Lists.newArrayList(CatalogHelpers.createConfiguredAirbyteStream(STREAM_NAME3, + SCHEMA_NAME, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING), + Field.of("userid", JsonSchemaType.NUMBER)) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(INCREMENTAL) + .withCursorField(List.of("id"))); + final ConfiguredAirbyteCatalog configuredCatalogWithOneStream = + new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0))); + + final List airbyteMessages = runRead(configuredCatalogWithOneStream, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + .map(AirbyteMessage::getState) + .collect(Collectors.toList()); + assertEquals(recordMessages.size(), 1); + assertFalse(stateMessages.isEmpty(), "Reason"); + ObjectMapper mapper = new ObjectMapper(); + + assertTrue(recordMessages.get(0).getData().equals( + mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + + // insert a new row, then run the incremental sync again from the latest state message and + // assert that the new row is emitted (its unset userid column is absent from the record). 
+ JsonNode latestState = extractLatestState(stateMessages); + + testdb.getDatabase().query(c -> { + return c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine');".formatted(SCHEMA_NAME, STREAM_NAME3)); + }).execute(); + + assert Objects.nonNull(latestState); + final List secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState)); + assertFalse( + secondSyncRecords.isEmpty(), + "Expected the second incremental sync to produce records."); + assertTrue(secondSyncRecords.get(0).getData().equals( + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}"))); + + } + private List filterStateMessages(final List messages) { return messages.stream().filter(r -> r.getType() == AirbyteMessage.Type.STATE).map(AirbyteMessage::getState) .collect(Collectors.toList()); diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/SslEnabledMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/SslEnabledMssqlSourceAcceptanceTest.java index ccd887c9a4b9..ac68865487bd 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/SslEnabledMssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/SslEnabledMssqlSourceAcceptanceTest.java @@ -31,7 +31,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) { "(1, 'picard', '2124-03-04T01:01:01Z'), " + "(2, 'crusher', '2124-03-04T01:01:01Z'), " + "(3, 'vash', '2124-03-04T01:01:01Z');") - .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2);; + .with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato'), (4, 'Argo');", SCHEMA_NAME, STREAM_NAME2) + .with("CREATE TABLE %s.%s 
(id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", SCHEMA_NAME, STREAM_NAME3) + .with("INSERT INTO %s.%s (id, name) VALUES (4,'voyager');", SCHEMA_NAME, STREAM_NAME3); + } } diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 06d33abe4981..10d03411dc9c 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.29.6' + cdkVersionRequired = '0.30.1' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index cdc39c3ead28..cb2931d12eba 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.16 + dockerImageTag: 3.3.17 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql @@ -19,12 +19,8 @@ data: name: MySQL registries: cloud: - # A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055 - dockerImageTag: 3.3.13 enabled: true oss: - # A regression in https://github.com/airbytehq/airbyte-internal-issues/issues/7055 - dockerImageTag: 3.3.13 enabled: true releaseStage: generally_available releases: diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java index 
9e12122460b7..9c3d0e3a82c1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcMySqlSourceAcceptanceTest.java @@ -4,12 +4,15 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; +import static io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH; import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.airbyte.cdk.integrations.base.ssh.SshHelpers; @@ -31,14 +34,17 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; +import org.junit.Assert; import org.junit.jupiter.api.Test; public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest { protected static final String STREAM_NAME = "id_and_name"; protected static final String STREAM_NAME2 = "starships"; + protected static final String STREAM_NAME3 = "stream3"; protected MySQLTestDatabase testdb; @@ -67,8 +73,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME), - testdb.getDatabaseName(), + STREAM_NAME, testdb.getDatabaseName(), 
Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) @@ -79,8 +84,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME2), - testdb.getDatabaseName(), + STREAM_NAME2, testdb.getDatabaseName(), Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) @@ -101,7 +105,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) { .with("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));") .with("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');") .with("CREATE TABLE starships(id INTEGER, name VARCHAR(200));") - .with("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + .with("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');") + .with("CREATE TABLE %s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL);", STREAM_NAME3) + .with("INSERT INTO %s (id, name) VALUES (4,'voyager');", STREAM_NAME3); } protected ContainerModifier[] getContainerModifiers() { @@ -158,8 +164,7 @@ private ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME), - testdb.getDatabaseName(), + STREAM_NAME, testdb.getDatabaseName(), Field.of("id", JsonSchemaType.NUMBER) /* no name field */) .withSourceDefinedCursor(true) @@ -170,8 +175,7 @@ private ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME2), - 
testdb.getDatabaseName(), + STREAM_NAME2, testdb.getDatabaseName(), /* no name field */ Field.of("id", JsonSchemaType.NUMBER)) .withSourceDefinedCursor(true) @@ -185,4 +189,66 @@ private void verifyFieldNotExist(final List records, final "Records contain unselected columns [%s:%s]".formatted(stream, field)); } + @Test + protected void testNullValueConversion() throws Exception { + final List configuredAirbyteStreams = + Lists.newArrayList(new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME3, + testdb.getDatabaseName(), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING), + Field.of("userid", JsonSchemaType.NUMBER)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes(Lists.newArrayList(FULL_REFRESH, INCREMENTAL)))); + + final ConfiguredAirbyteCatalog configuredCatalogWithOneStream = + new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0))); + + final List airbyteMessages = runRead(configuredCatalogWithOneStream, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + .map(AirbyteMessage::getState) + .collect(Collectors.toList()); + Assert.assertEquals(recordMessages.size(), 1); + assertFalse(stateMessages.isEmpty(), "Reason"); + ObjectMapper mapper = new ObjectMapper(); + + assertTrue(cdcFieldsOmitted(recordMessages.get(0).getData()).equals( + mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"))); + + // insert a new row, then run the incremental sync again from the latest state message and + // assert that the new row is emitted (with its unset userid column reported as null). 
+ JsonNode latestState = extractLatestState(stateMessages); + + testdb.getDatabase().query(c -> { + return c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine');".formatted(testdb.getDatabaseName(), STREAM_NAME3)); + }).execute(); + + assert Objects.nonNull(latestState); + final List secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState)); + assertFalse( + secondSyncRecords.isEmpty(), + "Expected the second incremental sync to produce records."); + assertTrue(cdcFieldsOmitted(secondSyncRecords.get(0).getData()).equals( + mapper.readTree("{\"id\":5, \"name\":\"deep space nine\", \"userid\":null}"))); + + } + + private JsonNode cdcFieldsOmitted(final JsonNode node) { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode object = mapper.createObjectNode(); + node.fieldNames().forEachRemaining(name -> { + if (!name.toLowerCase().startsWith("_ab_cdc_")) { + object.put(name, node.get(name)); + } + + }); + return object; + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java index b13848fdfea0..39796b510807 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MySqlSourceAcceptanceTest.java @@ -4,7 +4,13 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; +import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL; +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static 
org.junit.jupiter.api.Assertions.assertTrue; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import io.airbyte.cdk.integrations.base.ssh.SshHelpers; import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest; @@ -15,14 +21,13 @@ import io.airbyte.integrations.source.mysql.MySQLTestDatabase.ContainerModifier; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; +import io.airbyte.protocol.models.v0.*; import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; +import org.junit.jupiter.api.Test; public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest { @@ -75,7 +80,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME), testdb.getDatabaseName(), + STREAM_NAME, testdb.getDatabaseName(), Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), @@ -84,8 +89,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withCursorField(Lists.newArrayList("id")) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( - String.format("%s", STREAM_NAME2), - testdb.getDatabaseName(), + STREAM_NAME2, testdb.getDatabaseName(), 
Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); @@ -96,4 +100,63 @@ protected JsonNode getState() { return Jsons.jsonNode(new HashMap<>()); } + @Test + protected void testNullValueConversion() throws Exception { + final String STREAM_NAME3 = "stream3"; + testdb.getDatabase().query(c -> { + return c.query(""" + CREATE TABLE %s.%s (id INTEGER PRIMARY KEY, name VARCHAR(200), userid INTEGER DEFAULT NULL); + """.formatted(testdb.getDatabaseName(), STREAM_NAME3)); + }).execute(); + + testdb.getDatabase().query(c -> { + return c.query(""" + INSERT INTO %s.%s (id, name) VALUES (4,'voyager'); + """.formatted(testdb.getDatabaseName(), STREAM_NAME3)); + }).execute(); + + final List configuredAirbyteStreams = + Lists.newArrayList(CatalogHelpers.createConfiguredAirbyteStream(STREAM_NAME3, + testdb.getDatabaseName(), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING), + Field.of("userid", JsonSchemaType.NUMBER)) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(INCREMENTAL) + .withCursorField(List.of("id"))); + final ConfiguredAirbyteCatalog configuredCatalogWithOneStream = + new ConfiguredAirbyteCatalog().withStreams(List.of(configuredAirbyteStreams.get(0))); + + final List airbyteMessages = runRead(configuredCatalogWithOneStream, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + .map(AirbyteMessage::getState) + .collect(Collectors.toList()); + assertEquals(1, recordMessages.size()); + assertFalse(stateMessages.isEmpty(), "Expected the first sync to emit at least one state message."); + ObjectMapper mapper = new ObjectMapper(); + // first sync: the row was inserted without userid, so the record is expected without a userid key + assertEquals(mapper.readTree("{\"id\":4, \"name\":\"voyager\"}"), + recordMessages.get(0).getData()); + + // insert a second row and resume from the latest state message: the second sync must emit the new
+ // row, again without a userid key because that column was left NULL. + JsonNode latestState = extractLatestState(stateMessages); + + testdb.getDatabase().query(c -> { + return c.query("INSERT INTO %s.%s (id, name) VALUES (5,'deep space nine');".formatted(testdb.getDatabaseName(), STREAM_NAME3)); + }).execute(); + + assertTrue(Objects.nonNull(latestState), "Expected the first sync to yield a state to resume from."); + final List secondSyncRecords = filterRecords(runRead(configuredCatalogWithOneStream, latestState)); + assertFalse( + secondSyncRecords.isEmpty(), + "Expected the second incremental sync to produce records."); + assertEquals(mapper.readTree("{\"id\":5, \"name\":\"deep space nine\"}"), + secondSyncRecords.get(0).getData()); + + } + + } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 01fbd63d863c..031a678c69d1 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -327,6 +327,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.0.9 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. | | 4.0.8 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 4.0.7 | 2024-04-03 | [36772](https://github.com/airbytehq/airbyte/pull/36772) | Adopt latest CDK. | | 4.0.6 | 2024-03-25 | [36333](https://github.com/airbytehq/airbyte/pull/36333) | Deprecate Dbz state iterator.
| diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index e615c451102b..a3fe489f4ea3 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. | | 3.3.16 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 3.3.15 | 2024-04-05 | [36577](https://github.com/airbytehq/airbyte/pull/36577) | Config error will not send out system trace message | | 3.3.14 | 2024-04-04 | [36742](https://github.com/airbytehq/airbyte/pull/36742) | To use new kotlin CDK |