Skip to content

Commit

Permalink
DV2: Fix column name collision detection (#30739)
Browse files Browse the repository at this point in the history
Co-authored-by: edgao <[email protected]>
  • Loading branch information
edgao and edgao authored Sep 27, 2023
1 parent da32fc8 commit 4c26d4c
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -66,7 +67,10 @@ public ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog) {
return new ParsedCatalog(streamConfigs);
}

private StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) {
// TODO maybe we should extract the column collision stuff to a separate method, since that's the
// interesting bit
@VisibleForTesting
public StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) {
final AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema());
final LinkedHashMap<String, AirbyteType> airbyteColumns;
if (schema instanceof final Struct o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.17
LABEL io.airbyte.version=2.0.18
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.17
dockerImageTag: 2.0.18
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ public StreamId buildStreamId(final String namespace, final String name, final S

// Builds the ColumnId for a column called `name`, with `suffix` appended to it.
// NOTE(review): this span looks like a rendered diff hunk with removed and added lines interleaved
// and no +/- markers, which would explain the two `return` statements below. Presumably the
// single-line return is the old body and the multi-line return is its replacement; confirm against
// the committed file.
@Override
public ColumnId buildColumnId(final String name, final String suffix) {
// Bigquery columns are case-insensitive, so do all our validation on the lowercased name
final String nameWithSuffix = name + suffix;
final String canonicalized = nameWithSuffix.toLowerCase();
// This form passes the suffixed name as the second argument and the raw lowercased string as the
// third.
return new ColumnId(nameTransformer.getIdentifier(nameWithSuffix), nameWithSuffix, canonicalized);
// This form passes the bare `name` (no suffix) as the second argument, and runs the lowercased
// name through nameTransformer.getIdentifier as well instead of using it as-is.
return new ColumnId(
nameTransformer.getIdentifier(nameWithSuffix),
name,
// Bigquery columns are case-insensitive, so do all our validation on the lowercased name
nameTransformer.getIdentifier(nameWithSuffix.toLowerCase()));
}

public StandardSQLTypeName toDialectType(final AirbyteType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,33 @@

package io.airbyte.integrations.destination.bigquery.typing_deduping;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -139,4 +147,40 @@ public void testSchemaContainAllFinalTableV2AirbyteColumns() {
BigQuerySqlGenerator.schemaContainAllFinalTableV2AirbyteColumns(Set.of("_AIRBYTE_META", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_RAW_ID")));
}

@Test
void columnCollision() {
  // Two properties whose names differ only by case. The expected ColumnIds below keep the first
  // one as-is and give the second one a "_1" suffix.
  final String schemaJson = """
      {
      "type": "object",
      "properties": {
      "CURRENT_DATE": {"type": "string"},
      "current_date": {"type": "integer"}
      }
      }
      """;
  final CatalogParser catalogParser = new CatalogParser(generator);

  final StreamConfig expected = new StreamConfig(
      new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
      SyncMode.INCREMENTAL,
      DestinationSyncMode.APPEND,
      emptyList(),
      Optional.empty(),
      new LinkedHashMap<>() {

        {
          put(new ColumnId("CURRENT_DATE", "CURRENT_DATE", "current_date"), AirbyteProtocolType.STRING);
          put(new ColumnId("current_date_1", "current_date", "current_date_1"), AirbyteProtocolType.INTEGER);
        }

      });

  // Input: an incremental/append stream "bar.foo" carrying the schema above.
  final AirbyteStream stream = new AirbyteStream()
      .withName("foo")
      .withNamespace("bar")
      .withJsonSchema(Jsons.deserialize(schemaJson));
  final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
      .withSyncMode(SyncMode.INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(stream);

  assertEquals(expected, catalogParser.toStreamConfig(configuredStream));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.1.13
LABEL io.airbyte.version=3.1.14
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.1.13
dockerImageTag: 3.1.14
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class SnowflakeSqlGenerator implements SqlGenerator<SnowflakeTableDefinit

@Override
public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
// No escaping needed, as far as I can tell. We quote all our identifier names.
return new StreamId(
escapeSqlIdentifier(namespace).toUpperCase(),
escapeSqlIdentifier(name).toUpperCase(),
Expand All @@ -60,11 +59,8 @@ public StreamId buildStreamId(final String namespace, final String name, final S

// Builds the ColumnId for a column called `name`, with `suffix` appended to it.
// NOTE(review): this span looks like a rendered diff hunk with removed and added lines interleaved
// and no +/- markers, which would explain the two `return` statements below. Presumably the first
// return (with `nameWithSuffix`) is the old body and the `escapedName` lines are its replacement;
// confirm against the committed file.
@Override
public ColumnId buildColumnId(final String name, final String suffix) {
// No escaping needed, as far as I can tell. We quote all our identifier names.
final String nameWithSuffix = name + suffix;
// This form appends `suffix` unchanged to the escaped, uppercased, reserved-word-prefixed name,
// and passes the unescaped `name + suffix` (raw, then uppercased) as the other two arguments.
return new ColumnId(prefixReservedColumnName(escapeSqlIdentifier(name).toUpperCase()) + suffix,
nameWithSuffix,
nameWithSuffix.toUpperCase());
// This form uppercases the suffix too, and reuses the same escaped string for both the first and
// third arguments; the second argument is the bare `name` (no suffix).
final String escapedName = prefixReservedColumnName(escapeSqlIdentifier(name).toUpperCase()) + suffix.toUpperCase();
return new ColumnId(escapedName, name, escapedName);
}

public String toDialectType(final AirbyteType type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"ID1": 1, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "ARRAY": ["foo"], "STRUCT": {"foo": "bar"}, "STRING": "foo", "NUMBER": 42.1, "INTEGER": 42, "BOOLEAN": true, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000Z", "TIMESTAMP_WITHOUT_TIMEZONE": "2023-01-23T12:34:56.000000000", "TIME_WITH_TIMEZONE": "12:34:56Z", "TIME_WITHOUT_TIMEZONE": "12:34:56.000000000", "DATE": "2023-01-23", "UNKNOWN": {}, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}}
{"ID1": 2, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}}
{"ID1": 3, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}}
{"ID1": 4, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}, "STRING": "{}"}
{"ID1": 4, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}}
// Note: no loss of precision on these numbers. A naive float64 conversion would yield 67.17411800000001.
{"ID1": 5, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "NUMBER": 67.174118, "STRUCT": {"nested_number": 67.174118}, "ARRAY": [67.174118], "UNKNOWN": 67.174118, "_AIRBYTE_EXTRACTED_AT": "2023-01-01T00:00:00.000000000Z", "_AIRBYTE_META": {"errors": []}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}
{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}}
{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@

package io.airbyte.integrations.destination.snowflake.typing_deduping;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.LinkedHashMap;
import java.util.Optional;
import org.junit.jupiter.api.Test;

public class SnowflakeSqlGeneratorTest {
Expand All @@ -23,7 +34,7 @@ void columnNameSpecialCharacterHandling() {
new ColumnId(
"__FOO_",
"${foo}",
"${FOO}"),
"__FOO_"),
generator.buildColumnId("${foo}")),
// But normally, we should leave those characters untouched.
() -> assertEquals(
Expand Down Expand Up @@ -60,4 +71,40 @@ void streamNameSpecialCharacterHandling() {
generator.buildStreamId("{fo$o}", "{ba$r}", "airbyte_internal")));
}

@Test
void columnCollision() {
  // Two properties whose names differ only by case. The expected ColumnIds below are both
  // uppercased with a leading underscore, and the second one additionally gets a "_1" suffix.
  final String schemaJson = """
      {
      "type": "object",
      "properties": {
      "CURRENT_DATE": {"type": "string"},
      "current_date": {"type": "integer"}
      }
      }
      """;
  final CatalogParser catalogParser = new CatalogParser(generator);

  final StreamConfig expected = new StreamConfig(
      new StreamId("BAR", "FOO", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
      SyncMode.INCREMENTAL,
      DestinationSyncMode.APPEND,
      emptyList(),
      Optional.empty(),
      new LinkedHashMap<>() {

        {
          put(new ColumnId("_CURRENT_DATE", "CURRENT_DATE", "_CURRENT_DATE"), AirbyteProtocolType.STRING);
          put(new ColumnId("_CURRENT_DATE_1", "current_date", "_CURRENT_DATE_1"), AirbyteProtocolType.INTEGER);
        }

      });

  // Input: an incremental/append stream "bar.foo" carrying the schema above.
  final AirbyteStream stream = new AirbyteStream()
      .withName("foo")
      .withNamespace("bar")
      .withJsonSchema(Jsons.deserialize(schemaJson));
  final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
      .withSyncMode(SyncMode.INCREMENTAL)
      .withDestinationSyncMode(DestinationSyncMode.APPEND)
      .withStream(stream);

  assertEquals(expected, catalogParser.toStreamConfig(configuredStream));
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.18 | 2023-09-27 | [\#30739](https://github.com/airbytehq/airbyte/pull/30739) | Fix column name collision detection |
| 2.0.17 | 2023-09-26 | [\#30696](https://github.com/airbytehq/airbyte/pull/30696) | Attempt unsafe typing operations with an exception clause |
| 2.0.16 | 2023-09-22 | [\#30697](https://github.com/airbytehq/airbyte/pull/30697) | Improve resiliency to unclean exit during schema change |
| 2.0.15 | 2023-09-21 | [\#30640](https://github.com/airbytehq/airbyte/pull/30640) | Handle streams with identical name and namespace |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.1.14 | 2023-09-27 | [\#30739](https://github.com/airbytehq/airbyte/pull/30739) | Fix column name collision detection |
| 3.1.13 | 2023-09-19 | [\#30599](https://github.com/airbytehq/airbyte/pull/30599) | Support concurrent syncs with identical stream name but different namespace |
| 3.1.12 | 2023-09-21 | [\#30671](https://github.com/airbytehq/airbyte/pull/30671) | Reduce async buffer size |
| 3.1.11 | 2023-09-19 | [\#30592](https://github.com/airbytehq/airbyte/pull/30592) | Internal code changes |
Expand Down

0 comments on commit 4c26d4c

Please sign in to comment.