diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index b6d3875e4654..0d6f1543d68e 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,7 +12,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.40.10' + cdkVersionRequired = '0.40.7' features = ['db-sources', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index c27ba4469c00..46641c2ce430 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.4.22 + dockerImageTag: 3.4.23 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java index 02772689ef45..568774d308b4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java @@ -68,7 +68,8 @@ private void initStream(final CtidStreams ctidStreams, } if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - if (configuredAirbyteStream.getStream().getIsResumable()) { + if (fileNodeHandler.hasFileNode( + new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()))) { this.resumableFullRefreshStreams.add(pair); } else { this.nonResumableFullRefreshStreams.add(pair); @@ -143,14 +144,10 @@ public CdcState getCdcState() { } - private boolean isIncrementalStream(final AirbyteStreamNameNamespacePair pair) { - return !resumableFullRefreshStreams.contains(pair) && !nonResumableFullRefreshStreams.contains(pair); - } - @Override public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) { // Only incremental streams can be transformed into the next phase. - if (isIncrementalStream(pair)) { + if (!resumableFullRefreshStreams.contains(pair)) { streamsThatHaveCompletedSnapshot.add(pair); } final List streamStates = new ArrayList<>(); diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java index a67dc759aad1..8e76d19c564f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java @@ -86,8 +86,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withIsResumable(true)), + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) @@ -98,8 +97,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withIsResumable(true)))); + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { @@ -114,8 +112,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { /* no name field */) .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withIsResumable(true)), + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) @@ -126,8 +123,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() { Field.of("name", JsonSchemaType.STRING)) .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withIsResumable(true)))); + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 8f111e7bab5b..87fa4ba0ba95 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -266,6 +266,10 @@ protected void validateStreamStateInResumableFullRefresh(final JsonNode streamSt assertEquals("ctid", streamStateToBeTested.get("state_type").asText()); } + @Override + @Test + protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {} + @Override protected void assertStateMessagesForNewTableSnapshotTest(final List stateMessages, final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) { diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 216f6ba31ccd..dd7760079526 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -310,8 +310,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp Expand to review | Version | Date | Pull Request | Subject | -|---------|------------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.4.22 | 2024-07-05 | [40719](https://github.com/airbytehq/airbyte/pull/40719) | Fix Postgres sending duplicated streams | +|---------| ---------- | -------------------------------------------------------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.23 | 2024-07-01 | [40757](https://github.com/airbytehq/airbyte/pull/40757) | Rollback 3.4.22. | | 3.4.21 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | | 3.4.20 | 2024-06-23 | [40559](https://github.com/airbytehq/airbyte/pull/40559) | Remove strict check for stream states of unknown types | | 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. |