🐛 Destination Snowflake - support reserved column names (#30319)
Co-authored-by: cynthiaxyin <[email protected]>
cynthiaxyin authored Sep 19, 2023
1 parent 6b48ecf commit 839fb26
Showing 18 changed files with 318 additions and 38 deletions.
@@ -839,6 +839,41 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {
}
}

/**
* Verify column names that are reserved keywords are handled successfully. Each destination should
* always have at least 1 column in the record data that is a reserved keyword.
*/
@Test
public void testReservedKeywords() throws Exception {
createRawTable(streamId);
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/reservedkeywords_inputrecords_raw.jsonl"));
final StreamConfig stream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
null,
Optional.empty(),
new LinkedHashMap<>() {

{
put(generator.buildColumnId("current_date"), AirbyteProtocolType.STRING);
put(generator.buildColumnId("join"), AirbyteProtocolType.STRING);
}

});

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
destinationHandler.execute(updateTable);

DIFFER.diffFinalTableRecords(
BaseTypingDedupingTest.readRecords("sqlgenerator/reservedkeywords_expectedrecords_final.jsonl"),
dumpFinalTableRecords(streamId, ""));
}

/**
* A stream with no columns is weird, but we shouldn't treat it specially in any way. It should
* create a final table as usual, and populate it with the relevant metadata columns.
@@ -0,0 +1,2 @@
// A column name that is a reserved keyword should be added for each typing & deduping destination.
{"_airbyte_raw_id": "b2e0efc4-38a8-47ba-970c-8103f09f08d5", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"current_date": "foo", "join": "bar"}}
@@ -112,7 +112,7 @@ private StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) {
// Append _1, _2, _3, ... to the column name until we find one that doesn't collide.
int i = 1;
while (true) {
columnId = sqlGenerator.buildColumnId(entry.getKey() + "_" + i);
columnId = sqlGenerator.buildColumnId(entry.getKey(), "_" + i);
final String canonicalName = columnId.canonicalName();
if (columns.keySet().stream().noneMatch(c -> c.canonicalName().equals(canonicalName))) {
break;
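
The hunk above is the only behavioral change in CatalogParser: instead of concatenating entry.getKey() + "_" + i before the call, the base name and the numeric suffix are now passed to buildColumnId as separate arguments, so each generator can canonicalize the combined name with its own rules. Below is a minimal, self-contained sketch of that collision-avoidance loop; the ColumnId record and buildColumnId method are simplified stand-ins, not the real Airbyte types.

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of the collision-avoidance loop in CatalogParser; ColumnId and
// buildColumnId are illustrative stand-ins, not the real Airbyte implementations.
public class ColumnCollisionSketch {

  record ColumnId(String name, String originalName, String canonicalName) {}

  // Stand-in for SqlGenerator#buildColumnId(name, suffix): the suffix is appended
  // before canonicalization, so "CURRENT_DATE" + "_1" canonicalizes as one unit.
  static ColumnId buildColumnId(final String name, final String suffix) {
    final String nameWithSuffix = name + suffix;
    return new ColumnId(nameWithSuffix, nameWithSuffix, nameWithSuffix.toLowerCase());
  }

  static ColumnId pickNonCollidingColumnId(final String originalName, final Map<ColumnId, String> columns) {
    ColumnId columnId = buildColumnId(originalName, "");
    // Append _1, _2, _3, ... to the column name until we find one that doesn't collide.
    int i = 1;
    while (collides(columnId, columns)) {
      columnId = buildColumnId(originalName, "_" + i);
      i++;
    }
    return columnId;
  }

  private static boolean collides(final ColumnId candidate, final Map<ColumnId, String> columns) {
    final String canonicalName = candidate.canonicalName();
    return columns.keySet().stream().anyMatch(c -> c.canonicalName().equals(canonicalName));
  }

  public static void main(final String[] args) {
    final Map<ColumnId, String> columns = new LinkedHashMap<>();
    columns.put(buildColumnId("current_date", ""), "STRING");
    // "CURRENT_DATE" collides with "current_date" after canonicalization, so it becomes CURRENT_DATE_1.
    System.out.println(pickNonCollidingColumnId("CURRENT_DATE", columns).name());
  }
}
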
@@ -10,7 +10,11 @@ public interface SqlGenerator<DialectTableDefinition> {

StreamId buildStreamId(String namespace, String name, String rawNamespaceOverride);

ColumnId buildColumnId(String name);
default ColumnId buildColumnId(final String name) {
return buildColumnId(name, "");
}

ColumnId buildColumnId(String name, String suffix);

/**
* Generate a SQL statement to create a fresh table to match the given stream.
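
For context, the interface change above preserves existing one-argument call sites: buildColumnId(name) becomes a default method that delegates with an empty suffix, and implementations only override the two-argument form. A small sketch of that pattern, assuming a simplified ColumnId record and a purely hypothetical lower-casing implementation (not the actual Snowflake or BigQuery logic):

// Sketch of the default-method pattern introduced above; ColumnId is a simplified
// stand-in and LowercasingGenerator is hypothetical, not a real Airbyte generator.
interface SqlGeneratorSketch {

  record ColumnId(String name, String originalName, String canonicalName) {}

  // Existing callers keep compiling: the one-argument overload delegates with an empty suffix.
  default ColumnId buildColumnId(final String name) {
    return buildColumnId(name, "");
  }

  ColumnId buildColumnId(String name, String suffix);
}

class LowercasingGenerator implements SqlGeneratorSketch {

  @Override
  public ColumnId buildColumnId(final String name, final String suffix) {
    // Canonicalize the base name and suffix together so collision checks see the final form.
    final String nameWithSuffix = name + suffix;
    return new ColumnId(nameWithSuffix, nameWithSuffix, nameWithSuffix.toLowerCase());
  }
}

Only the two-argument method is abstract, which is why the columnNameCollision test below now stubs buildColumnId(any(), any()).
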
@@ -27,13 +27,13 @@ public void setup() {
sqlGenerator = mock(SqlGenerator.class);
// noop quoting logic
when(sqlGenerator.buildColumnId(any())).thenAnswer(invocation -> {
String fieldName = invocation.getArgument(0);
final String fieldName = invocation.getArgument(0);
return new ColumnId(fieldName, fieldName, fieldName);
});
when(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer(invocation -> {
String namespace = invocation.getArgument(0);
String name = invocation.getArgument(1);
String rawNamespace = invocation.getArgument(1);
final String namespace = invocation.getArgument(0);
final String name = invocation.getArgument(1);
final String rawNamespace = invocation.getArgument(1);
return new StreamId(namespace, name, rawNamespace, namespace + "_abab_" + name, namespace, name);
});

@@ -47,12 +47,12 @@ public void setup() {
@Test
public void finalNameCollision() {
when(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer(invocation -> {
String originalNamespace = invocation.getArgument(0);
String originalName = (invocation.getArgument(1));
String originalRawNamespace = (invocation.getArgument(1));
final String originalNamespace = invocation.getArgument(0);
final String originalName = (invocation.getArgument(1));
final String originalRawNamespace = (invocation.getArgument(1));

// emulate quoting logic that causes a name collision
String quotedName = originalName.replaceAll("bar", "");
final String quotedName = originalName.replaceAll("bar", "");
return new StreamId(originalNamespace, quotedName, originalRawNamespace, originalNamespace + "_abab_" + quotedName, originalNamespace,
originalName);
});
@@ -73,30 +73,30 @@ public void finalNameCollision() {
*/
@Test
public void columnNameCollision() {
when(sqlGenerator.buildColumnId(any())).thenAnswer(invocation -> {
String originalName = invocation.getArgument(0);
when(sqlGenerator.buildColumnId(any(), any())).thenAnswer(invocation -> {
final String originalName = invocation.getArgument(0);

// emulate quoting logic that causes a name collision
String quotedName = originalName.replaceAll("bar", "");
final String quotedName = originalName.replaceAll("bar", "");
return new ColumnId(quotedName, originalName, quotedName);
});
JsonNode schema = Jsons.deserialize("""
{
"type": "object",
"properties": {
"foobarfoo": {"type": "string"},
"foofoo": {"type": "string"}
}
}
""");
final JsonNode schema = Jsons.deserialize("""
{
"type": "object",
"properties": {
"foobarfoo": {"type": "string"},
"foofoo": {"type": "string"}
}
}
""");
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)));

final ParsedCatalog parsedCatalog = parser.parseCatalog(catalog);

assertEquals(2, parsedCatalog.streams().get(0).columns().size());
}

private static ConfiguredAirbyteStream stream(String namespace, String name) {
private static ConfiguredAirbyteStream stream(final String namespace, final String name) {
return stream(
namespace,
name,
@@ -110,7 +110,7 @@ private static ConfiguredAirbyteStream stream(String namespace, String name) {
"""));
}

private static ConfiguredAirbyteStream stream(String namespace, String name, JsonNode schema) {
private static ConfiguredAirbyteStream stream(final String namespace, final String name, final JsonNode schema) {
return new ConfiguredAirbyteStream().withStream(
new AirbyteStream()
.withNamespace(namespace)
@@ -15,7 +15,7 @@ public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
}

@Override
public ColumnId buildColumnId(final String name) {
public ColumnId buildColumnId(final String name, final String suffix) {
return null;
}

@@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.11
LABEL io.airbyte.version=2.0.12
LABEL io.airbyte.name=airbyte/destination-bigquery
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.11
dockerImageTag: 2.0.12
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
@@ -0,0 +1,113 @@
package io.airbyte.integrations.destination.bigquery;

import com.google.common.collect.ImmutableList;
import java.util.List;

/**
* NOTE: This class is not used, but is created for completeness.
* See https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#reserved_keywords
* Copied from https://github.com/airbytehq/airbyte/blob/f226503bd1d4cd9c7412b04d47de584523988443/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py
*/
public class BigQueryReservedKeywords {

public static final List<String> RESERVED_KEYWORDS = ImmutableList.of(
"ALL",
"AND",
"ANY",
"ARRAY",
"AS",
"ASC",
"ASSERT_ROWS_MODIFIED",
"AT",
"BETWEEN",
"BY",
"CASE",
"CAST",
"COLLATE",
"CONTAINS",
"CREATE",
"CROSS",
"CUBE",
"CURRENT",
"CURRENT_DATE",
"CURRENT_TIME",
"CURRENT_TIMESTAMP",
"DEFAULT",
"DEFINE",
"DESC",
"DISTINCT",
"ELSE",
"END",
"ENUM",
"ESCAPE",
"EXCEPT",
"EXCLUDE",
"EXISTS",
"EXTRACT",
"FALSE",
"FETCH",
"FOLLOWING",
"FOR",
"FROM",
"FULL",
"GROUP",
"GROUPING",
"GROUPS",
"HASH",
"HAVING",
"IF",
"IGNORE",
"IN",
"INNER",
"INTERSECT",
"INTERVAL",
"INTO",
"IS",
"JOIN",
"LATERAL",
"LEFT",
"LIKE",
"LIMIT",
"LOOKUP",
"MERGE",
"NATURAL",
"NEW",
"NO",
"NOT",
"NULL",
"NULLS",
"OF",
"ON",
"OR",
"ORDER",
"OUTER",
"OVER",
"PARTITION",
"PRECEDING",
"PROTO",
"RANGE",
"RECURSIVE",
"RESPECT",
"RIGHT",
"ROLLUP",
"ROWS",
"SELECT",
"SET",
"SOME",
"STRUCT",
"TABLESAMPLE",
"THEN",
"TO",
"TREAT",
"TRUE",
"UNBOUNDED",
"UNION",
"UNNEST",
"USING",
"WHEN",
"WHERE",
"WINDOW",
"WITH",
"WITHIN");

}
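
The javadoc above notes that this class is not actually consulted anywhere; if it were, the lookup would be a simple membership check against the list. A hypothetical helper, shown only for illustration (the backtick quoting follows GoogleSQL's rule that reserved words must be quoted to be used as identifiers; it is not what the connector does):

package io.airbyte.integrations.destination.bigquery;

import java.util.Locale;

// Hypothetical helper, not part of the commit: BigQueryReservedKeywords is documented
// as unused, so this only illustrates what a lookup against the list could look like.
final class ReservedKeywordQuoting {

  private ReservedKeywordQuoting() {}

  // GoogleSQL allows reserved words as identifiers only when they are backtick-quoted.
  static String quoteIfReserved(final String identifier) {
    final String upperCased = identifier.toUpperCase(Locale.ROOT);
    return BigQueryReservedKeywords.RESERVED_KEYWORDS.contains(upperCased)
        ? "`" + identifier + "`"
        : identifier;
  }
}
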
@@ -75,10 +75,11 @@ public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
}

@Override
public ColumnId buildColumnId(final String name) {
public ColumnId buildColumnId(final String name, final String suffix) {
// Bigquery columns are case-insensitive, so do all our validation on the lowercased name
final String canonicalized = name.toLowerCase();
return new ColumnId(nameTransformer.getIdentifier(name), name, canonicalized);
final String nameWithSuffix = name + suffix;
final String canonicalized = nameWithSuffix.toLowerCase();
return new ColumnId(nameTransformer.getIdentifier(nameWithSuffix), nameWithSuffix, canonicalized);
}

public StandardSQLTypeName toDialectType(final AirbyteType type) {
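
As the comment in buildColumnId notes, BigQuery column names are case-insensitive, so the canonical name used for collision detection is the lowercased name + suffix, while the displayed name keeps its original casing via nameTransformer.getIdentifier. A hypothetical illustration of what that implies (not taken from the connector's test suite; the canonical name depends only on the lowercasing shown in the diff, not on the name transformer):

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Illustrative expectations for the method above, not copied from the connector's tests.
class BuildColumnIdExpectations {

  void caseInsensitiveCanonicalization(final BigQuerySqlGenerator generator) {
    // Reserved keyword: the canonical (collision-checked) form is simply lowercased.
    assertEquals("current_date", generator.buildColumnId("CURRENT_DATE").canonicalName());
    // The suffix is appended before lowercasing, so a "_1"-suffixed column can never
    // collide with the unsuffixed column it was derived from.
    assertEquals("current_date_1", generator.buildColumnId("CURRENT_DATE", "_1").canonicalName());
  }
}
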
@@ -0,0 +1 @@
{"_airbyte_raw_id":"b2e0efc4-38a8-47ba-970c-8103f09f08d5","_airbyte_extracted_at":"2023-01-01T00:00:00Z","_airbyte_meta":{"errors":[]}, "current_date": "foo", "join": "bar"}
@@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.version=3.1.9
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.1.8
dockerImageTag: 3.1.9
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg