Skip to content

Commit

Permalink
Destinations Bigquery+Snowflake: Do not dedup raw table (#31520)
Browse files: browse the repository at this point in the history
Co-authored-by: edgao <[email protected]>
  • Loading branch information
edgao authored Oct 25, 2023
1 parent 534ccb1 commit 148dda1
Show file tree
Hide file tree
Showing 29 changed files with 292 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,11 @@ public void incrementalDedupNoCursor() throws Exception {
final List<JsonNode> actualRawRecords = dumpRawTableRecords(streamId);
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamId, "");
verifyRecordCounts(
1,
2,
actualRawRecords,
1,
actualFinalRecords);
assertAll(
() -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()),
() -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()));
assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText());
}

@Test
Expand Down Expand Up @@ -796,10 +794,9 @@ public void cdcComplexUpdate() throws Exception {
destinationHandler.execute(sql);

verifyRecordCounts(
// We keep the newest raw record per PK
7,
11,
dumpRawTableRecords(streamId),
5,
6,
dumpFinalTableRecords(streamId, ""));
}

Expand All @@ -824,11 +821,12 @@ public void testCdcOrdering_updateAfterDelete() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(cdcIncrementalAppendStream.id());
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecordCounts(
1,
2,
dumpRawTableRecords(streamId),
0,
dumpFinalTableRecords(streamId, ""));
Expand Down Expand Up @@ -861,11 +859,12 @@ public void testCdcOrdering_insertAfterDelete() throws Exception {
"",
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(cdcIncrementalAppendStream.id());
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecordCounts(
1,
2,
dumpRawTableRecords(streamId),
1,
dumpFinalTableRecords(streamId, ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertAll;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -226,7 +228,7 @@ public void fullRefreshOverwrite() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

Expand Down Expand Up @@ -261,7 +263,7 @@ public void fullRefreshAppend() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

Expand All @@ -270,7 +272,7 @@ public void fullRefreshAppend() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down Expand Up @@ -300,7 +302,7 @@ public void incrementalAppend() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

Expand All @@ -309,7 +311,7 @@ public void incrementalAppend() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down Expand Up @@ -337,7 +339,7 @@ public void incrementalDedup() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

Expand All @@ -346,7 +348,7 @@ public void incrementalDedup() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand All @@ -372,7 +374,7 @@ public void incrementalDedupDefaultNamespace() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName);

Expand All @@ -381,7 +383,7 @@ public void incrementalDedupDefaultNamespace() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName);
}
Expand Down Expand Up @@ -424,7 +426,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl");
final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

Expand All @@ -437,7 +439,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
runSync(catalog, messages2);

// The raw data is unaffected by the schema, but the final table should not have a `name` column.
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
.peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
.toList();
Expand Down Expand Up @@ -500,12 +502,12 @@ public void incrementalDedupIdenticalName() throws Exception {
runSync(catalog, messages1);

verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"),
readRecords("dat/sync1_expectedrecords_raw.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
namespace1,
streamName);
verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw2.jsonl"),
readRecords("dat/sync1_expectedrecords_raw2.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
namespace2,
streamName);
Expand All @@ -518,12 +520,12 @@ public void incrementalDedupIdenticalName() throws Exception {
runSync(catalog, messages2);

verifySyncResult(
readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"),
readRecords("dat/sync2_expectedrecords_raw.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"),
namespace1,
streamName);
verifySyncResult(
readRecords("dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl"),
readRecords("dat/sync2_expectedrecords_raw2.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final2.jsonl"),
namespace2,
streamName);
Expand Down Expand Up @@ -585,16 +587,15 @@ public void identicalNameSimultaneousSync() throws Exception {
// And this will dump sync2's entire stdout to our stdout
endSync(sync2);

verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
namespace1,
streamName);
verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw2.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
namespace2,
streamName);
// For simplicity, don't verify the raw table. Assume that if the final table is correct, then
// the raw data is correct. This is generally a safe assumption.
assertAll(
() -> DIFFER.diffFinalTableRecords(
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
dumpFinalTableRecords(namespace1, streamName)),
() -> DIFFER.diffFinalTableRecords(
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
dumpFinalTableRecords(namespace2, streamName)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
{"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-01T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}}
// CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset.
{"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T06:00:00Z", "string": "charlie"}}
// Verify that we can handle weird values in deleted_at
// Invalid values in _ab_cdc_deleted_at result in the record NOT being deleted. This behavior is up for debate, but it's an extreme edge case so not a high priority.
{"_airbyte_raw_id": "d4e1d989-c115-403c-9e68-5d320e6376bb", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T07:00:00Z", "_ab_cdc_deleted_at": {}, "string": "david1"}}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.1.6
dockerImageTag: 2.2.0
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Loading

0 comments on commit 148dda1

Please sign in to comment.