Destination bigquery: handle case where stream name and namespace are identical (#30640)

Co-authored-by: edgao <[email protected]>
edgao and edgao authored Sep 22, 2023
1 parent f7f6b19 commit 15c8ce9
Showing 11 changed files with 94 additions and 36 deletions.
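For context, the fix is to qualify every schema and table reference the destination generates with the GCP project ID, so statements target `project.dataset.table` rather than a partially qualified `dataset.table`. A minimal sketch of the before/after shape of a generated statement, using made-up project and stream names (the real values come from the connector config and catalog):

```java
public class SameNameNamespaceSketch {
  public static void main(String[] args) {
    // Hypothetical identifiers for illustration only.
    String projectId = "my-gcp-project";
    String namespace = "foo"; // the regressing case: stream name == stream namespace
    String finalTable = namespace;

    // Before this commit: dataset-qualified only, e.g. INSERT INTO `foo`.`foo` ...
    String before = String.format("INSERT INTO `%s`.`%s` (...)", namespace, finalTable);

    // After this commit: fully qualified with the project id, e.g. INSERT INTO `my-gcp-project`.`foo`.`foo` ...
    String after = String.format("INSERT INTO `%s`.`%s`.`%s` (...)", projectId, namespace, finalTable);

    System.out.println(before);
    System.out.println(after);
  }
}
```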
@@ -373,6 +373,41 @@ public void incrementalDedupInvalidPrimaryKey() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

/**
* Test that T+D supports streams whose name and namespace are the same.
*/
@Test
public void incrementalDedupSameNameNamespace() throws Exception {
final StreamId streamId = buildStreamId(namespace, namespace, namespace + "_raw");
final StreamConfig stream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
incrementalDedupStream.primaryKey(),
incrementalDedupStream.cursor(),
incrementalDedupStream.columns());

createRawTable(streamId);
createFinalTable(stream, "");
insertRawTableRecords(
streamId,
List.of(Jsons.deserialize(
"""
{
"_airbyte_raw_id": "5ce60e70-98aa-4fe3-8159-67207352c4f0",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {"id1": 1, "id2": 100}
}
""")));

final String sql = generator.updateTable(stream, "");
destinationHandler.execute(sql);

final List<JsonNode> rawRecords = dumpRawTableRecords(streamId);
final List<JsonNode> finalRecords = dumpFinalTableRecords(streamId, "");
verifyRecordCounts(1, rawRecords, 1, finalRecords);
}

/**
* Run a full T+D update for an incremental-dedup stream, writing to a final table with "_foo"
* suffix, with values for all data types. Verifies all behaviors for all types:
@@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.14
LABEL io.airbyte.version=2.0.15
LABEL io.airbyte.name=airbyte/destination-bigquery
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.14
dockerImageTag: 2.0.15
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
@@ -216,8 +216,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
setDefaultStreamNamespace(catalog, defaultNamespace);

final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(catalog, datasetLocation);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation);

@@ -241,8 +241,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
setDefaultStreamNamespace(catalog, defaultNamespace);

final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(catalog, datasetLocation);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation);
// Shared code end
@@ -374,8 +374,8 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
}
}

private ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog, final String datasetLocation) {
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation);
private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) {
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent()
? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get())
: new CatalogParser(sqlGenerator);
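Both entry points above and `parseCatalog` now pull the project ID off the connector config before constructing the generator. A rough sketch of that lookup, assuming `BigQueryConsts.CONFIG_PROJECT_ID` maps to a top-level `project_id` key (the config document here is hypothetical):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ProjectIdWiringSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical connector config; in the destination it is supplied by the platform.
    JsonNode config = new ObjectMapper().readTree(
        "{\"project_id\": \"my-gcp-project\", \"dataset_location\": \"US\"}");

    // Mirrors the change above: read the project id and dataset location from the config...
    String projectId = config.get("project_id").asText();
    String datasetLocation = config.get("dataset_location").asText();

    // ...and pass both to the SQL generator (previously it was constructed from the location alone).
    System.out.printf("new BigQuerySqlGenerator(\"%s\", \"%s\")%n", projectId, datasetLocation);
  }
}
```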
@@ -52,14 +52,18 @@ public class BigQuerySqlGenerator implements SqlGenerator<TableDefinition> {
private final ColumnId CDC_DELETED_AT_COLUMN = buildColumnId("_ab_cdc_deleted_at");

private final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGenerator.class);

private final String projectId;
private final String datasetLocation;

/**
* @param projectId
* @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler}
* setting the query execution location, but let's be explicit since this is typically a
* compliance requirement.
* setting the query execution location, but let's be explicit since this is typically a
* compliance requirement.
*/
public BigQuerySqlGenerator(final String datasetLocation) {
public BigQuerySqlGenerator(final String projectId, final String datasetLocation) {
this.projectId = projectId;
this.datasetLocation = datasetLocation;
}

@@ -186,17 +190,18 @@ public String createTable(final StreamConfig stream, final String suffix, final
final String forceCreateTable = force ? "OR REPLACE" : "";

return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"final_namespace", stream.id().finalNamespace(QUOTE),
"dataset_location", datasetLocation,
"force_create_table", forceCreateTable,
"final_table_id", stream.id().finalTableId(QUOTE, suffix),
"column_declarations", columnDeclarations,
"cluster_config", clusterConfig)).replace(
"""
CREATE SCHEMA IF NOT EXISTS ${final_namespace}
CREATE SCHEMA IF NOT EXISTS ${project_id}.${final_namespace}
OPTIONS(location="${dataset_location}");
CREATE ${force_create_table} TABLE ${final_table_id} (
CREATE ${force_create_table} TABLE ${project_id}.${final_table_id} (
_airbyte_raw_id STRING NOT NULL,
_airbyte_extracted_at TIMESTAMP NOT NULL,
_airbyte_meta JSON NOT NULL,
@@ -329,9 +334,11 @@ public String softReset(final StreamConfig stream) {
}

private String clearLoadedAt(final StreamId streamId) {
return new StringSubstitutor(Map.of("raw_table_id", streamId.rawTableId(QUOTE)))
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_table_id", streamId.rawTableId(QUOTE)))
.replace("""
UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1;
UPDATE ${project_id}.${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1;
""");
}

@@ -399,12 +406,13 @@ String validatePrimaryKeys(final StreamId id,
}).collect(joining("\n"));

return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_table_id", id.rawTableId(QUOTE),
"pk_null_checks", pkNullChecks)).replace(
"""
SET missing_pk_count = (
SELECT COUNT(1)
FROM ${raw_table_id}
FROM ${project_id}.${raw_table_id}
WHERE
`_airbyte_loaded_at` IS NULL
${pk_null_checks}
@@ -450,14 +458,15 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
}

return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_table_id", stream.id().rawTableId(QUOTE),
"final_table_id", stream.id().finalTableId(QUOTE, finalSuffix),
"column_casts", columnCasts,
"column_errors", columnErrors,
"cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement,
"column_list", columnList)).replace(
"""
INSERT INTO ${final_table_id}
INSERT INTO ${project_id}.${final_table_id}
(
${column_list}
_airbyte_meta,
@@ -470,7 +479,7 @@ WITH intermediate_data AS (
${column_errors} AS column_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
FROM ${project_id}.${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
${cdcConditionalOrIncludeStatement}
@@ -494,17 +503,18 @@ String dedupFinalTable(final StreamId id,
.orElse("");

return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"final_table_id", id.finalTableId(QUOTE, finalSuffix),
"pk_list", pkList,
"cursor_order_clause", cursorOrderClause)).replace(
"""
DELETE FROM ${final_table_id}
DELETE FROM ${project_id}.${final_table_id}
WHERE
`_airbyte_raw_id` IN (
SELECT `_airbyte_raw_id` FROM (
SELECT `_airbyte_raw_id`, row_number() OVER (
PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} `_airbyte_extracted_at` DESC
) as row_number FROM ${final_table_id}
) as row_number FROM ${project_id}.${final_table_id}
)
WHERE row_number != 1
)
@@ -530,19 +540,20 @@ String cdcDeletes(final StreamConfig stream,
// we want to grab IDs for deletion from the raw table (not the final table itself) to handle
// out-of-order record insertions after the delete has been registered
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"final_table_id", stream.id().finalTableId(QUOTE, finalSuffix),
"raw_table_id", stream.id().rawTableId(QUOTE),
"pk_list", pkList,
"pk_extracts", pkCasts,
"quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)).replace(
"""
DELETE FROM ${final_table_id}
DELETE FROM ${project_id}.${final_table_id}
WHERE
(${pk_list}) IN (
SELECT (
${pk_extracts}
)
FROM ${raw_table_id}
FROM ${project_id}.${raw_table_id}
WHERE JSON_TYPE(PARSE_JSON(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at'), wide_number_mode=>'round')) != 'null'
)
;""");
@@ -551,27 +562,29 @@ WHERE JSON_TYPE(PARSE_JSON(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at'),
@VisibleForTesting
String dedupRawTable(final StreamId id, final String finalSuffix) {
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(QUOTE, finalSuffix))).replace(
// Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it
// would be painful,
// and it only matters in a few edge cases.
"""
DELETE FROM
${raw_table_id}
${project_id}.${raw_table_id}
WHERE
`_airbyte_raw_id` NOT IN (
SELECT `_airbyte_raw_id` FROM ${final_table_id}
SELECT `_airbyte_raw_id` FROM ${project_id}.${final_table_id}
)
;""");
}

@VisibleForTesting
String commitRawTable(final StreamId id) {
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_table_id", id.rawTableId(QUOTE))).replace(
"""
UPDATE ${raw_table_id}
UPDATE ${project_id}.${raw_table_id}
SET `_airbyte_loaded_at` = CURRENT_TIMESTAMP()
WHERE `_airbyte_loaded_at` IS NULL
;""");
@@ -580,12 +593,13 @@ String commitRawTable(final StreamId id) {
@Override
public String overwriteFinalTable(final StreamId streamId, final String finalSuffix) {
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"final_table_id", streamId.finalTableId(QUOTE),
"tmp_final_table", streamId.finalTableId(QUOTE, finalSuffix),
"real_final_table", streamId.finalName(QUOTE))).replace(
"""
DROP TABLE IF EXISTS ${final_table_id};
ALTER TABLE ${tmp_final_table} RENAME TO ${real_final_table};
DROP TABLE IF EXISTS ${project_id}.${final_table_id};
ALTER TABLE ${project_id}.${tmp_final_table} RENAME TO ${real_final_table};
""");
}

@@ -598,15 +612,16 @@ private String wrapAndQuote(final String namespace, final String tableName) {
@Override
public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
return new StringSubstitutor(Map.of(
"project_id", '`' + projectId + '`',
"raw_namespace", StringUtils.wrap(streamId.rawNamespace(), QUOTE),
"dataset_location", datasetLocation,
"v2_raw_table", streamId.rawTableId(QUOTE),
"v1_raw_table", wrapAndQuote(namespace, tableName))).replace(
"""
CREATE SCHEMA IF NOT EXISTS ${raw_namespace}
CREATE SCHEMA IF NOT EXISTS ${project_id}.${raw_namespace}
OPTIONS(location="${dataset_location}");
CREATE OR REPLACE TABLE ${v2_raw_table} (
CREATE OR REPLACE TABLE ${project_id}.${v2_raw_table} (
_airbyte_raw_id STRING,
_airbyte_data STRING,
_airbyte_extracted_at TIMESTAMP,
@@ -620,7 +635,7 @@ PARTITION BY DATE(_airbyte_extracted_at)
_airbyte_data AS _airbyte_data,
_airbyte_emitted_at AS _airbyte_extracted_at,
CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at
FROM ${v1_raw_table}
FROM ${project_id}.${v1_raw_table}
);
""");
}
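Every template in the generator now follows the same pattern: a `project_id` entry is added to the substitution map and prefixed onto the raw and final table references. A self-contained sketch of that pattern with placeholder identifiers (not the connector's actual values):

```java
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

public class ProjectQualifiedSqlSketch {
  public static void main(String[] args) {
    String projectId = "my-gcp-project";   // placeholder; supplied via the connector config in the commit
    String rawTableId = "`foo`.`foo_raw`"; // placeholder for streamId.rawTableId(QUOTE)

    // Same substitution style as commitRawTable above, with the new project_id key.
    String sql = new StringSubstitutor(Map.of(
        "project_id", '`' + projectId + '`',
        "raw_table_id", rawTableId)).replace(
        """
        UPDATE ${project_id}.${raw_table_id}
        SET `_airbyte_loaded_at` = CURRENT_TIMESTAMP()
        WHERE `_airbyte_loaded_at` IS NULL
        ;""");

    System.out.println(sql);
    // Prints an UPDATE against `my-gcp-project`.`foo`.`foo_raw`.
  }
}
```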
@@ -147,7 +147,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final JsonNode streamSchema)
throws Exception {
final StreamId streamId =
new BigQuerySqlGenerator(null).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
new BigQuerySqlGenerator(null, null).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
return retrieveRecordsFromTable(streamId.rawName(), streamId.rawNamespace())
.stream()
.map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())
@@ -424,7 +424,7 @@ void testWritePartitionOverUnpartitioned(final String configName) throws Excepti
final JsonNode testConfig = configs.get(configName);
initBigQuery(config);
final StreamId streamId =
new BigQuerySqlGenerator(null).buildStreamId(datasetId, USERS_STREAM_NAME, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
new BigQuerySqlGenerator(projectId, null).buildStreamId(datasetId, USERS_STREAM_NAME, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
final Dataset dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, streamId.rawNamespace());
createUnpartitionedTable(bigquery, dataset, streamId.rawName());
assertFalse(isTablePartitioned(bigquery, dataset, streamId.rawName()));
@@ -15,6 +15,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryConsts;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
@@ -82,7 +83,7 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s

@Override
protected SqlGenerator<?> getSqlGenerator() {
return new BigQuerySqlGenerator(null);
return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null);
}

/**
@@ -33,6 +33,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryConsts;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
@@ -60,17 +61,22 @@ public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegra
private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class);

private static BigQuery bq;
private static String projectId;
private static String datasetLocation;

@BeforeAll
public static void setupBigquery() throws Exception {
final String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json"));
final JsonNode config = Jsons.deserialize(rawConfig);
bq = BigQueryDestination.getBigQuery(config);

projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
datasetLocation = config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText();
}

@Override
protected BigQuerySqlGenerator getSqlGenerator() {
return new BigQuerySqlGenerator("US");
return new BigQuerySqlGenerator(projectId, datasetLocation);
}

@Override
@@ -363,7 +369,7 @@ public void testCreateTableInOtherRegion() throws InterruptedException {
// We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it.
bq.getDataset(namespace).delete();

destinationHandler.execute(new BigQuerySqlGenerator("asia-east1").createTable(incrementalDedupStream, "", false));
destinationHandler.execute(new BigQuerySqlGenerator(projectId, "asia-east1").createTable(incrementalDedupStream, "", false));

// Empirically, it sometimes takes Bigquery nearly 30 seconds to propagate the dataset's existence.
// Give ourselves 2 minutes just in case.
@@ -32,7 +32,7 @@

public class BigQuerySqlGeneratorTest {

private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("US");
private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("foo", "US");

@Test
public void testToDialectType() {