Skip to content

Commit

Permalink
[source-mssql/postgres/mysql] fix rfr losing stream state (#42550)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Tahler <[email protected]>
Co-authored-by: alafanechere <[email protected]>
  • Loading branch information
3 people authored Aug 2, 2024
1 parent 7e42e03 commit ab005c6
Show file tree
Hide file tree
Showing 15 changed files with 353 additions and 290 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.1 | 2024-08-01 | [\#42550](https://github.com/airbytehq/airbyte/pull/42550) | Fix error on reporting counts. |
| 0.44.0 | 2024-08-01 | [\#42405](https://github.com/airbytehq/airbyte/pull/42405) | s3-destinations: Use async framework, adapt to support refreshes |
| 0.43.6 | 2024-07-30 | [\#42540](https://github.com/airbytehq/airbyte/pull/42540) | Fix generationId handling for destinations |
| 0.43.6 | 2024-07-30 | [\#42514](https://github.com/airbytehq/airbyte/pull/42514) | Add tests around generationId handling for destinations. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.0
version=0.44.1
Original file line number Diff line number Diff line change
Expand Up @@ -1405,16 +1405,69 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }

// Test for recovery - it should be able to resume using any previous state. Using the 3rd
// state to test.
// state to test. This is a phase where 1st stream has been checkpointed
// but 2nd stream has not.
// In the 2nd read we expect 3 records from 1st stream and 6 records from 2nd stream.
val recoveryState = Jsons.jsonNode(listOf(stateAfterFirstBatch[2]))

val recoverySyncIterator =
source().read(config()!!, fullRefreshConfiguredCatalog, recoveryState)
val dataFromRecoverySync = AutoCloseableIterators.toListAndClose(recoverySyncIterator)
val recordsFromRecoverySync = extractRecordMessages(dataFromRecoverySync)
val stateAfterRecoverySync = extractStateMessages(dataFromRecoverySync)

for (i in 0 until 2) {
val streamsInRecoveryState =
stateAfterRecoverySync[i]
.global
.streamStates
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
.toSet()
Assertions.assertEquals(1, streamsInRecoveryState.size)
}

for (i in 2 until 9) {
val streamsInRecoveryState =
stateAfterRecoverySync[i]
.global
.streamStates
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
.toSet()
Assertions.assertEquals(2, streamsInRecoveryState.size)
}

Assertions.assertEquals(9, stateAfterRecoverySync.size)
Assertions.assertEquals(9, recordsFromRecoverySync.size)
assertExpectedStateMessageCountMatches(stateAfterRecoverySync, 9)

// Test for recovery part 2. Using the 10th
// state to test.
//
// Expect to have 2 more message from stream2 in the follow up sync, but will have 3 state
// message because the first stream will resend the its final state message.
//
// This is a phase where both streams have been checkpointed.
val recoveryState2 = Jsons.jsonNode(listOf(stateAfterFirstBatch[9]))

val recoverySyncIterator2 =
source().read(config()!!, fullRefreshConfiguredCatalog, recoveryState2)
val dataFromRecoverySync2 = AutoCloseableIterators.toListAndClose(recoverySyncIterator2)
val recordsFromRecoverySync2 = extractRecordMessages(dataFromRecoverySync2)
val stateAfterRecoverySync2 = extractStateMessages(dataFromRecoverySync2)

for (i in 0 until 2) {
val streamsInRecoveryState =
stateAfterRecoverySync2[i]
.global
.streamStates
.map { obj: AirbyteStreamState -> obj.streamDescriptor }
.toSet()
Assertions.assertEquals(2, streamsInRecoveryState.size)
}

Assertions.assertEquals(3, stateAfterRecoverySync2.size)
Assertions.assertEquals(2, recordsFromRecoverySync2.size)
assertExpectedStateMessageCountMatches(stateAfterRecoverySync2, 2)
}

protected open fun assertStateMessagesForNewTableSnapshotTest(
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.43.4'
cdkVersionRequired = '0.44.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.5
dockerImageTag: 4.1.6
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb

resumableFullRefreshStreams.forEach(stream -> {
var ocStatus = getOrderedColumnLoadStatus(stream);
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
if (ocStatus != null) {
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
}
});

if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.43.4'
cdkVersionRequired = '0.44.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.6.6
dockerImageTag: 3.6.7
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb

resumableFullRefreshStreams.forEach(stream -> {
var pkStatus = getPrimaryKeyLoadStatus(stream);
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
if (pkStatus != null) {
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
}
});
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
AirbyteStreamNameNamespacePair pair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.43.4'
cdkVersionRequired = '0.44.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.11
dockerImageTag: 3.6.12
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa
});

resumableFullRefreshStreams.forEach(stream -> {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
if (getCtidStatus(new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace())) != null) {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
}
});

nonResumableFullRefreshStreams.forEach(stream -> {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 4.1.3 | |2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.6.7 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 3.6.6 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 3.6.5 | 2024-07-24 | [42417](https://github.com/airbytehq/airbyte/pull/42417) | Handle null error message in ConnectorExceptionHandler. |
| 3.6.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
Expand Down
Loading

0 comments on commit ab005c6

Please sign in to comment.