From 4c26d4cceb2efa6e4e4e204627f6b68d4e7257e6 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 27 Sep 2023 15:57:43 -0700 Subject: [PATCH] DV2: Fix column name collision detection (#30739) Co-authored-by: edgao --- .../typing_deduping/CatalogParser.java | 6 ++- .../destination-bigquery/Dockerfile | 2 +- .../destination-bigquery/metadata.yaml | 2 +- .../typing_deduping/BigQuerySqlGenerator.java | 8 +-- .../BigQuerySqlGeneratorTest.java | 44 +++++++++++++++++ .../destination-snowflake/Dockerfile | 2 +- .../destination-snowflake/metadata.yaml | 2 +- .../SnowflakeSqlGenerator.java | 8 +-- .../alltypes_expectedrecords_final.jsonl | 2 +- .../alltypes_expectedrecords_raw.jsonl | 2 +- .../SnowflakeSqlGeneratorTest.java | 49 ++++++++++++++++++- docs/integrations/destinations/bigquery.md | 1 + docs/integrations/destinations/snowflake.md | 1 + 13 files changed, 112 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java index f34873d699f1..2a5c0c46a8f7 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java @@ -6,6 +6,7 @@ import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.util.ArrayList; @@ -66,7 +67,10 @@ public ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog) { return new ParsedCatalog(streamConfigs); } - private StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) { + // TODO maybe we should extract the column collision stuff to a separate method, since that's the + // interesting bit + @VisibleForTesting + public StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) { final AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema()); final LinkedHashMap airbyteColumns; if (schema instanceof final Struct o) { diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index ef2cdee8024f..d417281bf814 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.17 +LABEL io.airbyte.version=2.0.18 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 6d46e76fdabe..aeb2fd4c63d3 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.17 + dockerImageTag: 2.0.18 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 9acc00ad2255..8c9776ccee13 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -80,10 +80,12 @@ public StreamId buildStreamId(final String namespace, final String name, final S @Override public ColumnId buildColumnId(final String name, final String suffix) { - // Bigquery columns are case-insensitive, so do all our validation on the lowercased name final String nameWithSuffix = name + suffix; - final String canonicalized = nameWithSuffix.toLowerCase(); - return new ColumnId(nameTransformer.getIdentifier(nameWithSuffix), nameWithSuffix, canonicalized); + return new ColumnId( + nameTransformer.getIdentifier(nameWithSuffix), + name, + // Bigquery columns are case-insensitive, so do all our validation on the lowercased name + nameTransformer.getIdentifier(nameWithSuffix.toLowerCase())); } public StandardSQLTypeName toDialectType(final AirbyteType type) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index ec26fe497edc..1fac62e2d681 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; +import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.cloud.bigquery.Clustering; @@ -11,18 +12,25 @@ import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.collect.ImmutableList; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.base.destination.typing_deduping.Union; import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -139,4 +147,40 @@ public void testSchemaContainAllFinalTableV2AirbyteColumns() { BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID"))); } + @Test + void columnCollision() { + final CatalogParser parser = new CatalogParser(generator); + assertEquals( + new StreamConfig( + new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + emptyList(), + Optional.empty(), + new LinkedHashMap<>() { + + { + put(new ColumnId("CURRENT_DATE", "CURRENT_DATE", "current_date"), AirbyteProtocolType.STRING); + put(new ColumnId("current_date_1", "current_date", "current_date_1"), AirbyteProtocolType.INTEGER); + } + + }), + parser.toStreamConfig(new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withName("foo") + .withNamespace("bar") + .withJsonSchema(Jsons.deserialize( + """ + { + "type": "object", + "properties": { + "CURRENT_DATE": {"type": "string"}, + "current_date": {"type": "integer"} + } + } + """))))); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 4349d5934082..a5dcba9fadb3 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=3.1.13 +LABEL io.airbyte.version=3.1.14 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 08819c66ab02..58a40d811fa6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.1.13 + dockerImageTag: 3.1.14 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 0509376b40bf..35afdf7cfb10 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -48,7 +48,6 @@ public class SnowflakeSqlGenerator implements SqlGenerator assertEquals( @@ -60,4 +71,40 @@ void streamNameSpecialCharacterHandling() { generator.buildStreamId("{fo$o}", "{ba$r}", "airbyte_internal"))); } + @Test + void columnCollision() { + final CatalogParser parser = new CatalogParser(generator); + assertEquals( + new StreamConfig( + new StreamId("BAR", "FOO", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + emptyList(), + Optional.empty(), + new LinkedHashMap<>() { + + { + put(new ColumnId("_CURRENT_DATE", "CURRENT_DATE", "_CURRENT_DATE"), AirbyteProtocolType.STRING); + put(new ColumnId("_CURRENT_DATE_1", "current_date", "_CURRENT_DATE_1"), AirbyteProtocolType.INTEGER); + } + + }), + parser.toStreamConfig(new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withName("foo") + .withNamespace("bar") + .withJsonSchema(Jsons.deserialize( + """ + { + "type": "object", + "properties": { + "CURRENT_DATE": {"type": "string"}, + "current_date": {"type": "integer"} + } + } + """))))); + } + } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 928819a934a8..db16bd853712 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -133,6 +133,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.18 | 2023-09-27 | [\#30739](https://github.com/airbytehq/airbyte/pull/30739) | Fix column name collision detection | | 2.0.17 | 2023-09-26 | [\#30696](https://github.com/airbytehq/airbyte/pull/30696) | Attempt unsafe typing operations with an exception clause | | 2.0.16 | 2023-09-22 | [\#30697](https://github.com/airbytehq/airbyte/pull/30697) | Improve resiliency to unclean exit during schema change | | 2.0.15 | 2023-09-21 | [\#30640](https://github.com/airbytehq/airbyte/pull/30640) | Handle streams with identical name and namespace | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 5cb0c1c59e8c..296b12bb6d58 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.1.14 | 2023-09-27 | [\#30739](https://github.com/airbytehq/airbyte/pull/30739) | Fix column name collision detection | | 3.1.13 | 2023-09-19 | [\#30599](https://github.com/airbytehq/airbyte/pull/30599) | Support concurrent syncs with identical stream name but different namespace | | 3.1.12 | 2023-09-21 | [\#30671](https://github.com/airbytehq/airbyte/pull/30671) | Reduce async buffer size | | 3.1.11 | 2023-09-19 | [\#30592](https://github.com/airbytehq/airbyte/pull/30592) | Internal code changes |