Skip to content

Commit

Permalink
Revert "[source-postgres] Fix duplicate streams in postgres" (#40757)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Jul 5, 2024
1 parent 9a0ada0 commit 5573c3f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.10'
cdkVersionRequired = '0.40.7'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.22
dockerImageTag: 3.4.23
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private void initStream(final CtidStreams ctidStreams,
}

if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
if (configuredAirbyteStream.getStream().getIsResumable()) {
if (fileNodeHandler.hasFileNode(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()))) {
this.resumableFullRefreshStreams.add(pair);
} else {
this.nonResumableFullRefreshStreams.add(pair);
Expand Down Expand Up @@ -143,14 +144,10 @@ public CdcState getCdcState() {

}

private boolean isIncrementalStream(final AirbyteStreamNameNamespacePair pair) {
return !resumableFullRefreshStreams.contains(pair) && !nonResumableFullRefreshStreams.contains(pair);
}

@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
// Only incremental streams can be transformed into the next phase.
if (isIncrementalStream(pair)) {
if (!resumableFullRefreshStreams.contains(pair)) {
streamsThatHaveCompletedSnapshot.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withIsResumable(true)),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
Expand All @@ -98,8 +97,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withIsResumable(true))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
Expand All @@ -114,8 +112,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
/* no name field */)
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withIsResumable(true)),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
Expand All @@ -126,8 +123,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withIsResumable(true))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ protected void validateStreamStateInResumableFullRefresh(final JsonNode streamSt
assertEquals("ctid", streamStateToBeTested.get("state_type").asText());
}

@Override
@Test
protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {}

@Override
protected void assertStateMessagesForNewTableSnapshotTest(final List<? extends AirbyteStateMessage> stateMessages,
final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {
Expand Down
4 changes: 2 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.22 | 2024-07-05 | [40719](https://github.com/airbytehq/airbyte/pull/40719) | Fix Postgres sending duplicated streams |
|---------| ---------- | -------------------------------------------------------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.23 | 2024-07-01 | [40757](https://github.com/airbytehq/airbyte/pull/40757) | Rollback 3.4.22. |
| 3.4.21 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 3.4.20 | 2024-06-23 | [40559](https://github.com/airbytehq/airbyte/pull/40559) | Remove strict check for stream states of unknown types |
| 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. |
Expand Down

0 comments on commit 5573c3f

Please sign in to comment.