From 89a1283bb0771d2672c889183a98f2afc5bbb8fc Mon Sep 17 00:00:00 2001 From: Guen Prawiroatmodjo Date: Fri, 25 Oct 2024 13:11:07 -0700 Subject: [PATCH] Fix: MotherDuck destination was not passing MotherDuck API key to SQL processor (#47380) --- .../airbyte_cdk/sql/_processors/motherduck.py | 2 +- .../destination_motherduck/destination.py | 44 +++++++++---------- .../destination-motherduck/metadata.yaml | 2 +- .../destination-motherduck/pyproject.toml | 2 +- docs/integrations/destinations/motherduck.md | 1 + 5 files changed, 25 insertions(+), 26 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sql/_processors/motherduck.py b/airbyte-cdk/python/airbyte_cdk/sql/_processors/motherduck.py index e3ddda5e4c9b..1e3bf0019780 100644 --- a/airbyte-cdk/python/airbyte_cdk/sql/_processors/motherduck.py +++ b/airbyte-cdk/python/airbyte_cdk/sql/_processors/motherduck.py @@ -54,7 +54,7 @@ def get_sql_alchemy_url(self) -> SecretString: return SecretString( f"duckdb:///md:{self.database}?motherduck_token={self.api_key}" - f"&custom_user_agent=={self.custom_user_agent}" + f"&custom_user_agent={self.custom_user_agent}" # Not sure why this doesn't work. We have to override later in the flow. # f"&schema={self.schema_name}" ) diff --git a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/destination.py b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/destination.py index fe30056656ac..24b7ab505fc0 100644 --- a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/destination.py +++ b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/destination.py @@ -12,7 +12,7 @@ from typing import Any, Dict, Iterable, List, Mapping from urllib.parse import urlparse -import duckdb +from airbyte_cdk import AirbyteStream, ConfiguredAirbyteStream, SyncMode from airbyte_cdk.destinations import Destination from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type from airbyte_cdk.sql._processors.duckdb import DuckDBConfig, DuckDBSqlProcessor @@ -125,13 +125,7 @@ def write( path = str(config.get("destination_path")) path = self._get_destination_path(path) schema_name = validated_sql_name(config.get("schema", CONFIG_DEFAULT_SCHEMA)) - - # Get and register auth token if applicable motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, "")) - duckdb_config = {} - if motherduck_api_key: - duckdb_config["motherduck_token"] = motherduck_api_key - duckdb_config["custom_user_agent"] = "airbyte" for configured_stream in configured_catalog.streams: stream_name = configured_stream.stream.name @@ -140,6 +134,7 @@ def write( configured_catalog=configured_catalog, schema_name=schema_name, db_path=path, + motherduck_token=motherduck_api_key, ) processor._ensure_schema_exists() @@ -171,7 +166,7 @@ def write( for message in input_messages: if message.type == Type.STATE: # flush the buffer - self._flush_buffer(buffer, configured_catalog, path, schema_name) + self._flush_buffer(buffer, configured_catalog, path, schema_name, motherduck_api_key) buffer = defaultdict(lambda: defaultdict(list)) yield message @@ -196,7 +191,7 @@ def write( logger.info(f"Message type {message.type} not supported, skipping") # flush any remaining messages - self._flush_buffer(buffer, configured_catalog, path, schema_name) + self._flush_buffer(buffer, configured_catalog, path, schema_name, motherduck_api_key) def _flush_buffer( self, @@ -204,6 +199,7 @@ def _flush_buffer( configured_catalog: ConfiguredAirbyteCatalog, db_path: str, schema_name: str, + motherduck_api_key: str, ) -> None: """ Flush the buffer to the destination @@ -212,9 +208,7 @@ def _flush_buffer( stream_name = configured_stream.stream.name if stream_name in buffer: processor = self._get_sql_processor( - configured_catalog=configured_catalog, - schema_name=schema_name, - db_path=db_path, + configured_catalog=configured_catalog, schema_name=schema_name, db_path=db_path, motherduck_token=motherduck_api_key ) processor.write_stream_data_from_buffer(buffer, stream_name, configured_stream.destination_sync_mode) @@ -238,11 +232,8 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon logger.info(f"Using DuckDB file at {path}") os.makedirs(os.path.dirname(path), exist_ok=True) - duckdb_config = {} - if CONFIG_MOTHERDUCK_API_KEY in config: - duckdb_config["motherduck_token"] = str(config[CONFIG_MOTHERDUCK_API_KEY]) - duckdb_config["custom_user_agent"] = "airbyte" - # Next, we want to specify 'saas_mode' for during check, + if self._is_motherduck(path): + # We want to specify 'saas_mode' for during check, # to reduce memory usage from unnecessary extensions if "?" in path: # There are already some query params; append to them. @@ -251,13 +242,20 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon # No query params yet; add one. path += "?saas_mode=true" - con = duckdb.connect( - database=path, - read_only=False, - config=duckdb_config, + # Create a dummy catalog to check if the SQL processor works + check_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name="check", json_schema={"type": "object"}, supported_sync_modes=[SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=SyncMode.incremental, ) - con.execute("SELECT 1;") - + check_catalog = ConfiguredAirbyteCatalog(streams=[check_stream]) + processor = self._get_sql_processor( + configured_catalog=check_catalog, + schema_name="test", + db_path=path, + motherduck_token=str(config.get(CONFIG_MOTHERDUCK_API_KEY, "")), + ) + processor._execute_sql("SELECT 1;") return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: diff --git a/airbyte-integrations/connectors/destination-motherduck/metadata.yaml b/airbyte-integrations/connectors/destination-motherduck/metadata.yaml index 50a7a1d6a7c3..362e2f625954 100644 --- a/airbyte-integrations/connectors/destination-motherduck/metadata.yaml +++ b/airbyte-integrations/connectors/destination-motherduck/metadata.yaml @@ -4,7 +4,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 042ee9b5-eb98-4e99-a4e5-3f0d573bee66 - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 dockerRepository: airbyte/destination-motherduck githubIssueLabel: destination-motherduck icon: duckdb.svg diff --git a/airbyte-integrations/connectors/destination-motherduck/pyproject.toml b/airbyte-integrations/connectors/destination-motherduck/pyproject.toml index ebe7c776b133..e5ffb5034e99 100644 --- a/airbyte-integrations/connectors/destination-motherduck/pyproject.toml +++ b/airbyte-integrations/connectors/destination-motherduck/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "destination-motherduck" -version = "0.1.2" +version = "0.1.3" description = "Destination implementation for MotherDuck." authors = ["Guen Prawiroatmodjo, Simon Späti, Airbyte"] license = "MIT" diff --git a/docs/integrations/destinations/motherduck.md b/docs/integrations/destinations/motherduck.md index a94ba5b95467..b9d89c100c7d 100644 --- a/docs/integrations/destinations/motherduck.md +++ b/docs/integrations/destinations/motherduck.md @@ -69,6 +69,7 @@ This connector is primarily designed to work with MotherDuck and local DuckDB fi | Version | Date | Pull Request | Subject | |:--------| :--------- | :-------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0.1.3 | 2024-10-23 | [47315](https://github.com/airbytehq/airbyte/pull/47315) | Fix bug causing MotherDuck API key to not be correctly passed to the engine. | | 0.1.2 | 2024-10-23 | [47315](https://github.com/airbytehq/airbyte/pull/47315) | Use `saas_only` mode during connection check to reduce ram usage. | | 0.1.1 | 2024-10-23 | [47312](https://github.com/airbytehq/airbyte/pull/47312) | Fix: generate new unique destination ID | | 0.1.0 | 2024-10-23 | [46904](https://github.com/airbytehq/airbyte/pull/46904) | New MotherDuck destination |