diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 37a290b5a348..f31f4a90ea33 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -173,7 +173,8 @@ corresponds to that version. ### Java CDK | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync | | 0.33.0 | 2024-05-03 | [\#36935](https://github.com/airbytehq/airbyte/pull/36935) | Destinations: Enable non-safe-casting DV2 tests | | 0.32.0 | 2024-05-03 | [\#36929](https://github.com/airbytehq/airbyte/pull/36929) | Destinations: Assorted DV2 changes for mysql | | 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 47feb6ea8a08..deabb8cf5649 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.33.0 +version=0.33.1 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index 1a14751d22a1..36310bebe9e1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -479,6 +479,46 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages)) } + @Test + @Throws(Exception::class) + protected fun testReadBothIncrementalAndFullRefreshStreams() { + val catalog = getConfiguredCatalogWithOneStream(defaultNamespace) + val expectedMessages: MutableList = ArrayList(testMessages) + + val streamName2 = streamName() + 2 + val tableName = getFullyQualifiedTableName(TABLE_NAME + 2) + testdb!! + .with(createTableQuery(tableName, "id INTEGER, name VARCHAR(200)", "")) + .with("INSERT INTO %s(id, name) VALUES (1,'picard')", tableName) + .with("INSERT INTO %s(id, name) VALUES (2, 'crusher')", tableName) + .with("INSERT INTO %s(id, name) VALUES (3, 'vash')", tableName) + + val airbyteStream2 = + CatalogHelpers.createConfiguredAirbyteStream( + streamName2, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_NAME, JsonSchemaType.STRING) + ) + airbyteStream2.syncMode = SyncMode.INCREMENTAL + airbyteStream2.cursorField = java.util.List.of(COL_ID) + airbyteStream2.destinationSyncMode = DestinationSyncMode.APPEND + catalog.streams.add(airbyteStream2) + + expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2)) + + System.out.println("catalog: " + catalog) + + val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null)) + val actualRecordMessages = filterRecords(actualMessages) + + setEmittedAtToNull(actualMessages) + + Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size) + Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages)) + Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages)) + } + protected open fun getAirbyteMessagesSecondSync(streamName: String?): List { return testMessages .stream() diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index a996fdc9f2a7..52936ba1217d 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.31.8' + cdkVersionRequired = '0.33.1' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 431b43ad887e..f63f7f6b7c6b 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.4.0 + dockerImageTag: 3.4.1 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java index 871e837437ba..adf8a108a921 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlQueryUtils.java @@ -175,7 +175,9 @@ public static Map> getIncrementalIterators(final if (isAnyStreamIncrementalSyncMode(catalog)) { LOGGER.info("Syncing via Primary Key"); final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog); - final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog); + final InitialLoadStreams initialLoadStreams = + filterStreamInIncrementalMode(streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog)); final Map pairToCursorBasedStatus = getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, getQuoteString()); final CursorBasedStreams cursorBasedStreams = diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 1ad0aebfc05e..6dd3020f3c5d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -50,6 +50,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.StreamDescriptor; +import io.airbyte.protocol.models.v0.SyncMode; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -368,6 +369,13 @@ private static boolean streamHasPrimaryKey(final ConfiguredAirbyteStream stream) return stream.getStream().getSourceDefinedPrimaryKey().size() > 0; } + public static InitialLoadStreams filterStreamInIncrementalMode(final InitialLoadStreams stream) { + return new InitialLoadStreams( + stream.streamsForInitialLoad.stream().filter(airbyteStream -> airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) + .collect(Collectors.toList()), + stream.pairToInitialLoadStatus); + } + public static List identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog, final Set alreadySyncedStreams) { final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index d85c8f3f5402..1756ebeca456 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.1 | 2024-05-03 | [37824](https://github.com/airbytehq/airbyte/pull/37824) | Fixed a bug on Resumeable full refresh where cursor based source throw NPE. | | 3.4.0 | 2024-05-02 | [36932](https://github.com/airbytehq/airbyte/pull/36932) | Resumeable full refresh. Note please upgrade your platform - minimum platform version is 0.58.0. | | 3.3.25 | 2024-05-02 | [37781](https://github.com/airbytehq/airbyte/pull/37781) | Adopt latest CDK. | | 3.3.24 | 2024-05-01 | [37742](https://github.com/airbytehq/airbyte/pull/37742) | Adopt latest CDK. Remove Debezium retries. |