From 370e49cdba525919c2c7dedcd0a6064450e78024 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Wed, 8 May 2024 15:17:12 -0700 Subject: [PATCH] add a fix --- .../ctid/CtidPerStreamStateManager.java | 2 +- .../postgres/ctid/CtidStateManager.java | 5 ++- .../source/postgres/PostgresSourceTest.java | 43 ++++++++++++++++--- docs/integrations/sources/postgres.md | 2 + 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java index 0da629404856..3a26ecbd6081 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java @@ -83,7 +83,7 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) { if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) { // resumeable full refresh for cursor based stream. - var ctidStatus = pairToCtidStatus.get(pair); + var ctidStatus = generateCtidStatusForState(pair); return createCtidStateMessage(pair, ctidStatus); } return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class)); diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java index e2d71f6cd876..0410319a02d0 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java @@ -77,10 +77,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) { final Long fileNode = fileNodeHandler.getFileNode(pair); assert fileNode != null; + // If the table is empty, lastCtid will be set to zero for the final state message. + final String lastCtidInState = (Objects.nonNull(lastCtid) + && StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString(); return new CtidStatus() .withVersion(CTID_STATUS_VERSION) .withStateType(StateType.CTID) - .withCtid(lastCtid) + .withCtid(lastCtidInState) .withIncrementalState(getStreamState(pair)) .withRelationFilenode(fileNode); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 8ecb454ca156..17b2e8d3bbac 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -278,9 +278,9 @@ void testUserDoesntHasPrivilegesToSelectTable() throws Exception { final Set actualMessages = MoreIterators.toSet(source().read(anotherUserConfig, CONFIGURED_CATALOG, null)); setEmittedAtToNull(actualMessages); - // expect 6 records and 2 state messages (view does not have its own state message because it goes + // expect 6 records and 3 state messages (view does not have its own state message because it goes // to non resumable full refresh path). - assertEquals(8, actualMessages.size()); + assertEquals(9, actualMessages.size()); final var actualRecordMessages = filterRecords(actualMessages); assertEquals(PRIVILEGE_TEST_CASE_EXPECTED_MESSAGES, actualRecordMessages); } @@ -508,6 +508,40 @@ void testReadIncrementalSuccess() throws Exception { assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); } + @Test + void testReadFullRefreshEmptyTable() throws Exception { + // Delete all data from id_and_name table. + testdb.query(ctx -> { + ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';"); + ctx.fetch("DELETE FROM id_and_name WHERE id = '1';"); + ctx.fetch("DELETE FROM id_and_name WHERE id = '2';"); + return null; + }); + + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_CATALOG + .withStreams(CONFIGURED_CATALOG.getStreams() + .stream() + .filter(s -> s.getStream().getName().equals(STREAM_NAME)) + .toList()); + final PostgresSource source = source(); + source.setStateEmissionFrequencyForDebug(1); + final List actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + // Assert that the correct number of messages are emitted - final state message. + assertEquals(1, actualMessages.size()); + assertEquals(1, stateAfterFirstBatch.size()); + + AirbyteStateMessage stateMessage = stateAfterFirstBatch.get(0); + assertEquals("ctid", stateMessage.getStream().getStreamState().get("state_type").asText()); + assertEquals("(0,0)", stateMessage.getStream().getStreamState().get("ctid").asText()); + } + @Test void testReadFullRefreshSuccessWithSecondAttempt() throws Exception { // We want to test ordering, so we can delete the NaN entry and add a 3. @@ -556,10 +590,9 @@ void testReadFullRefreshSuccessWithSecondAttempt() throws Exception { MoreIterators.toSet(source.read(getConfig(), configuredCatalog, state)); setEmittedAtToNull(nextSyncMessages); - // An extra state message is emitted, in addition to the record messages. - assertEquals(nextSyncMessages.size(), 3); + // A state message is emitted, in addition to the new record messages. + assertEquals(nextSyncMessages.size(), 2); assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); - assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1)))); } @Test diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index a28e325a7d14..ec34fe126174 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -9,6 +9,8 @@ Airbyte's certified Postgres connector offers the following features: The contents below include a 'Quick Start' guide, advanced setup steps, and reference information (data type mapping, and changelogs). See [here](https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting) to troubleshooting issues with the Postgres connector. +**Please note the required minimum platform version is v0.58.0 for this connector.** + ![Airbyte Postgres Connection](https://raw.githubusercontent.com/airbytehq/airbyte/c078e8ed6703020a584d9362efa5665fbe8db77f/docs/integrations/sources/postgres/assets/airbyte_postgres_source.png?raw=true) ## Quick Start