Skip to content

Commit

Permalink
bulk-cdk: DiscoverOperation supports isResumable (#43406)
Browse files: browse the repository at this point in the history
  • Loading branch information
postamar authored Aug 8, 2024
1 parent 55c94f9 commit a16cc58
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/bulk/core/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies {
api 'com.fasterxml.jackson.core:jackson-databind'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
api 'com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39'
api('io.airbyte.airbyte-protocol:protocol-models:0.9.0') {
api('io.airbyte.airbyte-protocol:protocol-models:0.12.2') {
exclude group: 'com.google.guava', module: 'guava'
exclude group: 'com.google.api-client'
exclude group: 'org.apache.logging.log4j'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class DiscoverOperation(
}
continue
}
val primaryKeys: List<List<String>> =
metadataQuerier.primaryKeys(name, namespace)
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKeys)
val primaryKey: List<List<String>> = metadataQuerier.primaryKey(name, namespace)
val discoveredStream = DiscoveredStream(name, namespace, fields, primaryKey)
airbyteStreams.add(toAirbyteStream(discoveredStream))
}
}
Expand All @@ -68,6 +67,7 @@ class DiscoverOperation(
airbyteStream.withSourceDefinedPrimaryKey(
if (isValidPK) discoveredStream.primaryKeyColumnIDs else listOf(),
)
airbyteStream.isResumable = airbyteStream.sourceDefinedPrimaryKey.isNotEmpty()
if (config.global) {
// There is a global feed of incremental records, like CDC.
airbyteStreamDecorator.decorateGlobal(airbyteStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ interface AirbyteStreamDecorator {
* message.
*
* This method does not determine (1), of course, because the source keys are defined in the
* source database itself and are retrieved via [MetadataQuerier.primaryKeys]. Instead, this
* source database itself and are retrieved via [MetadataQuerier.primaryKey]. Instead, this
* method determines (2) based on the type information of the field, typically the [FieldType]
* objects. For instance if the [Field.type] does not map to a [LosslessFieldType] then the
* field can't reliably round-trip checkpoint values during a resumable initial sync.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ interface MetadataQuerier : AutoCloseable {
streamNamespace: String?,
): List<Field>

/** Queries the information_schema for all primary keys for the given table. */
fun primaryKeys(
/** Queries the information_schema for any primary key on the given table. */
fun primaryKey(
streamName: String,
streamNamespace: String?,
): List<List<String>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class JdbcMetadataQuerierTest {
val tableName = (mdq as JdbcMetadataQuerier).findTableName("KV", "PUBLIC")
Assertions.assertNotNull(tableName)
Assertions.assertEquals(expectedColumnMetadata, mdq.columnMetadata(tableName!!))
Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKeys("KV", "PUBLIC"))
Assertions.assertEquals(listOf(listOf("K")), mdq.primaryKey("KV", "PUBLIC"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ class FakeSourceDiscoverTest {
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
AirbyteStream()
.withName("KV")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
discoverOperation.execute()
Assertions.assertEquals(listOf(expected), outputConsumer.catalogs())
Expand All @@ -59,13 +61,15 @@ class FakeSourceDiscoverTest {
.withJsonSchema(Jsons.readTree(EVENTS_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(listOf(listOf("ID")))
.withIsResumable(true)
val kv =
AirbyteStream()
.withName("KV")
.withNamespace("PUBLIC")
.withJsonSchema(Jsons.readTree(KV_SCHEMA))
.withSupportedSyncModes(listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(listOf(listOf("K")))
.withIsResumable(true)
val expected = AirbyteCatalog().withStreams(listOf(events, kv))
discoverOperation.execute()
Assertions.assertEquals(listOf(expected), outputConsumer.catalogs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": ["ID", "TS"],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "full_refresh",
Expand All @@ -47,6 +48,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": ["K"],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "incremental",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "incremental",
Expand All @@ -47,6 +48,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "full_refresh",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": ["ID", "TS"],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "full_refresh",
Expand All @@ -48,6 +49,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": ["K"],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "incremental",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "incremental",
Expand All @@ -48,6 +49,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
"sync_mode": "full_refresh",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"source_defined_cursor": true,
"default_cursor_field": ["_ab_cdc_lsn"],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
{
Expand Down Expand Up @@ -77,6 +78,7 @@
"source_defined_cursor": true,
"default_cursor_field": ["_ab_cdc_lsn"],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["ID"]],
"is_resumable": true,
"namespace": "PUBLIC"
},
{
Expand All @@ -49,6 +50,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"default_cursor_field": [],
"source_defined_primary_key": [["K"]],
"is_resumable": true,
"namespace": "PUBLIC"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ResourceDrivenMetadataQuerierFactory(
?: throw SQLException("query failed", "tbl")
}

override fun primaryKeys(
override fun primaryKey(
streamName: String,
streamNamespace: String?,
): List<List<String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class JdbcMetadataQuerier(

val memoizedPrimaryKeys = mutableMapOf<TableName, List<List<String>>>()

override fun primaryKeys(
override fun primaryKey(
streamName: String,
streamNamespace: String?,
): List<List<String>> {
Expand Down

0 comments on commit a16cc58

Please sign in to comment.