diff --git a/airbyte-cdk/bulk/core/base/build.gradle b/airbyte-cdk/bulk/core/base/build.gradle index e854169a8b68..245689c78820 100644 --- a/airbyte-cdk/bulk/core/base/build.gradle +++ b/airbyte-cdk/bulk/core/base/build.gradle @@ -5,7 +5,7 @@ dependencies { api 'com.fasterxml.jackson.core:jackson-databind' api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' api 'com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39' - api('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { + api('io.airbyte.airbyte-protocol:protocol-models:0.12.2') { exclude group: 'com.google.guava', module: 'guava' exclude group: 'com.google.api-client' exclude group: 'org.apache.logging.log4j' diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt index 39bda1a65c4e..b71feb34518a 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/operation/DiscoverOperation.kt @@ -39,9 +39,8 @@ class DiscoverOperation( } continue } - val primaryKeys: List> = - metadataQuerier.primaryKeys(name, namespace) - val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKeys) + val primaryKey: List> = metadataQuerier.primaryKey(name, namespace) + val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey) airbyteStreams.add(toAirbyteStream(discoveredStream)) } } @@ -68,6 +67,7 @@ class DiscoverOperation( airbyteStream.withSourceDefinedPrimaryKey( if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(), ) + airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty() if (config.global) { // There is a global feed of incremental records, like CDC. airbyteStreamDecorator.decorateGlobal(airbyteStream) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/AirbyteStreamDecorator.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/AirbyteStreamDecorator.kt index e6e93774ebd2..618b3e61f043 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/AirbyteStreamDecorator.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/AirbyteStreamDecorator.kt @@ -34,7 +34,7 @@ interface AirbyteStreamDecorator { * message. * * This method does not determine (1), of course, because the source keys are defined in the - * source database itself and are retrieved via [MetadataQuerier.primaryKeys]. Instead, this + * source database itself and are retrieved via [MetadataQuerier.primaryKey]. Instead, this * method determines (2) based on the type information of the field, typically the [FieldType] * objects. For instance if the [Field.type] does not map to a [LosslessFieldType] then the * field can't reliably round-trip checkpoint values during a resumable initial sync. diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/MetadataQuerier.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/MetadataQuerier.kt index b3f1a0e066f7..9438d6375fa6 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/MetadataQuerier.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/source/MetadataQuerier.kt @@ -19,8 +19,8 @@ interface MetadataQuerier : AutoCloseable { streamNamespace: String?, ): List - /** Queries the information_schema for all primary keys for the given table. */ - fun primaryKeys( + /** Queries the information_schema for any primary key on the given table. */ + fun primaryKey( streamName: String, streamNamespace: String?, ): List> diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerierTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerierTest.kt index f42d0acbc20f..2ae7559243db 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerierTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerierTest.kt @@ -72,7 +72,7 @@ class JdbcMetadataQuerierTest { val tableName = (mdq as JdbcMetadataQuerier).findTableName("KV", "PUBLIC") Assertions.assertNotNull(tableName) Assertions.assertEquals(expectedColumnMetadata, mdq.columnMetadata(tableName!!)) - Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKeys("KV", "PUBLIC")) + Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKey("KV", "PUBLIC")) } } } diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/source/FakeSourceDiscoverTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/source/FakeSourceDiscoverTest.kt index 05feeaa5cc1d..c1c02b720058 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/source/FakeSourceDiscoverTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/test/source/FakeSourceDiscoverTest.kt @@ -34,6 +34,7 @@ class FakeSourceDiscoverTest { .withJsonSchema(Jsons.readTree(EVENTS_SCHEMA)) .withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(listOf(listOf("ID"))) + .withIsResumable(true) val kv = AirbyteStream() .withName("KV") @@ -41,6 +42,7 @@ class FakeSourceDiscoverTest { .withJsonSchema(Jsons.readTree(KV_SCHEMA)) .withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(listOf(listOf("K"))) + .withIsResumable(true) val expected = AirbyteCatalog().withStreams(listOf(events, kv)) discoverOperation.execute() Assertions.assertEquals(listOf(expected), outputConsumer.catalogs()) @@ -59,6 +61,7 @@ class FakeSourceDiscoverTest { .withJsonSchema(Jsons.readTree(EVENTS_SCHEMA)) .withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(listOf(listOf("ID"))) + .withIsResumable(true) val kv = AirbyteStream() .withName("KV") @@ -66,6 +69,7 @@ class FakeSourceDiscoverTest { .withJsonSchema(Jsons.readTree(KV_SCHEMA)) .withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(listOf(listOf("K"))) + .withIsResumable(true) val expected = AirbyteCatalog().withStreams(listOf(events, kv)) discoverOperation.execute() Assertions.assertEquals(listOf(expected), outputConsumer.catalogs()) diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json index 1d5a7dd29133..7fd5bc286a51 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/read/cdc-catalog.json @@ -22,6 +22,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": ["ID", "TS"], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "full_refresh", @@ -47,6 +48,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": ["K"], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "incremental", diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json index 905e9ee81360..f1b4850c1fe1 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/read/cursor-catalog.json @@ -22,6 +22,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "incremental", @@ -47,6 +48,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "full_refresh", diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cdc-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cdc-catalog.json index 0e5082880699..0b4c5b8d3af9 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cdc-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cdc-catalog.json @@ -23,6 +23,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": ["ID", "TS"], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "full_refresh", @@ -48,6 +49,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": ["K"], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "incremental", diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cursor-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cursor-catalog.json index 6a9e3ba563db..3520f52b260b 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cursor-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/cursor-catalog.json @@ -23,6 +23,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "incremental", @@ -48,6 +49,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" }, "sync_mode": "full_refresh", diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cdc-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cdc-catalog.json index fb29f0dd470a..2744b92ca0c4 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cdc-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cdc-catalog.json @@ -40,6 +40,7 @@ "source_defined_cursor": true, "default_cursor_field": ["_ab_cdc_lsn"], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, { @@ -77,6 +78,7 @@ "source_defined_cursor": true, "default_cursor_field": ["_ab_cdc_lsn"], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" } ] diff --git a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cursor-catalog.json b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cursor-catalog.json index 052b6b8c9d17..361331109c39 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cursor-catalog.json +++ b/airbyte-cdk/bulk/core/extract/src/test/resources/test/source/expected-cursor-catalog.json @@ -26,6 +26,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["ID"]], + "is_resumable": true, "namespace": "PUBLIC" }, { @@ -49,6 +50,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "default_cursor_field": [], "source_defined_primary_key": [["K"]], + "is_resumable": true, "namespace": "PUBLIC" } ] diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt index 88f737adea86..008e91c95e7e 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/ResourceDrivenMetadataQuerierFactory.kt @@ -66,7 +66,7 @@ class ResourceDrivenMetadataQuerierFactory( ?: throw SQLException("query failed", "tbl") } - override fun primaryKeys( + override fun primaryKey( streamName: String, streamNamespace: String?, ): List> { diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt index ef958979bb09..300ae58cb51f 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/JdbcMetadataQuerier.kt @@ -272,7 +272,7 @@ class JdbcMetadataQuerier( val memoizedPrimaryKeys = mutableMapOf>>() - override fun primaryKeys( + override fun primaryKey( streamName: String, streamNamespace: String?, ): List> {