diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 525b571c514d..653497145129 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -659,13 +659,11 @@ public void incrementalDedupNoCursor() throws Exception { final List actualRawRecords = dumpRawTableRecords(streamId); final List actualFinalRecords = dumpFinalTableRecords(streamId, ""); verifyRecordCounts( - 1, + 2, actualRawRecords, 1, actualFinalRecords); - assertAll( - () -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()), - () -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText())); + assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()); } @Test @@ -796,10 +794,9 @@ public void cdcComplexUpdate() throws Exception { destinationHandler.execute(sql); verifyRecordCounts( - // We keep the newest raw record per PK - 7, + 11, dumpRawTableRecords(streamId), - 5, + 6, dumpFinalTableRecords(streamId, "")); } @@ -824,11 +821,12 @@ public void testCdcOrdering_updateAfterDelete() throws Exception { streamId, BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl")); - final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty()); + final Optional minTimestampForSync = destinationHandler.getMinTimestampForSync(cdcIncrementalAppendStream.id()); + final String sql = generator.updateTable(cdcIncrementalDedupStream, "", minTimestampForSync); destinationHandler.execute(sql); verifyRecordCounts( - 1, + 2, dumpRawTableRecords(streamId), 0, dumpFinalTableRecords(streamId, "")); @@ -861,11 +859,12 @@ public void testCdcOrdering_insertAfterDelete() throws Exception { "", BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl")); - final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty()); + final Optional minTimestampForSync = destinationHandler.getMinTimestampForSync(cdcIncrementalAppendStream.id()); + final String sql = generator.updateTable(cdcIncrementalDedupStream, "", minTimestampForSync); destinationHandler.execute(sql); verifyRecordCounts( - 1, + 2, dumpRawTableRecords(streamId), 1, dumpFinalTableRecords(streamId, "")); diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index ddb95b6e6268..5f9ad1d17061 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -4,6 +4,8 
@@ package io.airbyte.integrations.base.destination.typing_deduping; +import static org.junit.jupiter.api.Assertions.assertAll; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -226,7 +228,7 @@ public void fullRefreshOverwrite() throws Exception { runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -261,7 +263,7 @@ public void fullRefreshAppend() throws Exception { runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -270,7 +272,7 @@ public void fullRefreshAppend() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -300,7 +302,7 @@ public void incrementalAppend() throws Exception { runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -309,7 +311,7 @@ public void incrementalAppend() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -337,7 +339,7 @@ public void incrementalDedup() throws Exception { runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -346,7 +348,7 @@ public void incrementalDedup() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -372,7 +374,7 @@ public void incrementalDedupDefaultNamespace() throws Exception { runSync(catalog, messages1); 
- final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName); @@ -381,7 +383,7 @@ public void incrementalDedupDefaultNamespace() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName); } @@ -424,7 +426,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception { runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -437,7 +439,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception { runSync(catalog, messages2); // The raw data is unaffected by the schema, but the final table should not have a `name` column. - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream() .peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name())) .toList(); @@ -500,12 +502,12 @@ public void incrementalDedupIdenticalName() throws Exception { runSync(catalog, messages1); verifySyncResult( - readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"), + readRecords("dat/sync1_expectedrecords_raw.jsonl"), readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"), namespace1, streamName); verifySyncResult( - readRecords("dat/sync1_expectedrecords_dedup_raw2.jsonl"), + readRecords("dat/sync1_expectedrecords_raw2.jsonl"), readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"), namespace2, streamName); @@ -518,12 +520,12 @@ public void incrementalDedupIdenticalName() throws Exception { runSync(catalog, messages2); verifySyncResult( - readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"), + readRecords("dat/sync2_expectedrecords_raw.jsonl"), readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"), namespace1, streamName); verifySyncResult( - readRecords("dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl"), + readRecords("dat/sync2_expectedrecords_raw2.jsonl"), readRecords("dat/sync2_expectedrecords_incremental_dedup_final2.jsonl"), namespace2, streamName); @@ -585,16 +587,15 @@ public void identicalNameSimultaneousSync() throws Exception { // And this will dump sync2's entire stdout to our stdout endSync(sync2); - verifySyncResult( - readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"), - readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"), - namespace1, - streamName); - verifySyncResult( - readRecords("dat/sync1_expectedrecords_dedup_raw2.jsonl"), - 
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"), - namespace2, - streamName); + // For simplicity, don't verify the raw table. Assume that if the final table is correct, then + // the raw data is correct. This is generally a safe assumption. + assertAll( + () -> DIFFER.diffFinalTableRecords( + readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"), + dumpFinalTableRecords(namespace1, streamName)), + () -> DIFFER.diffFinalTableRecords( + readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"), + dumpFinalTableRecords(namespace2, streamName))); } @Test diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl index 2327710d6e84..e5752b06c025 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl @@ -12,5 +12,5 @@ {"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-01T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}} // CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset. {"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T06:00:00Z", "string": "charlie"}} -// Verify that we can handle weird values in deleted_at +// Invalid values in _ab_cdc_deleted_at result in the record NOT being deleted. This behavior is up for debate, but it's an extreme edge case so not a high priority. 
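+// (The {} value below doesn't cast to a timestamp, so the final table's _ab_cdc_deleted_at column stays null and the CDC delete never fires.)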
{"_airbyte_raw_id": "d4e1d989-c115-403c-9e68-5d320e6376bb", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T07:00:00Z", "_ab_cdc_deleted_at": {}, "string": "david1"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index e2bf97625512..0883b70f396b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.1.6 + dockerImageTag: 2.2.0 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index ec09c1569a39..90f6a7e09dba 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -425,12 +425,9 @@ private String updateTableQueryBuilder(final StreamConfig stream, final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.columns(), forceSafeCasting, minRawTimestamp); String dedupFinalTable = ""; String cdcDeletes = ""; - String dedupRawTable = ""; if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { - dedupRawTable = dedupRawTable(stream.id(), finalSuffix); - // If we're in dedup mode, then we must have a cursor dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor()); - cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns(), forceSafeCasting); + cdcDeletes = cdcDeletes(stream, finalSuffix); } final String commitRawTable = commitRawTable(stream.id(), minRawTimestamp); @@ -438,13 +435,11 @@ private String updateTableQueryBuilder(final StreamConfig stream, "insert_new_records", insertNewRecords, "dedup_final_table", dedupFinalTable, "cdc_deletes", cdcDeletes, - "dedupe_raw_table", dedupRawTable, "commit_raw_table", commitRawTable)).replace( """ BEGIN TRANSACTION; ${insert_new_records} ${dedup_final_table} - ${dedupe_raw_table} ${cdc_deletes} ${commit_raw_table} COMMIT TRANSACTION; @@ -457,12 +452,41 @@ String insertNewRecords(final StreamConfig stream, final LinkedHashMap streamColumns, final boolean forceSafeCasting, final Optional minRawTimestamp) { - final String columnCasts = streamColumns.entrySet().stream().map( + final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); + final String extractNewRawRecords = extractNewRawRecords(stream, forceSafeCasting, minRawTimestamp); + + return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), + "column_list", columnList, + "extractNewRawRecords", extractNewRawRecords)).replace( + """ + INSERT INTO ${project_id}.${final_table_id} + ( + ${column_list} + _airbyte_meta, + 
_airbyte_raw_id, + _airbyte_extracted_at + ) + ${extractNewRawRecords};"""); + } + + /** + * A SQL SELECT statement that extracts new records from the raw table, casts their columns, and + * builds their airbyte_meta column. + *
+ * <p>
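+ * In non-dedup mode: Simply extracts and casts the raw records that have not yet been loaded.
+ * <p>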
+ * In dedup mode: Also extracts all raw CDC deletion records (for tombstoning purposes) and dedupes + * the records (since we only need the most-recent record to upsert). + */ + private String extractNewRawRecords(final StreamConfig stream, + final boolean forceSafeCasting, + final Optional minRawTimestamp) { + final String columnCasts = stream.columns().entrySet().stream().map( col -> extractAndCast(col.getKey(), col.getValue(), forceSafeCasting) + " as " + col.getKey().name(QUOTE) + ",") .collect(joining("\n")); final String columnErrors; if (forceSafeCasting) { - columnErrors = "[" + streamColumns.entrySet().stream().map( + columnErrors = "[" + stream.columns().entrySet().stream().map( col -> new StringSubstitutor(Map.of( "raw_col_name", escapeColumnNameForJsonPath(col.getKey().originalName()), "col_type", toDialectType(col.getValue()).name(), @@ -483,55 +507,99 @@ String insertNewRecords(final StreamConfig stream, columnErrors = "[]"; } - final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); - - String cdcConditionalOrIncludeStatement = ""; - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) { - cdcConditionalOrIncludeStatement = """ - OR ( - _airbyte_loaded_at IS NOT NULL - AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL - ) - """; - } - + final String columnList = stream.columns().keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); final String extractedAtCondition = buildExtractedAtCondition(minRawTimestamp); - return new StringSubstitutor(Map.of( - "project_id", '`' + projectId + '`', - "raw_table_id", stream.id().rawTableId(QUOTE), - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), - "column_casts", columnCasts, - "column_errors", columnErrors, - "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, - "extractedAtCondition", extractedAtCondition, - "column_list", columnList)).replace( - """ - INSERT INTO ${project_id}.${final_table_id} - ( - ${column_list} - _airbyte_meta, - _airbyte_raw_id, - _airbyte_extracted_at - ) - WITH intermediate_data AS ( + if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + // When deduping, we need to dedup the raw records. Note the row_number() invocation in the SQL + // statement. Do the same extract+cast CTE + airbyte_meta construction as in non-dedup mode, but + // then add a row_number column so that we only take the most-recent raw record for each PK. + + // We also explicitly include old CDC deletion records, which act as tombstones to correctly delete + // out-of-order records. 
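+ // (A deletion record that re-enters here competes in the row_number dedup below; if it is the
+ // newest record for its PK, it lands in the final table and cdcDeletes then removes that row.)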
+ String cdcConditionalOrIncludeStatement = ""; + if (stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { + cdcConditionalOrIncludeStatement = """ + OR ( + _airbyte_loaded_at IS NOT NULL + AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL + ) + """; + } + + final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); + final String cursorOrderClause = stream.cursor() + .map(cursorId -> cursorId.name(QUOTE) + " DESC NULLS LAST,") + .orElse(""); + + return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', + "raw_table_id", stream.id().rawTableId(QUOTE), + "column_casts", columnCasts, + "column_errors", columnErrors, + "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, + "extractedAtCondition", extractedAtCondition, + "column_list", columnList, + "pk_list", pkList, + "cursor_order_clause", cursorOrderClause)).replace( + """ + WITH intermediate_data AS ( + SELECT + ${column_casts} + ${column_errors} AS column_errors, + _airbyte_raw_id, + _airbyte_extracted_at + FROM ${project_id}.${raw_table_id} + WHERE ( + _airbyte_loaded_at IS NULL + ${cdcConditionalOrIncludeStatement} + ) ${extractedAtCondition} + ), new_records AS ( + SELECT + ${column_list} + to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta, + _airbyte_raw_id, + _airbyte_extracted_at + FROM intermediate_data + ), numbered_rows AS ( + SELECT *, row_number() OVER ( + PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} `_airbyte_extracted_at` DESC + ) AS row_number + FROM new_records + ) + SELECT ${column_list} _airbyte_meta, _airbyte_raw_id, _airbyte_extracted_at + FROM numbered_rows + WHERE row_number = 1"""); + } else { + // When not deduplicating, we just need to handle type casting. + // Extract+cast the not-yet-loaded records in a CTE, then select that CTE and build airbyte_meta. 
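+ // The _airbyte_loaded_at IS NULL filter (plus the optional extracted_at lower bound) keeps this
+ // incremental: only raw records that haven't been typed yet are selected.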
+ + return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', + "raw_table_id", stream.id().rawTableId(QUOTE), + "column_casts", columnCasts, + "column_errors", columnErrors, + "extractedAtCondition", extractedAtCondition, + "column_list", columnList)).replace( + """ + WITH intermediate_data AS ( + SELECT + ${column_casts} + ${column_errors} AS column_errors, + _airbyte_raw_id, + _airbyte_extracted_at + FROM ${project_id}.${raw_table_id} + WHERE + _airbyte_loaded_at IS NULL + ${extractedAtCondition} + ) SELECT - ${column_casts} - ${column_errors} AS column_errors, - _airbyte_raw_id, - _airbyte_extracted_at - FROM ${project_id}.${raw_table_id} - WHERE - _airbyte_loaded_at IS NULL - ${cdcConditionalOrIncludeStatement} - ${extractedAtCondition} - ) - SELECT - ${column_list} - to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta, - _airbyte_raw_id, - _airbyte_extracted_at - FROM intermediate_data;"""); + ${column_list} + to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta, + _airbyte_raw_id, + _airbyte_extracted_at + FROM intermediate_data"""); + } } private static String buildExtractedAtCondition(final Optional minRawTimestamp) { @@ -569,63 +637,20 @@ String dedupFinalTable(final StreamId id, ;"""); } - @VisibleForTesting - String cdcDeletes(final StreamConfig stream, - final String finalSuffix, - final LinkedHashMap streamColumns, - final boolean forceSafeCasting) { - + private String cdcDeletes(final StreamConfig stream, final String finalSuffix) { if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) { return ""; } - - if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) { + if (!stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { return ""; } - final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - final String pkCasts = - stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk), forceSafeCasting)).collect(joining(",\n")); - - // we want to grab IDs for deletion from the raw table (not the final table itself) to hand - // out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( "project_id", '`' + projectId + '`', - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), - "raw_table_id", stream.id().rawTableId(QUOTE), - "pk_list", pkList, - "pk_extracts", pkCasts, - "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)).replace( + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix))).replace( """ DELETE FROM ${project_id}.${final_table_id} - WHERE - (${pk_list}) IN ( - SELECT ( - ${pk_extracts} - ) - FROM ${project_id}.${raw_table_id} - WHERE JSON_TYPE(PARSE_JSON(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at'), wide_number_mode=>'round')) != 'null' - ) - ;"""); - } - - @VisibleForTesting - String dedupRawTable(final StreamId id, final String finalSuffix) { - return new StringSubstitutor(Map.of( - "project_id", '`' + projectId + '`', - "raw_table_id", id.rawTableId(QUOTE), - "final_table_id", id.finalTableId(QUOTE, finalSuffix))).replace( - // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it - // would be painful, - // and it only matters in a few edge cases. 
- """ - DELETE FROM - ${project_id}.${raw_table_id} - WHERE - `_airbyte_raw_id` NOT IN ( - SELECT `_airbyte_raw_id` FROM ${project_id}.${final_table_id} - ) - ;"""); + WHERE _ab_cdc_deleted_at IS NOT NULL;"""); } @VisibleForTesting @@ -724,7 +749,7 @@ private static String cast(final String content, final String asType, final bool return wrap(open, content + " as " + asType, ")"); } - private static Set getPks(StreamConfig stream) { + private static Set getPks(final StreamConfig stream) { return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index 3e44f36f6789..b6cd48b326f2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -113,7 +113,7 @@ public void testRawTableJsonToStringMigration() throws Exception { // 1.9.0 is known-good, but we might as well check that we're in good shape before continuing. // If this starts erroring out because we added more test records and 1.9.0 had a latent bug, // just delete these three lines :P - final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); @@ -122,7 +122,7 @@ public void testRawTableJsonToStringMigration() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl index a9bf479e4e3e..916b0cb278b4 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl @@ -1,3 +1,4 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": 
"Alice", "address": {"city": "Los Angeles", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl deleted file mode 100644 index 88411c9e4de3..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl +++ /dev/null @@ -1,4 +0,0 @@ -// Keep the Alice record with more recent updated_at -{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl similarity index 92% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl index 4b4db08115e5..569905e1f03d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl @@ -1,5 +1,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} -// Note the duplicate record. In this sync mode, we don't dedup anything. {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} // Invalid data is still allowed in the raw table. 
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw2.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw2.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl index 4f3f04233ec1..62648ec30fa3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl @@ -1,4 +1,7 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} -// Charlie wasn't reemitted in sync2. This record still has an old_cursor value. 
-{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl deleted file mode 100644 index 5a3209db5e22..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl +++ /dev/null @@ -1,5 +0,0 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} -// Keep the record that deleted Bob, but delete the other records associated with id=(1, 201) -{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} -// And keep Charlie's record, even though it wasn't reemitted in sync2. -{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl similarity index 50% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl index 2959ecef3f69..b8c566d38761 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl @@ -1 +1,2 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different v2"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl 
b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl index e2c19ff210a9..c8291c59fc89 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl @@ -1,2 +1,3 @@ +{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}} {"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}} {"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index b23efb947b1a..19d78f9df2fb 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.2.3 + dockerImageTag: 3.3.0 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index e7a6c291dad8..7c11ea0cfdc5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -162,12 +162,9 @@ public String updateTable(final StreamConfig stream, final String finalSuffix, f final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.columns(), minRawTimestamp); String dedupFinalTable = ""; String cdcDeletes = ""; - String dedupRawTable = ""; if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { - dedupRawTable = dedupRawTable(stream.id(), finalSuffix); - // If we're in dedup mode, then we must have a cursor dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor()); - cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns()); + cdcDeletes = cdcDeletes(stream, finalSuffix); } final String commitRawTable = commitRawTable(stream.id(), minRawTimestamp); @@ -175,13 +172,11 @@ public String updateTable(final StreamConfig stream, final String finalSuffix, f "insert_new_records", insertNewRecords, "dedup_final_table", dedupFinalTable, "cdc_deletes", cdcDeletes, - 
"dedupe_raw_table", dedupRawTable, "commit_raw_table", commitRawTable)).replace( """ BEGIN TRANSACTION; ${insert_new_records} ${dedup_final_table} - ${dedupe_raw_table} ${cdc_deletes} ${commit_raw_table} COMMIT; @@ -302,10 +297,29 @@ String insertNewRecords(final StreamConfig stream, final String finalSuffix, final LinkedHashMap streamColumns, final Optional minRawTimestamp) { - final String columnCasts = streamColumns.entrySet().stream().map( + final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); + final String extractNewRawRecords = extractNewRawRecords(stream, minRawTimestamp); + + return new StringSubstitutor(Map.of( + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()), + "column_list", columnList, + "extractNewRawRecords", extractNewRawRecords)).replace( + """ + INSERT INTO ${final_table_id} + ( + ${column_list} + "_AIRBYTE_META", + "_AIRBYTE_RAW_ID", + "_AIRBYTE_EXTRACTED_AT" + ) + ${extractNewRawRecords};"""); + } + + private String extractNewRawRecords(final StreamConfig stream, final Optional minRawTimestamp) { + final String columnCasts = stream.columns().entrySet().stream().map( col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",") .collect(joining("\n")); - final String columnErrors = streamColumns.entrySet().stream().map( + final String columnErrors = stream.columns().entrySet().stream().map( col -> new StringSubstitutor(Map.of( "raw_col_name", escapeJsonIdentifier(col.getKey().originalName()), "printable_col_name", escapeSingleQuotedString(col.getKey().originalName()), @@ -320,54 +334,88 @@ String insertNewRecords(final StreamConfig stream, ELSE NULL END""")) .collect(joining(",\n")); - final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); - - String cdcConditionalOrIncludeStatement = ""; - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) { - cdcConditionalOrIncludeStatement = """ - OR ( - "_airbyte_loaded_at" IS NOT NULL - AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE') - ) - """; - } - + final String columnList = stream.columns().keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); final String extractedAtCondition = buildExtractedAtCondition(minRawTimestamp); - return new StringSubstitutor(Map.of( - "raw_table_id", stream.id().rawTableId(QUOTE), - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()), - "column_casts", columnCasts, - "column_errors", columnErrors, - "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, - "extractedAtCondition", extractedAtCondition, - "column_list", columnList)).replace( - """ - INSERT INTO ${final_table_id} - ( - ${column_list} - "_AIRBYTE_META", - "_AIRBYTE_RAW_ID", - "_AIRBYTE_EXTRACTED_AT" - ) - WITH intermediate_data AS ( + if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + String cdcConditionalOrIncludeStatement = ""; + if (stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { + cdcConditionalOrIncludeStatement = """ + OR ( + "_airbyte_loaded_at" IS NOT NULL + AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE') + ) + """; + } + + final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); + final String 
cursorOrderClause = stream.cursor() + .map(cursorId -> cursorId.name(QUOTE) + " DESC NULLS LAST,") + .orElse(""); + + return new StringSubstitutor(Map.of( + "raw_table_id", stream.id().rawTableId(QUOTE), + "column_casts", columnCasts, + "column_errors", columnErrors, + "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, + "extractedAtCondition", extractedAtCondition, + "column_list", columnList, + "pk_list", pkList, + "cursor_order_clause", cursorOrderClause)).replace( + """ + WITH intermediate_data AS ( + SELECT + ${column_casts} + ARRAY_CONSTRUCT_COMPACT(${column_errors}) as "_airbyte_cast_errors", + "_airbyte_raw_id", + "_airbyte_extracted_at" + FROM ${raw_table_id} + WHERE ( + "_airbyte_loaded_at" IS NULL + ${cdcConditionalOrIncludeStatement} + ) ${extractedAtCondition} + ), new_records AS ( + SELECT + ${column_list} + OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors") AS "_AIRBYTE_META", + "_airbyte_raw_id" AS "_AIRBYTE_RAW_ID", + "_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT" + FROM intermediate_data + ), numbered_rows AS ( + SELECT *, row_number() OVER ( + PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} "_AIRBYTE_EXTRACTED_AT" DESC + ) AS row_number + FROM new_records + ) + SELECT ${column_list} "_AIRBYTE_META", "_AIRBYTE_RAW_ID", "_AIRBYTE_EXTRACTED_AT" + FROM numbered_rows + WHERE row_number = 1"""); + } else { + return new StringSubstitutor(Map.of( + "raw_table_id", stream.id().rawTableId(QUOTE), + "column_casts", columnCasts, + "column_errors", columnErrors, + "extractedAtCondition", extractedAtCondition, + "column_list", columnList)).replace( + """ + WITH intermediate_data AS ( + SELECT + ${column_casts} + ARRAY_CONSTRUCT_COMPACT(${column_errors}) as "_airbyte_cast_errors", + "_airbyte_raw_id", + "_airbyte_extracted_at" + FROM ${raw_table_id} + WHERE + "_airbyte_loaded_at" IS NULL + ${extractedAtCondition} + ) SELECT - ${column_casts} - ARRAY_CONSTRUCT_COMPACT(${column_errors}) as "_airbyte_cast_errors", - "_airbyte_raw_id", - "_airbyte_extracted_at" - FROM ${raw_table_id} - WHERE - "_airbyte_loaded_at" IS NULL - ${cdcConditionalOrIncludeStatement} - ${extractedAtCondition} - ) - SELECT - ${column_list} - OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors") AS "_AIRBYTE_META", - "_airbyte_raw_id" AS "_AIRBYTE_RAW_ID", - "_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT" - FROM intermediate_data;"""); + ${column_list} + OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors") AS "_AIRBYTE_META", + "_airbyte_raw_id" AS "_AIRBYTE_RAW_ID", + "_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT" + FROM intermediate_data"""); + } } private static String buildExtractedAtCondition(final Optional minRawTimestamp) { @@ -403,55 +451,21 @@ String dedupFinalTable(final StreamId id, """); } - @VisibleForTesting - String cdcDeletes(final StreamConfig stream, - final String finalSuffix, - final LinkedHashMap streamColumns) { - + private String cdcDeletes(final StreamConfig stream, final String finalSuffix) { if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) { return ""; } - - if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) { + if (!stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { return ""; } - final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - final String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); - // we want to grab IDs for deletion from the raw table (not the final table itself) to hand // out-of-order 
record insertions after the delete has been registered return new StringSubstitutor(Map.of( - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()), - "raw_table_id", stream.id().rawTableId(QUOTE), - "pk_list", pkList, - "pk_extracts", pkCasts, - "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)).replace( + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()))).replace( """ DELETE FROM ${final_table_id} - WHERE ARRAY_CONSTRUCT(${pk_list}) IN ( - SELECT ARRAY_CONSTRUCT( - ${pk_extracts} - ) - FROM ${raw_table_id} - WHERE "_airbyte_data":"_ab_cdc_deleted_at" != 'null' - ); - """); - } - - @VisibleForTesting - String dedupRawTable(final StreamId id, final String finalSuffix) { - return new StringSubstitutor(Map.of( - "raw_table_id", id.rawTableId(QUOTE), - "final_table_id", id.finalTableId(QUOTE, finalSuffix.toUpperCase()))).replace( - // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it - // would be painful, - // and it only matters in a few edge cases. - """ - DELETE FROM ${raw_table_id} - WHERE "_airbyte_raw_id" NOT IN ( - SELECT "_AIRBYTE_RAW_ID" FROM ${final_table_id} - ); + WHERE _AB_CDC_DELETED_AT IS NOT NULL; """); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index ea8616c66489..10ded7edb721 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -149,7 +149,7 @@ public void testFinalTableUppercasingMigration_append() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } finally { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl index 4a370ae69377..fcf596ac0380 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl @@ -1,3 +1,4 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, 
"_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl deleted file mode 100644 index 7f6e7aa9438b..000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl +++ /dev/null @@ -1,6 +0,0 @@ -// Keep the Alice record with more recent updated_at -// Note that extracted_at uses microseconds precision (because it's parsed directly by snowflake) -// but updated_at is still using seconds precision (because Snowflake treats it as a normal string inside a JSON blob) -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl rename to airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw2.jsonl rename to airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl index fc788ceeb4a5..347a9248d265 100644 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl @@ -1,4 +1,7 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} -// Charlie wasn't reemitted in sync2. This record still has an old_cursor value. -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl deleted file mode 100644 index fe2377ede753..000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl +++ /dev/null @@ -1,5 +0,0 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} -// Keep the record that deleted Bob, but delete the other records associated with id=(1, 201) -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} -// And keep Charlie's record, even though it wasn't reemitted in sync2. 
-{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl rename to airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl similarity index 50% rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl rename to airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl index 66389b0799d6..4d2e3167888c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl @@ -1 +1,2 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different v2"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl index 0cdce4a5e75e..cea4f178f80c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl @@ -1,2 +1,3 @@ +{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}} {"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}} {"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": 
"2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}} diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 813c2c7d5c97..f1a8cfd2cc7d 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -127,6 +127,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.2.0 | 2023-10-25 | [\#31520](https://github.com/airbytehq/airbyte/pull/31520) | Stop deduping raw table | | 2.1.6 | 2023-10-23 | [\#31717](https://github.com/airbytehq/airbyte/pull/31717) | Remove inadvertent Destination v2 check | | 2.1.5 | 2023-10-17 | [\#30069](https://github.com/airbytehq/airbyte/pull/30069) | Staging destination async | | 2.1.4 | 2023-10-17 | [\#31191](https://github.com/airbytehq/airbyte/pull/31191) | Improve typing+deduping performance by filtering new raw records on extracted_at | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 850febacfc6d..dc1e21b8104b 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.0 | 2023-10-25 | [\#31520](https://github.com/airbytehq/airbyte/pull/31520) | Stop deduping raw table | | 3.2.3 | 2023-10-17 | [\#31191](https://github.com/airbytehq/airbyte/pull/31191) | Improve typing+deduping performance by filtering new raw records on extracted_at | | 3.2.2 | 2023-10-10 | [\#31194](https://github.com/airbytehq/airbyte/pull/31194) | Deallocate unused per stream buffer memory when empty | | 3.2.1 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations |