diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index e7bc11b346ae..1112eb442306 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -373,6 +373,41 @@ public void incrementalDedupInvalidPrimaryKey() throws Exception { dumpFinalTableRecords(streamId, "")); } + /** + * Test that T+D supports streams whose name and namespace are the same. + */ + @Test + public void incrementalDedupSameNameNamespace() throws Exception { + final StreamId streamId = buildStreamId(namespace, namespace, namespace + "_raw"); + final StreamConfig stream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP, + incrementalDedupStream.primaryKey(), + incrementalDedupStream.cursor(), + incrementalDedupStream.columns()); + + createRawTable(streamId); + createFinalTable(stream, ""); + insertRawTableRecords( + streamId, + List.of(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "5ce60e70-98aa-4fe3-8159-67207352c4f0", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": {"id1": 1, "id2": 100} + } + """))); + + final String sql = generator.updateTable(stream, ""); + destinationHandler.execute(sql); + + final List rawRecords = dumpRawTableRecords(streamId); + final List finalRecords = dumpFinalTableRecords(streamId, ""); + verifyRecordCounts(1, rawRecords, 1, finalRecords); + } + /** * Run a full T+D update for an incremental-dedup stream, writing to a final table with "_foo" * suffix, with values for all data types. Verifies all behaviors for all types: diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 08d31df4a357..1e5e11efcb9f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.14 +LABEL io.airbyte.version=2.0.15 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 3ecad4ed9e7f..a20603e954ac 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.14 + dockerImageTag: 2.0.15 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 80373997df5c..7a9b16ad20af 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -216,8 +216,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, setDefaultStreamNamespace(catalog, defaultNamespace); final String datasetLocation = BigQueryUtils.getDatasetLocation(config); - final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation); - final ParsedCatalog parsedCatalog = parseCatalog(catalog, datasetLocation); + final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); + final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation); final BigQuery bigquery = getBigQuery(config); final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation); @@ -241,8 +241,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN setDefaultStreamNamespace(catalog, defaultNamespace); final String datasetLocation = BigQueryUtils.getDatasetLocation(config); - final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation); - final ParsedCatalog parsedCatalog = parseCatalog(catalog, datasetLocation); + final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); + final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation); final BigQuery bigquery = getBigQuery(config); final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation); // Shared code end @@ -374,8 +374,8 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f } } - private ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog, final String datasetLocation) { - final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(datasetLocation); + private ParsedCatalog parseCatalog(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final String datasetLocation) { + final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); final CatalogParser catalogParser = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).isPresent() ? new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET).get()) : new CatalogParser(sqlGenerator); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 86e9d63f089c..a1240855cfe6 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -52,14 +52,18 @@ public class BigQuerySqlGenerator implements SqlGenerator { private final ColumnId CDC_DELETED_AT_COLUMN = buildColumnId("_ab_cdc_deleted_at"); private final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGenerator.class); + + private final String projectId; private final String datasetLocation; /** + * @param projectId * @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler} - * setting the query execution location, but let's be explicit since this is typically a - * compliance requirement. + * setting the query execution location, but let's be explicit since this is typically a + * compliance requirement. */ - public BigQuerySqlGenerator(final String datasetLocation) { + public BigQuerySqlGenerator(final String projectId, final String datasetLocation) { + this.projectId = projectId; this.datasetLocation = datasetLocation; } @@ -186,6 +190,7 @@ public String createTable(final StreamConfig stream, final String suffix, final final String forceCreateTable = force ? "OR REPLACE" : ""; return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "final_namespace", stream.id().finalNamespace(QUOTE), "dataset_location", datasetLocation, "force_create_table", forceCreateTable, @@ -193,10 +198,10 @@ public String createTable(final StreamConfig stream, final String suffix, final "column_declarations", columnDeclarations, "cluster_config", clusterConfig)).replace( """ - CREATE SCHEMA IF NOT EXISTS ${final_namespace} + CREATE SCHEMA IF NOT EXISTS ${project_id}.${final_namespace} OPTIONS(location="${dataset_location}"); - CREATE ${force_create_table} TABLE ${final_table_id} ( + CREATE ${force_create_table} TABLE ${project_id}.${final_table_id} ( _airbyte_raw_id STRING NOT NULL, _airbyte_extracted_at TIMESTAMP NOT NULL, _airbyte_meta JSON NOT NULL, @@ -329,9 +334,11 @@ public String softReset(final StreamConfig stream) { } private String clearLoadedAt(final StreamId streamId) { - return new StringSubstitutor(Map.of("raw_table_id", streamId.rawTableId(QUOTE))) + return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', + "raw_table_id", streamId.rawTableId(QUOTE))) .replace(""" - UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1; + UPDATE ${project_id}.${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1; """); } @@ -399,12 +406,13 @@ String validatePrimaryKeys(final StreamId id, }).collect(joining("\n")); return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "raw_table_id", id.rawTableId(QUOTE), "pk_null_checks", pkNullChecks)).replace( """ SET missing_pk_count = ( SELECT COUNT(1) - FROM ${raw_table_id} + FROM ${project_id}.${raw_table_id} WHERE `_airbyte_loaded_at` IS NULL ${pk_null_checks} @@ -450,6 +458,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL } return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "raw_table_id", stream.id().rawTableId(QUOTE), "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), "column_casts", columnCasts, @@ -457,7 +466,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, "column_list", columnList)).replace( """ - INSERT INTO ${final_table_id} + INSERT INTO ${project_id}.${final_table_id} ( ${column_list} _airbyte_meta, @@ -470,7 +479,7 @@ WITH intermediate_data AS ( ${column_errors} AS column_errors, _airbyte_raw_id, _airbyte_extracted_at - FROM ${raw_table_id} + FROM ${project_id}.${raw_table_id} WHERE _airbyte_loaded_at IS NULL ${cdcConditionalOrIncludeStatement} @@ -494,17 +503,18 @@ String dedupFinalTable(final StreamId id, .orElse(""); return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "final_table_id", id.finalTableId(QUOTE, finalSuffix), "pk_list", pkList, "cursor_order_clause", cursorOrderClause)).replace( """ - DELETE FROM ${final_table_id} + DELETE FROM ${project_id}.${final_table_id} WHERE `_airbyte_raw_id` IN ( SELECT `_airbyte_raw_id` FROM ( SELECT `_airbyte_raw_id`, row_number() OVER ( PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} `_airbyte_extracted_at` DESC - ) as row_number FROM ${final_table_id} + ) as row_number FROM ${project_id}.${final_table_id} ) WHERE row_number != 1 ) @@ -530,19 +540,20 @@ String cdcDeletes(final StreamConfig stream, // we want to grab IDs for deletion from the raw table (not the final table itself) to hand // out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), "raw_table_id", stream.id().rawTableId(QUOTE), "pk_list", pkList, "pk_extracts", pkCasts, "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)).replace( """ - DELETE FROM ${final_table_id} + DELETE FROM ${project_id}.${final_table_id} WHERE (${pk_list}) IN ( SELECT ( ${pk_extracts} ) - FROM ${raw_table_id} + FROM ${project_id}.${raw_table_id} WHERE JSON_TYPE(PARSE_JSON(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at'), wide_number_mode=>'round')) != 'null' ) ;"""); @@ -551,6 +562,7 @@ WHERE JSON_TYPE(PARSE_JSON(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at'), @VisibleForTesting String dedupRawTable(final StreamId id, final String finalSuffix) { return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "raw_table_id", id.rawTableId(QUOTE), "final_table_id", id.finalTableId(QUOTE, finalSuffix))).replace( // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it @@ -558,10 +570,10 @@ String dedupRawTable(final StreamId id, final String finalSuffix) { // and it only matters in a few edge cases. """ DELETE FROM - ${raw_table_id} + ${project_id}.${raw_table_id} WHERE `_airbyte_raw_id` NOT IN ( - SELECT `_airbyte_raw_id` FROM ${final_table_id} + SELECT `_airbyte_raw_id` FROM ${project_id}.${final_table_id} ) ;"""); } @@ -569,9 +581,10 @@ String dedupRawTable(final StreamId id, final String finalSuffix) { @VisibleForTesting String commitRawTable(final StreamId id) { return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "raw_table_id", id.rawTableId(QUOTE))).replace( """ - UPDATE ${raw_table_id} + UPDATE ${project_id}.${raw_table_id} SET `_airbyte_loaded_at` = CURRENT_TIMESTAMP() WHERE `_airbyte_loaded_at` IS NULL ;"""); @@ -580,12 +593,13 @@ String commitRawTable(final StreamId id) { @Override public String overwriteFinalTable(final StreamId streamId, final String finalSuffix) { return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "final_table_id", streamId.finalTableId(QUOTE), "tmp_final_table", streamId.finalTableId(QUOTE, finalSuffix), "real_final_table", streamId.finalName(QUOTE))).replace( """ - DROP TABLE IF EXISTS ${final_table_id}; - ALTER TABLE ${tmp_final_table} RENAME TO ${real_final_table}; + DROP TABLE IF EXISTS ${project_id}.${final_table_id}; + ALTER TABLE ${project_id}.${tmp_final_table} RENAME TO ${real_final_table}; """); } @@ -598,15 +612,16 @@ private String wrapAndQuote(final String namespace, final String tableName) { @Override public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) { return new StringSubstitutor(Map.of( + "project_id", '`' + projectId + '`', "raw_namespace", StringUtils.wrap(streamId.rawNamespace(), QUOTE), "dataset_location", datasetLocation, "v2_raw_table", streamId.rawTableId(QUOTE), "v1_raw_table", wrapAndQuote(namespace, tableName))).replace( """ - CREATE SCHEMA IF NOT EXISTS ${raw_namespace} + CREATE SCHEMA IF NOT EXISTS ${project_id}.${raw_namespace} OPTIONS(location="${dataset_location}"); - CREATE OR REPLACE TABLE ${v2_raw_table} ( + CREATE OR REPLACE TABLE ${project_id}.${v2_raw_table} ( _airbyte_raw_id STRING, _airbyte_data STRING, _airbyte_extracted_at TIMESTAMP, @@ -620,7 +635,7 @@ PARTITION BY DATE(_airbyte_extracted_at) _airbyte_data AS _airbyte_data, _airbyte_emitted_at AS _airbyte_extracted_at, CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at - FROM ${v1_raw_table} + FROM ${project_id}.${v1_raw_table} ); """); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java index 6f1bfe8d3ce4..697df25d56f8 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java @@ -147,7 +147,7 @@ protected List retrieveRecords(final TestDestinationEnv env, final JsonNode streamSchema) throws Exception { final StreamId streamId = - new BigQuerySqlGenerator(null).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); + new BigQuerySqlGenerator(null, null).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); return retrieveRecordsFromTable(streamId.rawName(), streamId.rawNamespace()) .stream() .map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 08af31d65fe4..d5414508824a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -424,7 +424,7 @@ void testWritePartitionOverUnpartitioned(final String configName) throws Excepti final JsonNode testConfig = configs.get(configName); initBigQuery(config); final StreamId streamId = - new BigQuerySqlGenerator(null).buildStreamId(datasetId, USERS_STREAM_NAME, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); + new BigQuerySqlGenerator(projectId, null).buildStreamId(datasetId, USERS_STREAM_NAME, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); final Dataset dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, streamId.rawNamespace()); createUnpartitionedTable(bigquery, dataset, streamId.rawName()); assertFalse(isTablePartitioned(bigquery, dataset, streamId.rawName())); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index a7d5915f67b2..9869e5e72546 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -15,6 +15,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.destination.bigquery.BigQueryConsts; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils; import io.airbyte.integrations.destination.bigquery.BigQueryUtils; @@ -82,7 +83,7 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s @Override protected SqlGenerator getSqlGenerator() { - return new BigQuerySqlGenerator(null); + return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null); } /** diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 3d0306b1146e..f42957bc1219 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -33,6 +33,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.destination.bigquery.BigQueryConsts; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; @@ -60,17 +61,22 @@ public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegra private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class); private static BigQuery bq; + private static String projectId; + private static String datasetLocation; @BeforeAll public static void setupBigquery() throws Exception { final String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); final JsonNode config = Jsons.deserialize(rawConfig); bq = BigQueryDestination.getBigQuery(config); + + projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + datasetLocation = config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText(); } @Override protected BigQuerySqlGenerator getSqlGenerator() { - return new BigQuerySqlGenerator("US"); + return new BigQuerySqlGenerator(projectId, datasetLocation); } @Override @@ -363,7 +369,7 @@ public void testCreateTableInOtherRegion() throws InterruptedException { // We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it. bq.getDataset(namespace).delete(); - destinationHandler.execute(new BigQuerySqlGenerator("asia-east1").createTable(incrementalDedupStream, "", false)); + destinationHandler.execute(new BigQuerySqlGenerator(projectId, "asia-east1").createTable(incrementalDedupStream, "", false)); // Empirically, it sometimes takes Bigquery nearly 30 seconds to propagate the dataset's existence. // Give ourselves 2 minutes just in case. diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index f04008e47a1f..ec26fe497edc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -32,7 +32,7 @@ public class BigQuerySqlGeneratorTest { - private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("US"); + private final BigQuerySqlGenerator generator = new BigQuerySqlGenerator("foo", "US"); @Test public void testToDialectType() { diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 94e53fbcb92c..0beaed87e248 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -133,6 +133,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.0.15 | 2023-09-21 | [\#30640](https://github.com/airbytehq/airbyte/pull/30640) | Handle streams with identical name and namespace | | 2.0.14 | 2023-09-20 | [\#30069](https://github.com/airbytehq/airbyte/pull/30069) | Staging destination async | | 2.0.13 | 2023-09-19 | [\#30592](https://github.com/airbytehq/airbyte/pull/30592) | Internal code changes | | 2.0.12 | 2023-09-19 | [\#30319](https://github.com/airbytehq/airbyte/pull/30319) | Improved testing |