From 23a237fc42480599624d683585a87df2db5ac844 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 10 May 2024 12:55:03 -0700 Subject: [PATCH] extract generation id --- .../typing_deduping/CatalogParser.kt | 32 +++++++--- .../typing_deduping/StreamConfig.kt | 5 +- .../typing_deduping/CatalogParserTest.kt | 16 ++--- .../DefaultTyperDeduperTest.kt | 18 ++++-- .../DestinationV1V2MigratorTest.kt | 14 +++-- .../BaseSqlGeneratorIntegrationTest.kt | 60 ++++++++++++------- 6 files changed, 96 insertions(+), 49 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 4ac3e5e4d6f0..ab3ed847f5ae 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -55,13 +55,13 @@ constructor( .substring(0, 3) val newName = "${originalName}_$hash" actualStreamConfig = - StreamConfig( - sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace), - originalStreamConfig.syncMode, - originalStreamConfig.destinationSyncMode, - originalStreamConfig.primaryKey, - originalStreamConfig.cursor, - originalStreamConfig.columns, + originalStreamConfig.copy( + id = + sqlGenerator.buildStreamId( + originalNamespace, + newName, + rawNamespace, + ), ) } else { actualStreamConfig = originalStreamConfig @@ -112,6 +112,18 @@ constructor( @VisibleForTesting fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig { + if (stream.generationId == null) { + stream.generationId = 0 + stream.minimumGenerationId = 0 + stream.syncId = 0 + } + if ( + stream.minimumGenerationId != 0.toLong() && + stream.minimumGenerationId != stream.generationId + ) { + throw UnsupportedOperationException("Hybrid refreshes are not yet supported.") + } + val airbyteColumns = when ( val schema: AirbyteType = @@ -143,11 +155,13 @@ constructor( return StreamConfig( sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace), - stream.syncMode, stream.destinationSyncMode, primaryKey, cursor, - columns + columns, + stream.generationId, + stream.minimumGenerationId, + stream.syncId, ) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt index c0fc5f7ce4a7..80fa46843646 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt @@ -4,15 +4,16 @@ package io.airbyte.integrations.base.destination.typing_deduping import io.airbyte.protocol.models.v0.DestinationSyncMode -import io.airbyte.protocol.models.v0.SyncMode import java.util.* import kotlin.collections.LinkedHashMap data class StreamConfig( val id: StreamId, - val syncMode: SyncMode, val destinationSyncMode: DestinationSyncMode, val primaryKey: List, val cursor: Optional, val columns: LinkedHashMap, + val generationId: Long, + val minimumGenerationId: Long, + val syncId: Long, ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 537044ac53cc..9032fc5e42fb 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -10,7 +10,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.DestinationSyncMode import io.airbyte.protocol.models.v0.SyncMode -import java.util.List import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.BeforeEach @@ -74,7 +73,7 @@ internal class CatalogParserTest { } val catalog = ConfiguredAirbyteCatalog() - .withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo"))) + .withStreams(listOf(stream("a", "foobarfoo"), stream("a", "foofoo"))) val parsedCatalog = parser.parseCatalog(catalog) @@ -127,13 +126,13 @@ internal class CatalogParserTest { """.trimIndent() ) - val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema))) + val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema))) val parsedCatalog = parser.parseCatalog(catalog) - val columnsList = parsedCatalog.streams[0].columns!!.keys.toList() + val columnsList = parsedCatalog.streams[0].columns.keys.toList() assertAll( - { Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) }, + { Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) }, { Assertions.assertEquals("foofoo", columnsList[0].name) }, { Assertions.assertEquals("foofoo_1", columnsList[1].name) } ) @@ -168,10 +167,10 @@ internal class CatalogParserTest { val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema))) val parsedCatalog = parser.parseCatalog(catalog) - val columnsList = parsedCatalog.streams[0].columns!!.keys.toList() + val columnsList = parsedCatalog.streams[0].columns.keys.toList() assertAll( - { Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) }, + { Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) }, { Assertions.assertEquals("aVeryLongC", columnsList[0].name) }, { Assertions.assertEquals("aV36rd", columnsList[1].name) } ) @@ -200,6 +199,9 @@ internal class CatalogParserTest { ) .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index 5209367698ec..f681d9be3be3 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -913,11 +913,13 @@ class DefaultTyperDeduperTest { "overwrite_ns", "overwrite_stream" ), - mock(), DestinationSyncMode.OVERWRITE, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val APPEND_STREAM_CONFIG = StreamConfig( @@ -929,11 +931,13 @@ class DefaultTyperDeduperTest { "append_ns", "append_stream" ), - mock(), DestinationSyncMode.APPEND, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val DEDUPE_STREAM_CONFIG = StreamConfig( @@ -945,11 +949,13 @@ class DefaultTyperDeduperTest { "dedup_ns", "dedup_stream" ), - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt index 646e32363baa..c1739fea4ad0 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest { migrator: BaseDestinationV1V2Migrator<*>, expected: Boolean ) { - val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock()) + val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0) val actual = migrator.shouldMigrate(config) Assertions.assertEquals(expected, actual) } @@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest { val config = StreamConfig( STREAM_ID, - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val migrator = makeMockMigrator(true, true, false, false, false) val exception = @@ -112,11 +114,13 @@ class DestinationV1V2MigratorTest { val stream = StreamConfig( STREAM_ID, - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val handler = Mockito.mock(DestinationHandler::class.java) val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table") diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 7750a8e0cbf8..ac9e81c8d9be 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -222,39 +222,47 @@ abstract class BaseSqlGeneratorIntegrationTest(), Optional.empty(), - LinkedHashMap() + LinkedHashMap(), + 0, + 0, + 0, ) val createTable = generator.createTable(stream, "", false)