diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 2c685c13bc21..674de0894fc1 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -11,7 +11,7 @@ airbyteJavaConnector { 'gcs-destinations', 'core', ] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 836b6be3d2b7..2313a7e7e2f5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.6.1 + dockerImageTag: 2.6.2 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt index 29bfa325f9cf..1e2acc7b7689 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt @@ -33,9 +33,9 @@ object BigQueryConsumerFactory { catalog = catalog, bufferManager = BufferManager( + defaultNamespace, (Runtime.getRuntime().maxMemory() * 0.4).toLong(), ), - defaultNamespace = Optional.of(defaultNamespace), ) } @@ -59,9 +59,9 @@ object BigQueryConsumerFactory { catalog = catalog, bufferManager = BufferManager( + defaultNamespace, (Runtime.getRuntime().maxMemory() * 0.5).toLong(), ), - defaultNamespace = Optional.of(defaultNamespace), ) } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt index 73ce327846b8..57660493d15d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt @@ -56,7 +56,6 @@ import java.io.ByteArrayInputStream import java.io.IOException import java.util.* import java.util.function.Consumer -import org.apache.commons.lang3.StringUtils private val log = KotlinLogging.logger {} @@ -190,7 +189,6 @@ class BigQueryDestination : BaseConnector(), Destination { ): SerializedAirbyteMessageConsumer { val uploadingMethod = getLoadingMethod(config) val defaultNamespace = getDatasetId(config) - setDefaultStreamNamespace(catalog, defaultNamespace) val disableTypeDedupe = getDisableTypeDedupFlag(config) val datasetLocation = getDatasetLocation(config) val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText() @@ -222,6 +220,7 @@ class BigQueryDestination : BaseConnector(), Destination { val parsedCatalog = parseCatalog( sqlGenerator, + defaultNamespace, rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog, ) @@ -309,28 +308,18 @@ class BigQueryDestination : BaseConnector(), Destination { ) } - private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) { - // Set the default originalNamespace on streams with null originalNamespace. This means we - // don't - // need to repeat this - // logic in the rest of the connector. - // (record messages still need to handle null namespaces though, which currently happens in - // e.g. - // AsyncStreamConsumer#accept) - // This probably should be shared logic amongst destinations eventually. - for (stream in catalog.streams) { - if (StringUtils.isEmpty(stream.stream.namespace)) { - stream.stream.withNamespace(namespace) - } - } - } - private fun parseCatalog( sqlGenerator: BigQuerySqlGenerator, + defaultNamespace: String, rawNamespaceOverride: String, catalog: ConfiguredAirbyteCatalog ): ParsedCatalog { - val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride) + val catalogParser = + CatalogParser( + sqlGenerator, + defaultNamespace = defaultNamespace, + rawNamespace = rawNamespaceOverride, + ) return catalogParser.parseCatalog(catalog) } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index 873482515eab..aab4c8494fe2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -35,7 +35,7 @@ public void testBuildColumnId() { @Test void columnCollision() { - final CatalogParser parser = new CatalogParser(generator); + final CatalogParser parser = new CatalogParser(generator, "default_ns"); assertEquals( new StreamConfig( new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"), diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index a1d6050aa25c..3aa3f0613d39 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -223,6 +223,7 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.6.2 | 2024-05-30 | [38331](https://github.com/airbytehq/airbyte/pull/38331) | Internal code changes in preparation for future feature release | | 2.6.1 | 2024-05-29 | [38770](https://github.com/airbytehq/airbyte/pull/38770) | Internal code change (switch to CDK artifact) | | 2.6.0 | 2024-05-28 | [38359](https://github.com/airbytehq/airbyte/pull/38359) | Propagate airbyte_meta from sources; add generation_id column | | 2.5.1 | 2024-05-22 | [38591](https://github.com/airbytehq/airbyte/pull/38591) | Bugfix to include forward-slash when cleaning up stage |