From 634c4941c2d8a80eef2d2547cd760731db199bf3 Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Wed, 7 Aug 2024 14:52:49 -0700 Subject: [PATCH] cdk-java make TypingDedupingTest aware of column name overrides (#43330) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ## How ## Review guide ## User Impact ## Can this PR be safely reverted and rolled back? - [ ] YES 💚 - [ ] NO ❌ --- airbyte-cdk/java/airbyte-cdk/README.md | 2 + .../src/main/resources/version.properties | 2 +- .../typing_deduping/BaseTypingDedupingTest.kt | 117 +++++------------- 3 files changed, 36 insertions(+), 85 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index ed429f271dff..d8a5c6762d33 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,8 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.44.3 | 2024-08-07 | [\#43330](https://github.com/airbytehq/airbyte/pull/43330) | make TypingDedupingTest aware of column name renaming. | +| 0.44.3 | 2024-08-07 | [\#43329](https://github.com/airbytehq/airbyte/pull/43329) | move generationIdHandling to its own class. | | 0.44.2 | 2024-08-06 | [\#42869](https://github.com/airbytehq/airbyte/pull/42869) | Add logs about counting info to state message. | | 0.44.1 | 2024-08-01 | [\#42550](https://github.com/airbytehq/airbyte/pull/42550) | Fix error on reporting counts. | | 0.44.0 | 2024-08-01 | [\#42405](https://github.com/airbytehq/airbyte/pull/42405) | s3-destinations: Use async framework, adapt to support refreshes | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 7f02336b9adf..f965ccbf2396 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.44.2 +version=0.44.3 diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 4e2d91c2935e..e8a325ccb4d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -32,7 +32,6 @@ import java.util.* import java.util.concurrent.Callable import java.util.concurrent.CompletableFuture import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import java.util.function.Consumer import java.util.function.Function import kotlin.test.assertFails @@ -395,12 +394,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) // Second sync @@ -411,12 +405,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl") val expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl") - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -461,12 +450,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) // Second sync @@ -477,12 +461,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl") val expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl") - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -524,12 +503,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) // Second sync @@ -540,12 +514,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl") val expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl") - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -557,9 +526,7 @@ abstract class BaseTypingDedupingTest { @ParameterizedTest @ValueSource(longs = [0L, 42L]) @Throws(Exception::class) - // This test writes a lot of data to the destination and can take longer than a minute. - @Timeout(value = 15, unit = TimeUnit.MINUTES) - fun largeDedupSync(inputGenerationId: Long) { + open fun largeDedupSync(inputGenerationId: Long) { val catalog = io.airbyte.protocol.models.v0 .ConfiguredAirbyteCatalog() @@ -592,12 +559,7 @@ abstract class BaseTypingDedupingTest { repeatList(25000, readRecords("dat/sync1_expectedrecords_raw.jsonl")) // But the final table should be fully deduped val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) } @@ -634,12 +596,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult( expectedRawRecords1, expectedFinalRecords1, @@ -656,12 +613,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl") val expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl") - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult( expectedRawRecords2, expectedFinalRecords2, @@ -730,12 +682,7 @@ abstract class BaseTypingDedupingTest { val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) // Second sync @@ -754,12 +701,7 @@ abstract class BaseTypingDedupingTest { expectedFinalRecords2.forEach { record: JsonNode -> (record as ObjectNode).remove(sqlGenerator.buildColumnId("name").name) } - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -897,8 +839,6 @@ abstract class BaseTypingDedupingTest { * stdout. */ @Test - // This test writes a lot of data to the destination and can take longer than a minute. - @Timeout(value = 15, unit = TimeUnit.MINUTES) @Throws(Exception::class) open fun identicalNameSimultaneousSync() { val namespace1 = streamNamespace + "_1" @@ -1071,12 +1011,7 @@ abstract class BaseTypingDedupingTest { readRecords("dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl") val expectedFinalRecords1 = readRecords("dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl") - (expectedRawRecords1 + expectedFinalRecords1).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId) verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()) // Second sync @@ -1090,12 +1025,7 @@ abstract class BaseTypingDedupingTest { readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl") val expectedFinalRecords2 = readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl") - (expectedRawRecords2 + expectedFinalRecords2).forEach { - (it as ObjectNode).put( - JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, - inputGenerationId - ) - } + fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId) verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) } @@ -1516,6 +1446,25 @@ abstract class BaseTypingDedupingTest { return Companion.readRecords(filename) } + fun fixGenerationId( + rawRecords: List, + finalRecords: List, + generationId: Long + ) { + rawRecords.forEach { + (it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId) + } + finalRecords.forEach { + (it as ObjectNode).put( + finalMetadataColumnNames.getOrDefault( + JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, + JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID + ), + generationId + ) + } + } + val schema: JsonNode = SCHEMA companion object {