Skip to content

Commit

Permalink
add a fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 8, 2024
1 parent 7f80227 commit 370e49c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) {
// resumeable full refresh for cursor based stream.
var ctidStatus = pairToCtidStatus.get(pair);
var ctidStatus = generateCtidStatusForState(pair);
return createCtidStateMessage(pair, ctidStatus);
}
return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
// If the table is empty, lastCtid will be set to zero for the final state message.
final String lastCtidInState = (Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
return new CtidStatus()
.withVersion(CTID_STATUS_VERSION)
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withCtid(lastCtidInState)
.withIncrementalState(getStreamState(pair))
.withRelationFilenode(fileNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ void testUserDoesntHasPrivilegesToSelectTable() throws Exception {
final Set<AirbyteMessage> actualMessages =
MoreIterators.toSet(source().read(anotherUserConfig, CONFIGURED_CATALOG, null));
setEmittedAtToNull(actualMessages);
// expect 6 records and 2 state messages (view does not have its own state message because it goes
// expect 6 records and 3 state messages (view does not have its own state message because it goes
// to non resumable full refresh path).
assertEquals(8, actualMessages.size());
assertEquals(9, actualMessages.size());
final var actualRecordMessages = filterRecords(actualMessages);
assertEquals(PRIVILEGE_TEST_CASE_EXPECTED_MESSAGES, actualRecordMessages);
}
Expand Down Expand Up @@ -508,6 +508,40 @@ void testReadIncrementalSuccess() throws Exception {
assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0))));
}

@Test
void testReadFullRefreshEmptyTable() throws Exception {
// Delete all data from id_and_name table.
testdb.query(ctx -> {
ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';");
ctx.fetch("DELETE FROM id_and_name WHERE id = '1';");
ctx.fetch("DELETE FROM id_and_name WHERE id = '2';");
return null;
});

final ConfiguredAirbyteCatalog configuredCatalog =
CONFIGURED_CATALOG
.withStreams(CONFIGURED_CATALOG.getStreams()
.stream()
.filter(s -> s.getStream().getName().equals(STREAM_NAME))
.toList());
final PostgresSource source = source();
source.setStateEmissionFrequencyForDebug(1);
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null));
setEmittedAtToNull(actualMessages);

final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessage(actualMessages);

setEmittedAtToNull(actualMessages);

// Assert that the correct number of messages are emitted - final state message.
assertEquals(1, actualMessages.size());
assertEquals(1, stateAfterFirstBatch.size());

AirbyteStateMessage stateMessage = stateAfterFirstBatch.get(0);
assertEquals("ctid", stateMessage.getStream().getStreamState().get("state_type").asText());
assertEquals("(0,0)", stateMessage.getStream().getStreamState().get("ctid").asText());
}

@Test
void testReadFullRefreshSuccessWithSecondAttempt() throws Exception {
// We want to test ordering, so we can delete the NaN entry and add a 3.
Expand Down Expand Up @@ -556,10 +590,9 @@ void testReadFullRefreshSuccessWithSecondAttempt() throws Exception {
MoreIterators.toSet(source.read(getConfig(), configuredCatalog, state));
setEmittedAtToNull(nextSyncMessages);

// An extra state message is emitted, in addition to the record messages.
assertEquals(nextSyncMessages.size(), 3);
// A state message is emitted, in addition to the new record messages.
assertEquals(nextSyncMessages.size(), 2);
assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0))));
assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1))));
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Airbyte's certified Postgres connector offers the following features:

The contents below include a 'Quick Start' guide, advanced setup steps, and reference information (data type mapping, and changelogs). See [here](https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting) to troubleshooting issues with the Postgres connector.

**Please note the required minimum platform version is v0.58.0 for this connector.**

![Airbyte Postgres Connection](https://raw.githubusercontent.com/airbytehq/airbyte/c078e8ed6703020a584d9362efa5665fbe8db77f/docs/integrations/sources/postgres/assets/airbyte_postgres_source.png?raw=true)

## Quick Start
Expand Down

0 comments on commit 370e49c

Please sign in to comment.