Skip to content

Commit

Permalink
source-postgres - Streams not in the CDC publication still have a cur…
Browse files Browse the repository at this point in the history
…sor and PK (#38303)
  • Loading branch information
evantahler authored May 16, 2024
1 parent 5c492b5 commit 49fc60d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.4
dockerImageTag: 3.4.5
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ public static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
/**
* Modifies streams that are NOT present in the publication to be full-refresh only streams. Users
* should be able to replicate these streams, just not in incremental mode as they have no
* associated publication.
* associated publication. Previously, we also setSourceDefinedCursor(false) and
* setSourceDefinedPrimaryKey(List.of()) for streams that are in the catalog but not in the CDC
* publication, but now that full refresh streams can be resumable, we should include this
* information.
*/
public static AirbyteStream setFullRefreshForNonPublicationStreams(final AirbyteStream stream,
final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc) {
if (!publicizedTablesInCdc.contains(new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace()))) {
stream.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
stream.setSourceDefinedCursor(false);
stream.setSourceDefinedPrimaryKey(List.of());
}
return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ void testDiscoverFiltersNonPublication() throws Exception {
// The stream that does not have an associated publication should not have support for
// source-defined incremental sync.
assertEquals(streamNotInPublication.getSupportedSyncModes(), List.of(SyncMode.FULL_REFRESH));
assertTrue(streamNotInPublication.getSourceDefinedPrimaryKey().isEmpty());
assertFalse(streamNotInPublication.getSourceDefinedCursor());
assertFalse(streamNotInPublication.getSourceDefinedPrimaryKey().isEmpty());
assertTrue(streamNotInPublication.getSourceDefinedCursor());
testdb.query(ctx -> ctx.execute("DROP PUBLICATION " + testdb.getPublicationName() + ";"));
testdb.query(ctx -> ctx.execute("CREATE PUBLICATION " + testdb.getPublicationName() + " FOR ALL TABLES"));
}
Expand Down
Loading

0 comments on commit 49fc60d

Please sign in to comment.