From 05c7269a50a0e77a9aa0182d2cb13a02e6599035 Mon Sep 17 00:00:00 2001
From: Joe Bell
Date: Wed, 27 Sep 2023 13:57:44 -0700
Subject: [PATCH] Destination BigQuery - Try unsafe operations with exception
 handling (#30696)

---
 .../sqlgenerator/alltypes_inputrecords.jsonl  |   2 +-
 .../destination-bigquery/Dockerfile           |   2 +-
 .../destination-bigquery/metadata.yaml        |   2 +-
 .../typing_deduping/BigQuerySqlGenerator.java | 115 ++++++++++++------
 .../alltypes_expectedrecords_final.jsonl      |   2 +-
 .../alltypes_expectedrecords_raw.jsonl        |   2 +-
 docs/integrations/destinations/bigquery.md    |   1 +
 7 files changed, 85 insertions(+), 41 deletions(-)

diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl
index ca82be9ffdc4..728f365eb006 100644
--- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl
+++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl
@@ -2,5 +2,5 @@
 {"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
 {"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}}
 // Note that array and struct have invalid values ({} and [] respectively).
-{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} {"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 4a1158e3adfe..ef2cdee8024f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.16 +LABEL io.airbyte.version=2.0.17 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index c3aaa7800d8f..6d46e76fdabe 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.16 + dockerImageTag: 2.0.17 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index c001d89e1490..0e67c1dee56f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -111,11 +111,11 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { throw new IllegalArgumentException("Unsupported AirbyteType: " + type); } - private String extractAndCast(final ColumnId column, final AirbyteType airbyteType) { + private String extractAndCast(final ColumnId column, final AirbyteType airbyteType, final boolean forceSafeCast) { if (airbyteType instanceof final Union u) { // This is guaranteed to not be a Union, so we won't recurse infinitely final AirbyteType chosenType = u.chooseType(); - return 
extractAndCast(column, chosenType); + return extractAndCast(column, chosenType, forceSafeCast); } else if (airbyteType instanceof Struct) { // We need to validate that the struct is actually a struct. // Note that struct columns are actually nullable in two ways. For a column `foo`: @@ -153,13 +153,13 @@ ELSE JSON_QUERY(`_airbyte_data`, '$."${column_name}"') """); } else { final StandardSQLTypeName dialectType = toDialectType(airbyteType); + final var baseTyping = "JSON_VALUE(`_airbyte_data`, '$.\"" + escapeColumnNameForJsonPath(column.originalName()) + "\"')"; if (dialectType == StandardSQLTypeName.STRING) { // json_value implicitly returns a string, so we don't need to cast it. - // SAFE_CAST is actually a massive performance hit, so we should skip it if we can. - return "JSON_VALUE(`_airbyte_data`, '$.\"" + escapeColumnNameForJsonPath(column.originalName()) + "\"')"; + return baseTyping; } else { - return "SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.\"" + escapeColumnNameForJsonPath(column.originalName()) + "\"') as " + dialectType.name() - + ")"; + // SAFE_CAST is actually a massive performance hit, so we should skip it if we can. + return cast(baseTyping, dialectType.name(), forceSafeCast); } } } @@ -363,14 +363,15 @@ public String updateTable(final StreamConfig stream, final String finalSuffix) { return updateTable(stream, finalSuffix, true); } - private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) { - String pkVarDeclaration = ""; + private String updateTableQueryBuilder(final StreamConfig stream, + final String finalSuffix, + final boolean verifyPrimaryKeys, + final boolean forceSafeCasting) { String validatePrimaryKeys = ""; if (verifyPrimaryKeys && stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { - pkVarDeclaration = "DECLARE missing_pk_count INT64;"; - validatePrimaryKeys = validatePrimaryKeys(stream.id(), stream.primaryKey(), stream.columns()); + validatePrimaryKeys = validatePrimaryKeys(stream.id(), stream.primaryKey(), stream.columns(), forceSafeCasting); } - final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.columns()); + final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.columns(), forceSafeCasting); String dedupFinalTable = ""; String cdcDeletes = ""; String dedupRawTable = ""; @@ -378,12 +379,11 @@ private String updateTable(final StreamConfig stream, final String finalSuffix, dedupRawTable = dedupRawTable(stream.id(), finalSuffix); // If we're in dedup mode, then we must have a cursor dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor()); - cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns()); + cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns(), forceSafeCasting); } final String commitRawTable = commitRawTable(stream.id()); return new StringSubstitutor(Map.of( - "pk_var_declaration", pkVarDeclaration, "validate_primary_keys", validatePrimaryKeys, "insert_new_records", insertNewRecords, "dedup_final_table", dedupFinalTable, @@ -391,7 +391,6 @@ private String updateTable(final StreamConfig stream, final String finalSuffix, "dedupe_raw_table", dedupRawTable, "commit_raw_table", commitRawTable)).replace( """ - ${pk_var_declaration} BEGIN TRANSACTION; @@ -411,13 +410,36 @@ private String updateTable(final StreamConfig stream, final String finalSuffix, """); } + private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) { + final var 
unsafeUpdate = updateTableQueryBuilder(stream, finalSuffix, verifyPrimaryKeys, false); + final var safeUpdate = updateTableQueryBuilder(stream, finalSuffix, verifyPrimaryKeys, true); + final String pkVarDeclaration = verifyPrimaryKeys ? "DECLARE missing_pk_count INT64;" : ""; + return new StringSubstitutor(Map.of("unsafe_update", unsafeUpdate, "safe_update", safeUpdate, "pk_var_declaration", pkVarDeclaration)).replace( + """ + ${pk_var_declaration} + + BEGIN + + ${unsafe_update} + + EXCEPTION WHEN ERROR THEN + ROLLBACK TRANSACTION; + + ${safe_update} + + END; + + """); + } + @VisibleForTesting String validatePrimaryKeys(final StreamId id, final List primaryKeys, - final LinkedHashMap streamColumns) { + final LinkedHashMap streamColumns, + final boolean forceSafeCasting) { final String pkNullChecks = primaryKeys.stream().map( pk -> { - final String jsonExtract = extractAndCast(pk, streamColumns.get(pk)); + final String jsonExtract = extractAndCast(pk, streamColumns.get(pk), forceSafeCasting); return "AND " + jsonExtract + " IS NULL"; }).collect(joining("\n")); @@ -441,26 +463,36 @@ SELECT COUNT(1) } @VisibleForTesting - String insertNewRecords(final StreamConfig stream, final String finalSuffix, final LinkedHashMap streamColumns) { + String insertNewRecords(final StreamConfig stream, + final String finalSuffix, + final LinkedHashMap streamColumns, + final boolean forceSafeCasting) { final String columnCasts = streamColumns.entrySet().stream().map( - col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",") + col -> extractAndCast(col.getKey(), col.getValue(), forceSafeCasting) + " as " + col.getKey().name(QUOTE) + ",") .collect(joining("\n")); - final String columnErrors = "[" + streamColumns.entrySet().stream().map( - col -> new StringSubstitutor(Map.of( - "raw_col_name", escapeColumnNameForJsonPath(col.getKey().originalName()), - "col_type", toDialectType(col.getValue()).name(), - "json_extract", extractAndCast(col.getKey(), col.getValue()))).replace( - // Explicitly parse json here. This is safe because we're not using the actual value anywhere, - // and necessary because json_query - """ - CASE - WHEN (JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"') IS NOT NULL) - AND (JSON_TYPE(JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"')) != 'null') - AND (${json_extract} IS NULL) - THEN 'Problem with `${raw_col_name}`' - ELSE NULL - END""")) - .collect(joining(",\n")) + "]"; + final String columnErrors; + if (forceSafeCasting) { + columnErrors = "[" + streamColumns.entrySet().stream().map( + col -> new StringSubstitutor(Map.of( + "raw_col_name", escapeColumnNameForJsonPath(col.getKey().originalName()), + "col_type", toDialectType(col.getValue()).name(), + "json_extract", extractAndCast(col.getKey(), col.getValue(), true))).replace( + // Explicitly parse json here. 
This is safe because we're not using the actual value anywhere, + // and necessary because json_query + """ + CASE + WHEN (JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"') IS NOT NULL) + AND (JSON_TYPE(JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"')) != 'null') + AND (${json_extract} IS NULL) + THEN 'Problem with `${raw_col_name}`' + ELSE NULL + END""")) + .collect(joining(",\n")) + "]"; + } else { + // We're not safe casting, so any error should throw an exception and trigger the safe cast logic + columnErrors = "[]"; + } + final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); String cdcConditionalOrIncludeStatement = ""; @@ -540,7 +572,8 @@ String dedupFinalTable(final StreamId id, @VisibleForTesting String cdcDeletes(final StreamConfig stream, final String finalSuffix, - final LinkedHashMap streamColumns) { + final LinkedHashMap streamColumns, + final boolean forceSafeCasting) { if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) { return ""; @@ -551,7 +584,8 @@ String cdcDeletes(final StreamConfig stream, } final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - final String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); + final String pkCasts = + stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk), forceSafeCasting)).collect(joining(",\n")); // we want to grab IDs for deletion from the raw table (not the final table itself) to hand // out-of-order record insertions after the delete has been registered @@ -683,4 +717,13 @@ private String escapeColumnNameForJsonPath(final String stringContents) { .replace("'", "\\'"); } + private static String cast(final String content, final String asType, boolean useSafeCast) { + final var open = useSafeCast ? 
"SAFE_CAST(" : "CAST("; + return wrap(open, content + " as " + asType, ")"); + } + + private static String wrap(final String open, final String content, final String close) { + return open + content + close; + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl index faf1bda26c1a..627521e4d958 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl @@ -1,7 +1,7 @@ {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} -{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `string`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}} +{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`","Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`","Problem with `time_without_timezone`", "Problem with `date`"]}} // Note that for numbers where we parse the value to JSON (struct, array, unknown) we lose precision. // But for numbers where we create a NUMBER column, we do not lose precision (see the `number` column). 
{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.17411800000001}, "array": [67.17411800000001], "unknown": 67.17411800000001, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl index bc145f60abd3..4004e363374c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl @@ -1,5 +1,5 @@ {"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}' {"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} {"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}} -{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} {"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index bc8973dae098..928819a934a8 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -133,6 +133,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull 
Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.17 | 2023-09-26 | [\#30696](https://github.com/airbytehq/airbyte/pull/30696) | Attempt unsafe typing operations with an exception clause | | 2.0.16 | 2023-09-22 | [\#30697](https://github.com/airbytehq/airbyte/pull/30697) | Improve resiliency to unclean exit during schema change | | 2.0.15 | 2023-09-21 | [\#30640](https://github.com/airbytehq/airbyte/pull/30640) | Handle streams with identical name and namespace | | 2.0.14 | 2023-09-20 | [\#30069](https://github.com/airbytehq/airbyte/pull/30069) | Staging destination async |
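
Illustrative note (not part of the commit): the heart of this change is BigQuery's scripting
exception handler. updateTable() now renders the same update twice -- once with plain CAST and
once with SAFE_CAST plus per-column error collection -- and wraps the two in
BEGIN ... EXCEPTION WHEN ERROR ... END, so the fast unsafe pass runs first and the safe pass
only runs after a cast failure. A minimal sketch of that composition, assuming the same
org.apache.commons.text dependency the generator already uses; the class, method, table, and
column names here are hypothetical, not connector API:

import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

public final class CastFallbackSketch {

  // Composes the unsafe and safe renderings of one update into a single
  // BigQuery script, mirroring the template in the new updateTable().
  static String fallbackScript(final String unsafeUpdate, final String safeUpdate) {
    return new StringSubstitutor(Map.of(
        "unsafe_update", unsafeUpdate,
        "safe_update", safeUpdate)).replace(
        """
        BEGIN

        ${unsafe_update}

        EXCEPTION WHEN ERROR THEN
        ROLLBACK TRANSACTION;

        ${safe_update}

        END;
        """);
  }

  public static void main(final String[] args) {
    // Each variant opens its own transaction, as updateTableQueryBuilder() does,
    // so the ROLLBACK in the handler has an open transaction to undo.
    final String unsafe = """
        BEGIN TRANSACTION;
        INSERT INTO final_table SELECT CAST(JSON_VALUE(`_airbyte_data`, '$."age"') as INT64) FROM raw_table;
        COMMIT TRANSACTION;""";
    final String safe = """
        BEGIN TRANSACTION;
        INSERT INTO final_table SELECT SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$."age"') as INT64) FROM raw_table;
        COMMIT TRANSACTION;""";
    System.out.println(fallbackScript(unsafe, safe));
  }
}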
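
A second note on the new cast()/wrap() helpers: CAST aborts the whole statement on the first
malformed value, which is exactly what lets the fast pass skip SAFE_CAST's performance penalty,
while SAFE_CAST degrades bad values to NULL so the safe pass can record them in
_airbyte_meta.errors instead of failing the sync. A self-contained copy of the helpers for
experimentation (the surrounding class and the example column are hypothetical):

public final class CastHelperSketch {

  // Same behavior as the helpers added to BigQuerySqlGenerator.
  private static String wrap(final String open, final String content, final String close) {
    return open + content + close;
  }

  private static String cast(final String content, final String asType, final boolean useSafeCast) {
    final var open = useSafeCast ? "SAFE_CAST(" : "CAST(";
    return wrap(open, content + " as " + asType, ")");
  }

  public static void main(final String[] args) {
    final String expr = "JSON_VALUE(`_airbyte_data`, '$.\"number\"')";
    // Fast path: CAST(...) -- BigQuery throws if the value is, say, "foo".
    System.out.println(cast(expr, "NUMERIC", false));
    // Fallback: SAFE_CAST(...) -- yields NULL for "foo", so the row survives
    // and the columnErrors CASE records 'Problem with `number`'.
    System.out.println(cast(expr, "NUMERIC", true));
  }
}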